Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit a06a9ea

Browse files
authored
feat: coalesce and queue connection event handling (#565)
* feat: batch and queue connection event handling * address feedback * fix: mark responsive on new connection
1 parent 24c8502 commit a06a9ea

File tree

8 files changed

+312
-160
lines changed

8 files changed

+312
-160
lines changed

bitswap.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
303303
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)
304304

305305
bs.pqm.Startup()
306-
network.SetDelegate(bs)
306+
network.Start(bs)
307307

308308
// Start up bitswaps async worker routines
309309
bs.startWorkers(ctx, px)
@@ -316,6 +316,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
316316
sm.Shutdown()
317317
cancelFunc()
318318
notif.Shutdown()
319+
network.Stop()
319320
}()
320321
procctx.CloseAfterContext(px, ctx) // parent cancelled first
321322

network/connecteventmanager.go

Lines changed: 145 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,96 +11,203 @@ type ConnectionListener interface {
1111
PeerDisconnected(peer.ID)
1212
}
1313

14+
type state byte
15+
16+
const (
17+
stateDisconnected = iota
18+
stateResponsive
19+
stateUnresponsive
20+
)
21+
1422
type connectEventManager struct {
1523
connListener ConnectionListener
1624
lk sync.RWMutex
17-
conns map[peer.ID]*connState
25+
cond sync.Cond
26+
peers map[peer.ID]*peerState
27+
28+
changeQueue []peer.ID
29+
stop bool
30+
done chan struct{}
1831
}
1932

20-
type connState struct {
21-
refs int
22-
responsive bool
33+
type peerState struct {
34+
newState, curState state
35+
pending bool
2336
}
2437

2538
func newConnectEventManager(connListener ConnectionListener) *connectEventManager {
26-
return &connectEventManager{
39+
evtManager := &connectEventManager{
2740
connListener: connListener,
28-
conns: make(map[peer.ID]*connState),
41+
peers: make(map[peer.ID]*peerState),
42+
done: make(chan struct{}),
2943
}
44+
evtManager.cond = sync.Cond{L: &evtManager.lk}
45+
return evtManager
3046
}
3147

32-
func (c *connectEventManager) Connected(p peer.ID) {
48+
func (c *connectEventManager) Start() {
49+
go c.worker()
50+
}
51+
52+
func (c *connectEventManager) Stop() {
3353
c.lk.Lock()
34-
defer c.lk.Unlock()
54+
c.stop = true
55+
c.lk.Unlock()
56+
c.cond.Broadcast()
3557

36-
state, ok := c.conns[p]
58+
<-c.done
59+
}
60+
61+
func (c *connectEventManager) getState(p peer.ID) state {
62+
if state, ok := c.peers[p]; ok {
63+
return state.newState
64+
} else {
65+
return stateDisconnected
66+
}
67+
}
68+
69+
func (c *connectEventManager) setState(p peer.ID, newState state) {
70+
state, ok := c.peers[p]
3771
if !ok {
38-
state = &connState{responsive: true}
39-
c.conns[p] = state
72+
state = new(peerState)
73+
c.peers[p] = state
74+
}
75+
state.newState = newState
76+
if !state.pending && state.newState != state.curState {
77+
state.pending = true
78+
c.changeQueue = append(c.changeQueue, p)
79+
c.cond.Broadcast()
4080
}
41-
state.refs++
81+
}
4282

43-
if state.refs == 1 && state.responsive {
44-
c.connListener.PeerConnected(p)
83+
// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
84+
// connect event manager has been stopped.
85+
func (c *connectEventManager) waitChange() bool {
86+
for !c.stop && len(c.changeQueue) == 0 {
87+
c.cond.Wait()
4588
}
89+
return !c.stop
4690
}
4791

48-
func (c *connectEventManager) Disconnected(p peer.ID) {
92+
func (c *connectEventManager) worker() {
4993
c.lk.Lock()
5094
defer c.lk.Unlock()
95+
defer close(c.done)
96+
97+
for c.waitChange() {
98+
pid := c.changeQueue[0]
99+
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
100+
c.changeQueue = c.changeQueue[1:]
101+
102+
state, ok := c.peers[pid]
103+
// If we've disconnected and forgotten, continue.
104+
if !ok {
105+
// This shouldn't be possible because _this_ thread is responsible for
106+
// removing peers from this map, and we shouldn't get duplicate entries in
107+
// the change queue.
108+
log.Error("a change was enqueued for a peer we're not tracking")
109+
continue
110+
}
51111

52-
state, ok := c.conns[p]
53-
if !ok {
54-
// Should never happen
112+
// Record the fact that this "state" is no longer in the queue.
113+
state.pending = false
114+
115+
// Then, if there's nothing to do, continue.
116+
if state.curState == state.newState {
117+
continue
118+
}
119+
120+
// Or record the state update, then apply it.
121+
oldState := state.curState
122+
state.curState = state.newState
123+
124+
switch state.newState {
125+
case stateDisconnected:
126+
delete(c.peers, pid)
127+
fallthrough
128+
case stateUnresponsive:
129+
// Only trigger a disconnect event if the peer was responsive.
130+
// We could be transitioning from unresponsive to disconnected.
131+
if oldState == stateResponsive {
132+
c.lk.Unlock()
133+
c.connListener.PeerDisconnected(pid)
134+
c.lk.Lock()
135+
}
136+
case stateResponsive:
137+
c.lk.Unlock()
138+
c.connListener.PeerConnected(pid)
139+
c.lk.Lock()
140+
}
141+
}
142+
}
143+
144+
// Called whenever we receive a new connection. May be called many times.
145+
func (c *connectEventManager) Connected(p peer.ID) {
146+
c.lk.Lock()
147+
defer c.lk.Unlock()
148+
149+
// !responsive -> responsive
150+
151+
if c.getState(p) == stateResponsive {
55152
return
56153
}
57-
state.refs--
154+
c.setState(p, stateResponsive)
155+
}
58156

59-
if state.refs == 0 {
60-
if state.responsive {
61-
c.connListener.PeerDisconnected(p)
62-
}
63-
delete(c.conns, p)
157+
// Called when we drop the final connection to a peer.
158+
func (c *connectEventManager) Disconnected(p peer.ID) {
159+
c.lk.Lock()
160+
defer c.lk.Unlock()
161+
162+
// !disconnected -> disconnected
163+
164+
if c.getState(p) == stateDisconnected {
165+
return
64166
}
167+
168+
c.setState(p, stateDisconnected)
65169
}
66170

171+
// Called whenever a peer is unresponsive.
67172
func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
68173
c.lk.Lock()
69174
defer c.lk.Unlock()
70175

71-
state, ok := c.conns[p]
72-
if !ok || !state.responsive {
176+
// responsive -> unresponsive
177+
178+
if c.getState(p) != stateResponsive {
73179
return
74180
}
75-
state.responsive = false
76181

77-
c.connListener.PeerDisconnected(p)
182+
c.setState(p, stateUnresponsive)
78183
}
79184

185+
// Called whenever we receive a message from a peer.
186+
//
187+
// - When we're connected to the peer, this will mark the peer as responsive (from unresponsive).
188+
// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process
189+
// the "on message" event, so we can't treat this as evidence of a connection.
80190
func (c *connectEventManager) OnMessage(p peer.ID) {
81-
// This is a frequent operation so to avoid different message arrivals
82-
// getting blocked by a write lock, first take a read lock to check if
83-
// we need to modify state
84191
c.lk.RLock()
85-
state, ok := c.conns[p]
86-
responsive := ok && state.responsive
192+
unresponsive := c.getState(p) == stateUnresponsive
87193
c.lk.RUnlock()
88194

89-
if !ok || responsive {
195+
// Only continue if both connected, and unresponsive.
196+
if !unresponsive {
90197
return
91198
}
92199

200+
// unresponsive -> responsive
201+
93202
// We need to make a modification so now take a write lock
94203
c.lk.Lock()
95204
defer c.lk.Unlock()
96205

97206
// Note: state may have changed in the time between when read lock
98207
// was released and write lock taken, so check again
99-
state, ok = c.conns[p]
100-
if !ok || state.responsive {
208+
if c.getState(p) != stateUnresponsive {
101209
return
102210
}
103211

104-
state.responsive = true
105-
c.connListener.PeerConnected(p)
212+
c.setState(p, stateResponsive)
106213
}

0 commit comments

Comments
 (0)