Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func buildGRPCExporter(cfg *config.Agent, m *metrics.Metrics) (node.TerminalFunc
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.TargetTLSCACertPath, cfg.TargetTLSUserCertPath, cfg.TargetTLSUserKeyPath, cfg.GRPCMessageMaxFlows, m)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type Agent struct {
TargetHost string `env:"TARGET_HOST"`
// Port is the port the flow or packet collector, when the EXPORT variable is set to "grpc"
TargetPort int `env:"TARGET_PORT"`
// CA certificate path of the target, when TLS is used. Empty by default (no TLS).
TargetTLSCACertPath string `env:"TARGET_TLS_CA_CERT_PATH"`
// User certificate path, when mTLS is used. Empty by default (no mTLS).
TargetTLSUserCertPath string `env:"TARGET_TLS_USER_CERT_PATH"`
// User certificate key path, when mTLS is used. Empty by default (no mTLS).
TargetTLSUserKeyPath string `env:"TARGET_TLS_USER_KEY_PATH"`
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
// larger than that number will be split and submitted sequentially.
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type GRPCProto struct {
batchCounter prometheus.Counter
}

func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort)
func StartGRPCProto(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort, caPath, userCertPath, userKeyPath)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/exporter/grpc_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NoOp())
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", 1000, metrics.NoOp())
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("::1", port, 1000, metrics.NoOp())
exporter, err := StartGRPCProto("::1", port, "", "", "", 1000, metrics.NoOp())
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {

const msgMaxLen = 10000
// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NoOp())
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", msgMaxLen, metrics.NoOp())
require.NoError(t, err)

// Send a message much longer than the limit length
Expand Down
45 changes: 41 additions & 4 deletions pkg/grpc/flow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,60 @@
package flowgrpc

import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"

"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

var clog = logrus.WithField("component", "grpc.Client")

// ClientConnection wraps a gRPC+protobuf connection
type ClientConnection struct {
client pbflow.CollectorClient
conn *grpc.ClientConn
}

func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
// TODO: allow configuring some options (keepalive, backoff...)
func ConnectClient(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string) (*ClientConnection, error) {
// TODO: allow configuring more options (keepalive, backoff...)
var opts []grpc.DialOption
if caPath == "" {
clog.Info("Starting GRPC client - no TLS")
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
// Configure TLS (server CA)
caCert, err := os.ReadFile(caPath)
if err != nil {
return nil, fmt.Errorf("cannot load CA certificate: %w", err)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
RootCAs: pool,
}
if userCertPath != "" && userKeyPath != "" {
clog.Info("Starting GRPC client with mTLS")
// Configure mTLS (client certificates)
cert, err := tls.LoadX509KeyPair(userCertPath, userKeyPath)
if err != nil {
return nil, fmt.Errorf("cannot load client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
} else {
clog.Info("Starting GRPC client with TLS")
}

opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
}
socket := utils.GetSocket(hostIP, hostPort)
conn, err := grpc.NewClient(socket,
grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(socket, opts...)
if err != nil {
return nil, err
}
Expand Down
194 changes: 185 additions & 9 deletions pkg/grpc/flow/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,32 @@ package flowgrpc

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"testing"
"time"

"github.com/mariomac/guara/pkg/test"
test2 "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/timestamppb"
)

const timeout = 5 * time.Second

func TestGRPCCommunication(t *testing.T) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut)
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
require.NoError(t, err)
client := cc.Client()

Expand Down Expand Up @@ -93,7 +99,7 @@ func TestGRPCCommunication(t *testing.T) {
}

func TestConstructorOptions(t *testing.T) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(t, err)
intercepted := make(chan struct{})
// Override the default GRPC collector to verify that StartCollector is applying the
Expand All @@ -109,7 +115,7 @@ func TestConstructorOptions(t *testing.T) {
return handler(ctx, req)
})))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
require.NoError(t, err)
client := cc.Client()

Expand All @@ -127,13 +133,13 @@ func TestConstructorOptions(t *testing.T) {
}

func BenchmarkIPv4GRPCCommunication(b *testing.B) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(b, err)
serverOut := make(chan *pbflow.Records, 1000)
collector, err := StartCollector(port, serverOut)
require.NoError(b, err)
defer collector.Close()
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
require.NoError(b, err)
defer cc.Close()
client := cc.Client()
Expand Down Expand Up @@ -188,13 +194,13 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) {
}

func BenchmarkIPv6GRPCCommunication(b *testing.B) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(b, err)
serverOut := make(chan *pbflow.Records, 1000)
collector, err := StartCollector(port, serverOut)
require.NoError(b, err)
defer collector.Close()
cc, err := ConnectClient("::1", port)
cc, err := ConnectClient("::1", port, "", "", "")
require.NoError(b, err)
defer cc.Close()
client := cc.Client()
Expand Down Expand Up @@ -249,3 +255,173 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) {
<-serverOut
}
}

// Note: there's more tests focused on TLS in FLP, that also cover agent's functions
func TestGRPCCommunication_TLS(t *testing.T) {
_, _, _, ca, cert, key, cleanup := test.CreateAllCerts(t)
defer cleanup()
opts, err := buildTLSServerOptions(cert, key, "")
require.NoError(t, err)

port, err := test2.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port, ca, "", "")
require.NoError(t, err)
client := cc.Client()

go func() {
_, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{
EthProtocol: 123, Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
},
}}},
})
require.NoError(t, err)
}()

var rs *pbflow.Records
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 123, r.EthProtocol)
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())

select {
case rs = <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
// ok!
}
}

// Note: there's more tests focused on TLS in FLP, that also cover agent's functions
func TestGRPCCommunication_MutualTLS(t *testing.T) {
clCA, clCert, clKey, ca, cert, key, cleanup := test.CreateAllCerts(t)
defer cleanup()
opts, err := buildTLSServerOptions(cert, key, clCA)
require.NoError(t, err)

port, err := test2.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port, ca, clCert, clKey)
require.NoError(t, err)
client := cc.Client()

go func() {
_, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{
EthProtocol: 123, Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
},
}}},
})
require.NoError(t, err)
}()

var rs *pbflow.Records
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 123, r.EthProtocol)
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())

select {
case rs = <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
// ok!
}
}

func TestGRPCCommunication_MutualTLS_InvalidCert(t *testing.T) {
_, clCert, clKey, ca, cert, key, cleanup := test.CreateAllCerts(t)
defer cleanup()

// Here we pass the server CA, which was NOT used to generate the client cert, which means that the client cert should be rejected upon connecting
opts, err := buildTLSServerOptions(cert, key, ca)
require.NoError(t, err)

port, err := test2.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port, ca, clCert, clKey)
require.NoError(t, err)
client := cc.Client()

go func() {
_, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{
EthProtocol: 123, Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
},
}}},
})
require.ErrorContains(t, err, "tls: unknown certificate authority")
}()

select {
case rs := <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
// ok!
}
}

func buildTLSServerOptions(certPath, keyPath, clientCAPath string) ([]grpc.ServerOption, error) {
var opts []grpc.ServerOption
if certPath != "" && keyPath != "" {
// TLS
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return nil, fmt.Errorf("cannot load configured certificate: %w", err)
}
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.NoClientCert,
}
if clientCAPath != "" {
// mTLS
caCert, err := os.ReadFile(clientCAPath)
if err != nil {
return nil, fmt.Errorf("cannot load configured client CA certificate: %w", err)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
tlsCfg.ClientCAs = pool
}
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
}
return opts, nil
}
Loading