Skip to content

Commit b956a85

Browse files
paramiteleifmadsen
andauthored
Increase reading buffer size on large messages (#74) (#83)
* Increase reading buffer size on large messages (#74) When larger than MaxBufferSize messages are being sent, the whole message processing is failing badly. This can easily happen when socket plugin is used in Ceilometer metrics or events processing. Fixes: https://bugzilla.redhat.com/show_bug.cgi?id=2016460 (cherry picked from commit f13e241) * Ci opstools fix (#85) (#87) (#89) * Fix centos-opstools repo in CI * Avoid gpgcheck for centos repo Co-authored-by: Leif Madsen <[email protected]> Co-authored-by: Matthias Runge <[email protected]> (cherry picked from commit 04fc691) (cherry picked from commit 33d4121) Co-authored-by: Martin Mágr <[email protected]> Co-authored-by: Leif Madsen <[email protected]>
1 parent 33d4121 commit b956a85

File tree

2 files changed

+102
-6
lines changed

2 files changed

+102
-6
lines changed

plugins/transport/socket/main.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import (
1515
"github.com/infrawatch/sg-core/pkg/transport"
1616
)
1717

18-
const maxBufferSize = 16384
18+
const (
19+
maxBufferSize = 65535
20+
)
1921

2022
var (
2123
msgCount int64
@@ -71,22 +73,27 @@ type Socket struct {
7173
func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
7274

7375
var laddr net.UnixAddr
74-
7576
laddr.Name = s.conf.Path
7677
laddr.Net = "unixgram"
7778

7879
os.Remove(s.conf.Path)
79-
8080
pc, err := net.ListenUnixgram("unixgram", &laddr)
8181
if err != nil {
82-
s.logger.Errorf(err, "failed to listen on unix socket %s", laddr.Name)
82+
s.logger.Errorf(err, "failed to bind unix socket %s", laddr.Name)
8383
return
8484
}
85+
// create socket file if it does not exist
86+
skt, err := pc.File()
87+
if err != nil {
88+
s.logger.Errorf(err, "failed to retrieve file handle for %s", laddr.Name)
89+
return
90+
}
91+
skt.Close()
8592

8693
s.logger.Infof("socket listening on %s", laddr.Name)
87-
go func(buffSize int64) {
94+
go func(maxBuffSize int64) {
95+
msgBuffer := make([]byte, maxBuffSize)
8896
for {
89-
msgBuffer := make([]byte, buffSize)
9097
n, err := pc.Read(msgBuffer)
9198
if err != nil || n < 1 {
9299
if err != nil {
@@ -96,6 +103,11 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
96103
return
97104
}
98105

106+
// whole buffer was used, so we are potentially handling larger message
107+
if n == len(msgBuffer) {
108+
s.logger.Warnf("full read buffer used")
109+
}
110+
99111
if s.conf.DumpMessages.Enabled {
100112
_, err := s.dumpBuf.Write(msgBuffer[:n])
101113
if err != nil {
@@ -107,6 +119,7 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
107119
}
108120
s.dumpBuf.Flush()
109121
}
122+
110123
w(msgBuffer[:n])
111124
msgCount++
112125
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"io/ioutil"
6+
"net"
7+
"os"
8+
"path"
9+
"sync"
10+
"testing"
11+
"time"
12+
13+
"github.com/infrawatch/apputils/logging"
14+
"github.com/stretchr/testify/require"
15+
"gopkg.in/go-playground/assert.v1"
16+
)
17+
18+
const regularBuffSize = 16384
19+
20+
func TestSocketTransport(t *testing.T) {
21+
tmpdir, err := ioutil.TempDir(".", "socket_test_tmp")
22+
require.NoError(t, err)
23+
defer os.RemoveAll(tmpdir)
24+
25+
logpath := path.Join(tmpdir, "test.log")
26+
logger, err := logging.NewLogger(logging.DEBUG, logpath)
27+
require.NoError(t, err)
28+
29+
sktpath := path.Join(tmpdir, "socket")
30+
skt, err := os.OpenFile(sktpath, os.O_RDWR|os.O_CREATE, os.ModeSocket|os.ModePerm)
31+
require.NoError(t, err)
32+
defer skt.Close()
33+
34+
trans := Socket{
35+
conf: configT{
36+
Path: sktpath,
37+
},
38+
logger: &logWrapper{
39+
l: logger,
40+
},
41+
}
42+
43+
t.Run("test large message transport", func(t *testing.T) {
44+
msg := make([]byte, regularBuffSize)
45+
addition := "wubba lubba dub dub"
46+
for i := 0; i < regularBuffSize; i++ {
47+
msg[i] = byte('X')
48+
}
49+
msg[regularBuffSize-1] = byte('$')
50+
msg = append(msg, []byte(addition)...)
51+
52+
// verify transport
53+
ctx, cancel := context.WithCancel(context.Background())
54+
wg := sync.WaitGroup{}
55+
go trans.Run(ctx, func(mess []byte) {
56+
wg.Add(1)
57+
strmsg := string(mess)
58+
assert.Equal(t, regularBuffSize+len(addition), len(strmsg)) // we received whole message
59+
assert.Equal(t, addition, strmsg[len(strmsg)-len(addition):]) // and the out-of-band part is correct
60+
wg.Done()
61+
}, make(chan bool))
62+
63+
// wait for socket file to be created
64+
for {
65+
stat, err := os.Stat(sktpath)
66+
require.NoError(t, err)
67+
if stat.Mode()&os.ModeType == os.ModeSocket {
68+
break
69+
}
70+
time.Sleep(250 * time.Millisecond)
71+
}
72+
73+
// write to socket
74+
wskt, err := net.DialUnix("unixgram", nil, &net.UnixAddr{Name: sktpath, Net: "unixgram"})
75+
require.NoError(t, err)
76+
_, err = wskt.Write(msg)
77+
require.NoError(t, err)
78+
79+
cancel()
80+
wg.Wait()
81+
wskt.Close()
82+
})
83+
}

0 commit comments

Comments
 (0)