Skip to content

Commit 35b0d26

Browse files
authored
Add udp transport to socket plugin (#101)
* Add udp transport to socket plugin * Make unix socket default * Fix linting issues * Incorporate PR comment suggestions * Fix unit test config after previous changes
1 parent 8216de3 commit 35b0d26

File tree

2 files changed

+114
-9
lines changed

2 files changed

+114
-9
lines changed

plugins/transport/socket/main.go

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"net"
99
"os"
10+
"strings"
1011
"time"
1112

1213
"github.com/infrawatch/apputils/logging"
@@ -17,6 +18,8 @@ import (
1718

1819
const (
1920
maxBufferSize = 65535
21+
udp = "udp"
22+
unix = "unix"
2023
)
2124

2225
var (
@@ -31,7 +34,9 @@ func rate() int64 {
3134
}
3235

3336
type configT struct {
34-
Path string `validate:"required"`
37+
Path string `validate:"required_without=Socketaddr"`
38+
Type string
39+
Socketaddr string `validate:"required_without=Path"`
3540
DumpMessages struct {
3641
Enabled bool
3742
Path string
@@ -69,9 +74,7 @@ type Socket struct {
6974
dumpFile *os.File
7075
}
7176

72-
// Run implements type Transport
73-
func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
74-
77+
func (s *Socket) initUnixSocket() *net.UnixConn {
7578
var laddr net.UnixAddr
7679
laddr.Name = s.conf.Path
7780
laddr.Net = "unixgram"
@@ -80,17 +83,49 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
8083
pc, err := net.ListenUnixgram("unixgram", &laddr)
8184
if err != nil {
8285
s.logger.Errorf(err, "failed to bind unix socket %s", laddr.Name)
83-
return
86+
return nil
8487
}
8588
// create socket file if it does not exist
8689
skt, err := pc.File()
8790
if err != nil {
8891
s.logger.Errorf(err, "failed to retrieve file handle for %s", laddr.Name)
89-
return
92+
return nil
9093
}
9194
skt.Close()
9295

9396
s.logger.Infof("socket listening on %s", laddr.Name)
97+
98+
return pc
99+
}
100+
101+
func (s *Socket) initUDPSocket() *net.UDPConn {
102+
addr, err := net.ResolveUDPAddr(udp, s.conf.Socketaddr)
103+
if err != nil {
104+
s.logger.Errorf(err, "failed to resolve udp address: %s", s.conf.Socketaddr)
105+
return nil
106+
}
107+
pc, err := net.ListenUDP(udp, addr)
108+
if err != nil {
109+
s.logger.Errorf(err, "failed to bind udp socket to addr: %s", s.conf.Socketaddr)
110+
return nil
111+
}
112+
113+
s.logger.Infof("socket listening on %s", s.conf.Socketaddr)
114+
115+
return pc
116+
}
117+
118+
// Run implements type Transport
119+
func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
120+
var pc net.Conn
121+
if s.conf.Type == udp {
122+
pc = s.initUDPSocket()
123+
} else {
124+
pc = s.initUnixSocket()
125+
}
126+
if pc == nil {
127+
s.logger.Errorf(nil, "Failed to initialize socket transport plugin")
128+
}
94129
go func(maxBuffSize int64) {
95130
msgBuffer := make([]byte, maxBuffSize)
96131
for {
@@ -136,14 +171,16 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
136171
}
137172
Done:
138173
pc.Close()
139-
os.Remove(s.conf.Path)
174+
if s.conf.Type == unix {
175+
os.Remove(s.conf.Path)
176+
}
140177
s.dumpFile.Close()
141178
s.logger.Infof("exited")
142179
}
143180

144181
// Listen ...
145182
func (s *Socket) Listen(e data.Event) {
146-
fmt.Printf("Received event: %v\n", e)
183+
fmt.Printf("received event: %v\n", e)
147184
}
148185

149186
// Config load configurations
@@ -155,6 +192,7 @@ func (s *Socket) Config(c []byte) error {
155192
}{
156193
Path: "/dev/stdout",
157194
},
195+
Type: unix,
158196
}
159197

160198
err := config.ParseConfig(bytes.NewReader(c), &s.conf)
@@ -171,6 +209,20 @@ func (s *Socket) Config(c []byte) error {
171209
s.dumpBuf = bufio.NewWriter(s.dumpFile)
172210
}
173211

212+
s.conf.Type = strings.ToLower(s.conf.Type)
213+
if s.conf.Type != unix && s.conf.Type != udp {
214+
return fmt.Errorf("unable to determine socket type from configuration file. Should be either \"unix\" or \"udp\", received: %s",
215+
s.conf.Type)
216+
}
217+
218+
if s.conf.Type == unix && s.conf.Path == "" {
219+
return fmt.Errorf("the path configuration option is required when using unix socket type")
220+
}
221+
222+
if s.conf.Type == udp && s.conf.Socketaddr == "" {
223+
return fmt.Errorf("the socketaddr configuration option is required when using udp socket type")
224+
}
225+
174226
return nil
175227
}
176228

plugins/transport/socket/main_test.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
const regularBuffSize = 16384
1919

20-
func TestSocketTransport(t *testing.T) {
20+
func TestUnixSocketTransport(t *testing.T) {
2121
tmpdir, err := ioutil.TempDir(".", "socket_test_tmp")
2222
require.NoError(t, err)
2323
defer os.RemoveAll(tmpdir)
@@ -81,3 +81,56 @@ func TestSocketTransport(t *testing.T) {
8181
wskt.Close()
8282
})
8383
}
84+
85+
func TestUdpSocketTransport(t *testing.T) {
86+
tmpdir, err := ioutil.TempDir(".", "socket_test_tmp")
87+
require.NoError(t, err)
88+
defer os.RemoveAll(tmpdir)
89+
90+
logpath := path.Join(tmpdir, "test.log")
91+
logger, err := logging.NewLogger(logging.DEBUG, logpath)
92+
require.NoError(t, err)
93+
94+
trans := Socket{
95+
conf: configT{
96+
Socketaddr: "127.0.0.1:8642",
97+
Type: "udp",
98+
},
99+
logger: &logWrapper{
100+
l: logger,
101+
},
102+
}
103+
104+
t.Run("test large message transport", func(t *testing.T) {
105+
msg := make([]byte, regularBuffSize)
106+
addition := "wubba lubba dub dub"
107+
for i := 0; i < regularBuffSize; i++ {
108+
msg[i] = byte('X')
109+
}
110+
msg[regularBuffSize-1] = byte('$')
111+
msg = append(msg, []byte(addition)...)
112+
113+
// verify transport
114+
ctx, cancel := context.WithCancel(context.Background())
115+
wg := sync.WaitGroup{}
116+
go trans.Run(ctx, func(mess []byte) {
117+
wg.Add(1)
118+
strmsg := string(mess)
119+
assert.Equal(t, regularBuffSize+len(addition), len(strmsg)) // we received whole message
120+
assert.Equal(t, addition, strmsg[len(strmsg)-len(addition):]) // and the out-of-band part is correct
121+
wg.Done()
122+
}, make(chan bool))
123+
124+
// write to socket
125+
addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:8642")
126+
require.NoError(t, err)
127+
wskt, err := net.DialUDP("udp", nil, addr)
128+
require.NoError(t, err)
129+
_, err = wskt.Write(msg)
130+
require.NoError(t, err)
131+
132+
cancel()
133+
wg.Wait()
134+
wskt.Close()
135+
})
136+
}

0 commit comments

Comments
 (0)