Skip to content

Commit 7c7ca2b

Browse files
committed
kgo: call OnPartitionsCallbackBlocked concurrently
This was kinda meant to already be the case, and by not being the case currently, you can accidentally deadlock your code. Running the callback in a goroutine should help avoid user side deadlocks. Relates to #1162.
1 parent 0b8520c commit 7c7ca2b

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

pkg/kgo/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,6 +1877,9 @@ func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32))
18771877
// callbacks are blocked from [BlockRebalanceOnPoll]. You can use this as a
18781878
// signal in your processing function to hurry up and unblock rebalancing
18791879
// before your group member is kicked from the group at the session timeout.
1880+
//
1881+
// Since this is meant to be a signal to blocking operations, and to ensure
1882+
// nothing can block this signal, this callback is called in a goroutine.
18801883
func OnPartitionsCallbackBlocked(fn func(context.Context, *Client)) GroupOpt {
18811884
return groupOpt{func(cfg *cfg) { cfg.onBlocked = fn }}
18821885
}

pkg/kgo/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func (c *consumer) waitAndAddRebalance() {
280280
for c.pollWaitState&math.MaxUint32 != 0 {
281281
if !blockedCalled {
282282
if c.cl.cfg.onBlocked != nil {
283-
c.cl.cfg.onBlocked(c.cl.ctx, c.cl)
283+
go c.cl.cfg.onBlocked(c.cl.ctx, c.cl)
284284
}
285285
blockedCalled = true
286286
}

0 commit comments

Comments
 (0)