Skip to content

Commit 6bd6cab

Browse files
authored
Merge pull request #6997 from onflow/petera/make-peer-cache-config
[Network] Make ProtocolPeerCache configurable
2 parents 8fbbd56 + 16503c2 commit 6bd6cab

File tree

9 files changed

+80
-62
lines changed

9 files changed

+80
-62
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2362,6 +2362,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
23622362
&p2pbuilderconfig.UnicastConfig{
23632363
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
23642364
}).
2365+
SetProtocolPeerCacheList(protocols.FlowProtocolID(builder.SporkID)).
23652366
SetBasicResolver(builder.Resolver).
23662367
SetSubscriptionFilter(networkingsubscription.NewRoleBasedFilter(flow.RoleAccess, builder.IdentityProvider)).
23672368
SetConnectionManager(connManager).

cmd/scaffold.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ func (fnb *FlowNodeBuilder) BuildPublicLibp2pNode(address string, bootstrapIdent
568568
&p2pbuilderconfig.UnicastConfig{
569569
Unicast: fnb.FlowConfig.NetworkConfig.Unicast,
570570
}).
571+
SetProtocolPeerCacheList(protocols.FlowProtocolID(fnb.SporkID)).
571572
SetSubscriptionFilter(
572573
subscription.NewRoleBasedFilter(
573574
subscription.UnstakedRole, fnb.IdentityProvider,

network/p2p/builder.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/libp2p/go-libp2p/core/host"
99
"github.com/libp2p/go-libp2p/core/network"
1010
"github.com/libp2p/go-libp2p/core/peer"
11+
"github.com/libp2p/go-libp2p/core/protocol"
1112
"github.com/libp2p/go-libp2p/core/routing"
1213
madns "github.com/multiformats/go-multiaddr-dns"
1314
"github.com/rs/zerolog"
@@ -206,7 +207,8 @@ type NodeConfig struct {
206207
// logger used to provide logging
207208
Logger zerolog.Logger `validate:"required"`
208209
// reference to the libp2p host (https://godoc.org/github.com/libp2p/go-libp2p/core/host)
209-
Host host.Host `validate:"required"`
210-
PeerManager PeerManager
211-
DisallowListCacheCfg *DisallowListCacheConfig `validate:"required"`
210+
Host host.Host `validate:"required"`
211+
PeerManager PeerManager
212+
DisallowListCacheCfg *DisallowListCacheConfig `validate:"required"`
213+
ProtocolPeerCacheList []protocol.ID
212214
}

network/p2p/builder/libp2pNodeBuilder.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/libp2p/go-libp2p/core/connmgr"
1414
"github.com/libp2p/go-libp2p/core/host"
1515
"github.com/libp2p/go-libp2p/core/network"
16+
"github.com/libp2p/go-libp2p/core/protocol"
1617
"github.com/libp2p/go-libp2p/core/routing"
1718
"github.com/libp2p/go-libp2p/core/transport"
1819
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
@@ -63,16 +64,17 @@ type LibP2PNodeBuilder struct {
6364
metricsConfig *p2pbuilderconfig.MetricsConfig
6465
basicResolver madns.BasicResolver
6566

66-
resourceManager network.ResourceManager
67-
resourceManagerCfg *p2pconfig.ResourceManagerConfig
68-
connManager connmgr.ConnManager
69-
connGater p2p.ConnectionGater
70-
routingFactory func(context.Context, host.Host) (routing.Routing, error)
71-
peerManagerConfig *p2pbuilderconfig.PeerManagerConfig
72-
createNode p2p.NodeConstructor
73-
disallowListCacheCfg *p2p.DisallowListCacheConfig
74-
unicastConfig *p2pbuilderconfig.UnicastConfig
75-
networkingType flownet.NetworkingType // whether the node is running in private (staked) or public (unstaked) network
67+
resourceManager network.ResourceManager
68+
resourceManagerCfg *p2pconfig.ResourceManagerConfig
69+
connManager connmgr.ConnManager
70+
connGater p2p.ConnectionGater
71+
routingFactory func(context.Context, host.Host) (routing.Routing, error)
72+
peerManagerConfig *p2pbuilderconfig.PeerManagerConfig
73+
createNode p2p.NodeConstructor
74+
disallowListCacheCfg *p2p.DisallowListCacheConfig
75+
unicastConfig *p2pbuilderconfig.UnicastConfig
76+
networkingType flownet.NetworkingType // whether the node is running in private (staked) or public (unstaked) network
77+
protocolPeerCacheList []protocol.ID
7678
}
7779

7880
func NewNodeBuilder(
@@ -155,6 +157,12 @@ func (builder *LibP2PNodeBuilder) OverrideDefaultValidateQueueSize(size int) p2p
155157
return builder
156158
}
157159

160+
// SetProtocolPeerCacheList sets the protocols to track in the protocol peer cache.
161+
func (builder *LibP2PNodeBuilder) SetProtocolPeerCacheList(protocols ...protocol.ID) p2p.NodeBuilder {
162+
builder.protocolPeerCacheList = protocols
163+
return builder
164+
}
165+
158166
// OverrideGossipSubFactory overrides the default gossipsub factory for the GossipSub protocol.
159167
// The purpose of override is to allow the node to provide a custom gossipsub factory for sake of testing or experimentation.
160168
// Note: it is not recommended to override the default gossipsub factory in production unless you know what you are doing.
@@ -284,10 +292,11 @@ func (builder *LibP2PNodeBuilder) Build() (p2p.LibP2PNode, error) {
284292
Parameters: &p2p.NodeParameters{
285293
EnableProtectedStreams: builder.unicastConfig.EnableStreamProtection,
286294
},
287-
Logger: builder.logger,
288-
Host: h,
289-
PeerManager: peerManager,
290-
DisallowListCacheCfg: builder.disallowListCacheCfg,
295+
Logger: builder.logger,
296+
Host: h,
297+
PeerManager: peerManager,
298+
DisallowListCacheCfg: builder.disallowListCacheCfg,
299+
ProtocolPeerCacheList: builder.protocolPeerCacheList,
291300
})
292301
if err != nil {
293302
return nil, fmt.Errorf("could not create libp2p node: %w", err)

network/p2p/cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ type ProtocolPeerCache interface {
1818
// RemoveProtocols removes the specified protocols for the given peer from the protocol cache.
1919
RemoveProtocols(peerID peer.ID, protocols []protocol.ID)
2020

21-
// GetPeers returns a copy of the set of peers that support the given protocol.
22-
GetPeers(pid protocol.ID) map[peer.ID]struct{}
21+
// GetPeers returns the set of peers that support the given protocol.
22+
GetPeers(pid protocol.ID) peer.IDSlice
2323
}
2424

2525
// UpdateFunction is a function that adjusts the GossipSub spam record of a peer.

network/p2p/mock/protocol_peer_cache.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

network/p2p/node/internal/protocolPeerCache.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/libp2p/go-libp2p/core/peer"
1111
"github.com/libp2p/go-libp2p/core/protocol"
1212
"github.com/rs/zerolog"
13+
"golang.org/x/exp/maps"
1314

1415
p2plogging "github.com/onflow/flow-go/network/p2p/logging"
1516
)
@@ -20,13 +21,26 @@ type ProtocolPeerCache struct {
2021
sync.RWMutex
2122
}
2223

23-
func NewProtocolPeerCache(logger zerolog.Logger, h host.Host) (*ProtocolPeerCache, error) {
24+
// NewProtocolPeerCache creates a new ProtocolPeerCache instance using the given host and supported protocols
25+
// Only protocols passed in the protocols list will be tracked
26+
func NewProtocolPeerCache(logger zerolog.Logger, h host.Host, protocols []protocol.ID) (*ProtocolPeerCache, error) {
27+
protocolPeers := make(map[protocol.ID]map[peer.ID]struct{})
28+
for _, pid := range protocols {
29+
protocolPeers[pid] = make(map[peer.ID]struct{})
30+
}
31+
p := &ProtocolPeerCache{protocolPeers: protocolPeers}
32+
33+
// If no protocols are passed, this is a noop cache
34+
if len(protocols) == 0 {
35+
return p, nil
36+
}
37+
2438
sub, err := h.EventBus().
2539
Subscribe([]interface{}{new(event.EvtPeerIdentificationCompleted), new(event.EvtPeerProtocolsUpdated)})
2640
if err != nil {
2741
return nil, fmt.Errorf("could not subscribe to peer protocol update events: %w", err)
2842
}
29-
p := &ProtocolPeerCache{protocolPeers: make(map[protocol.ID]map[peer.ID]struct{})}
43+
3044
h.Network().Notify(&libp2pnet.NotifyBundle{
3145
DisconnectedF: func(n libp2pnet.Network, c libp2pnet.Conn) {
3246
peer := c.RemotePeer()
@@ -43,49 +57,41 @@ func NewProtocolPeerCache(logger zerolog.Logger, h host.Host) (*ProtocolPeerCach
4357
func (p *ProtocolPeerCache) RemovePeer(peerID peer.ID) {
4458
p.Lock()
4559
defer p.Unlock()
46-
for pid, peers := range p.protocolPeers {
60+
for _, peers := range p.protocolPeers {
4761
delete(peers, peerID)
48-
if len(peers) == 0 {
49-
delete(p.protocolPeers, pid)
50-
}
5162
}
5263
}
5364

5465
func (p *ProtocolPeerCache) AddProtocols(peerID peer.ID, protocols []protocol.ID) {
5566
p.Lock()
5667
defer p.Unlock()
5768
for _, pid := range protocols {
58-
peers, ok := p.protocolPeers[pid]
59-
if !ok {
60-
peers = make(map[peer.ID]struct{})
61-
p.protocolPeers[pid] = peers
69+
if peers, ok := p.protocolPeers[pid]; ok {
70+
peers[peerID] = struct{}{}
6271
}
63-
peers[peerID] = struct{}{}
6472
}
6573
}
6674

6775
func (p *ProtocolPeerCache) RemoveProtocols(peerID peer.ID, protocols []protocol.ID) {
6876
p.Lock()
6977
defer p.Unlock()
7078
for _, pid := range protocols {
71-
peers := p.protocolPeers[pid]
72-
delete(peers, peerID)
73-
if len(peers) == 0 {
74-
delete(p.protocolPeers, pid)
79+
if peers, ok := p.protocolPeers[pid]; ok {
80+
delete(peers, peerID)
7581
}
7682
}
7783
}
7884

79-
func (p *ProtocolPeerCache) GetPeers(pid protocol.ID) map[peer.ID]struct{} {
85+
func (p *ProtocolPeerCache) GetPeers(pid protocol.ID) peer.IDSlice {
8086
p.RLock()
8187
defer p.RUnlock()
8288

83-
// it is not safe to return a reference to the map, so we make a copy
84-
peersCopy := make(map[peer.ID]struct{}, len(p.protocolPeers[pid]))
85-
for peerID := range p.protocolPeers[pid] {
86-
peersCopy[peerID] = struct{}{}
89+
peers, ok := p.protocolPeers[pid]
90+
if !ok {
91+
return peer.IDSlice{}
8792
}
88-
return peersCopy
93+
94+
return maps.Keys(peers)
8995
}
9096

9197
func (p *ProtocolPeerCache) consumeSubscription(logger zerolog.Logger, h host.Host, sub event.Subscription) {

network/p2p/node/internal/protocolPeerCache_test.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package internal_test
22

33
import (
44
"context"
5+
"slices"
56
"testing"
67
"time"
78

@@ -22,20 +23,22 @@ func TestProtocolPeerCache(t *testing.T) {
2223
ctx, cancel := context.WithCancel(context.Background())
2324
defer cancel()
2425

26+
p1 := protocol.ID("p1")
27+
p2 := protocol.ID("p2")
28+
p3 := protocol.ID("p3")
29+
2530
// create three hosts, and a pcache for the first
31+
// the cache supports all 3
2632
h1, err := p2pbuilder.DefaultLibP2PHost(unittest.DefaultAddress, unittest.KeyFixture(crypto.ECDSASecp256k1))
2733
require.NoError(t, err)
28-
pcache, err := internal.NewProtocolPeerCache(zerolog.Nop(), h1)
34+
pcache, err := internal.NewProtocolPeerCache(zerolog.Nop(), h1, []protocol.ID{p1, p2, p3})
2935
require.NoError(t, err)
3036
h2, err := p2pbuilder.DefaultLibP2PHost(unittest.DefaultAddress, unittest.KeyFixture(crypto.ECDSASecp256k1))
3137
require.NoError(t, err)
3238
h3, err := p2pbuilder.DefaultLibP2PHost(unittest.DefaultAddress, unittest.KeyFixture(crypto.ECDSASecp256k1))
3339
require.NoError(t, err)
3440

3541
// register each host on a separate protocol
36-
p1 := protocol.ID("p1")
37-
p2 := protocol.ID("p2")
38-
p3 := protocol.ID("p3")
3942
noopHandler := func(s network.Stream) {}
4043
h1.SetStreamHandler(p1, noopHandler)
4144
h2.SetStreamHandler(p2, noopHandler)
@@ -50,8 +53,8 @@ func TestProtocolPeerCache(t *testing.T) {
5053
assert.Eventually(t, func() bool {
5154
peers2 := pcache.GetPeers(p2)
5255
peers3 := pcache.GetPeers(p3)
53-
_, ok2 := peers2[h2.ID()]
54-
_, ok3 := peers3[h3.ID()]
56+
ok2 := slices.Contains(peers2, h2.ID())
57+
ok3 := slices.Contains(peers3, h3.ID())
5558
return len(peers2) == 1 && len(peers3) == 1 && ok2 && ok3
5659
}, 3*time.Second, 50*time.Millisecond)
5760

@@ -64,15 +67,16 @@ func TestProtocolPeerCache(t *testing.T) {
6467
}, 3*time.Second, 50*time.Millisecond)
6568

6669
// add support for p4 on h2 and h3
70+
// note: pcache does NOT support p4 and should not cache it
6771
p4 := protocol.ID("p4")
6872
h2.SetStreamHandler(p4, noopHandler)
6973
h3.SetStreamHandler(p4, noopHandler)
7074

71-
// check that h1's pcache reflects the change
72-
assert.Eventually(t, func() bool {
75+
// check that h1's pcache never contains p4
76+
assert.Never(t, func() bool {
7377
peers4 := pcache.GetPeers(p4)
74-
_, ok2 := peers4[h2.ID()]
75-
_, ok3 := peers4[h3.ID()]
78+
ok2 := slices.Contains(peers4, h2.ID())
79+
ok3 := slices.Contains(peers4, h3.ID())
7680
return len(peers4) == 2 && ok2 && ok3
77-
}, 3*time.Second, 50*time.Millisecond)
81+
}, 1*time.Second, 50*time.Millisecond)
7882
}

network/p2p/node/libp2pNode.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func NewNode(cfg *p2p.NodeConfig) (*Node, error) {
8282
return nil, fmt.Errorf("invalid config: %w", err)
8383
}
8484

85-
pCache, err := nodeinternal.NewProtocolPeerCache(cfg.Logger, cfg.Host)
85+
pCache, err := nodeinternal.NewProtocolPeerCache(cfg.Logger, cfg.Host, cfg.ProtocolPeerCacheList)
8686
if err != nil {
8787
return nil, fmt.Errorf("failed to create protocol peer cache: %w", err)
8888
}
@@ -182,12 +182,7 @@ func (n *Node) RemovePeer(peerID peer.ID) error {
182182

183183
// GetPeersForProtocol returns slice peer IDs for the specified protocol ID.
184184
func (n *Node) GetPeersForProtocol(pid protocol.ID) peer.IDSlice {
185-
pMap := n.pCache.GetPeers(pid)
186-
peers := make(peer.IDSlice, 0, len(pMap))
187-
for p := range pMap {
188-
peers = append(peers, p)
189-
}
190-
return peers
185+
return n.pCache.GetPeers(pid)
191186
}
192187

193188
// OpenAndWriteOnStream opens a new stream to a peer. The stream is opened to the given peerID

0 commit comments

Comments
 (0)