Skip to content

Commit 2bc16f0

Browse files
authored
Protect connectHandler.state behind RWMutex (#194)
* Protect connectHandler.state behind RWMutex * Protect connectHandler.state with RWMutex This change protects the state of connectHandler by using a RWMutex to ensure safe concurrent access.
1 parent 22b786a commit 2bc16f0

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

.changeset/quiet-pigs-hunt.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"inngestgo": patch
3+
---
4+
5+
Protect connectHandler.state behind RWMutex

connect/handler.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"runtime"
1313
"strconv"
14+
"sync"
1415
"sync/atomic"
1516
"time"
1617

@@ -150,7 +151,9 @@ type connectHandler struct {
150151

151152
// Global connection state
152153

153-
state ConnectionState
154+
stateLock sync.RWMutex
155+
state ConnectionState
156+
154157
workerCtx context.Context
155158
cancelWorkerCtx context.CancelFunc
156159
gracefulCloseEg errgroup.Group
@@ -238,7 +241,7 @@ func (h *connectHandler) Connect(ctx context.Context) (WorkerConnection, error)
238241
isInitialConnection = false
239242
initialConnectionDone <- nil
240243
}
241-
h.state = ConnectionStateActive
244+
h.setState(ConnectionStateActive)
242245
attempts = 0
243246
continue
244247

@@ -247,7 +250,7 @@ func (h *connectHandler) Connect(ctx context.Context) (WorkerConnection, error)
247250
h.logger.Error("connect failed", "err", err, "reconnect", msg.reconnect)
248251

249252
if !msg.reconnect {
250-
h.state = ConnectionStateClosed
253+
h.setState(ConnectionStateClosed)
251254

252255
if isInitialConnection {
253256
isInitialConnection = false
@@ -257,7 +260,7 @@ func (h *connectHandler) Connect(ctx context.Context) (WorkerConnection, error)
257260
return err
258261
}
259262

260-
h.state = ConnectionStateReconnecting
263+
h.setState(ConnectionStateReconnecting)
261264

262265
// Some errors should be handled differently (e.g. auth failed)
263266
if msg.err != nil {
@@ -449,15 +452,23 @@ func (h *connectHandler) Close() error {
449452
return fmt.Errorf("failed to connect: %w", err)
450453
}
451454

452-
h.state = ConnectionStateClosed
455+
h.setState(ConnectionStateClosed)
453456

454457
return nil
455458
}
456459

457460
func (h *connectHandler) State() ConnectionState {
461+
h.stateLock.RLock()
462+
defer h.stateLock.RUnlock()
458463
return h.state
459464
}
460465

466+
func (h *connectHandler) setState(state ConnectionState) {
467+
h.stateLock.Lock()
468+
defer h.stateLock.Unlock()
469+
h.state = state
470+
}
471+
461472
var errGatewayDraining = errors.New("gateway is draining")
462473

463474
func (h *connectHandler) processExecutorRequest(msg workerPoolMsg) {

0 commit comments

Comments
 (0)