Skip to content

Commit 0fef7aa

Browse files
authored
Merge pull request #1182 from twmb/cb_concurrent
kgo: call OnPartitionsCallbackBlocked concurrently
2 parents 0793754 + 7c7ca2b commit 0fef7aa

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

pkg/kgo/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,6 +1877,9 @@ func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32))
18771877
// callbacks are blocked from [BlockRebalanceOnPoll]. You can use this as a
18781878
// signal in your processing function to hurry up and unblock rebalancing
18791879
// before your group member is kicked from the group at the session timeout.
1880+
//
1881+
// Since this is meant to be a signal to blocking operations, and to ensure
1882+
// nothing can block this signal, this callback is called in a goroutine.
18801883
func OnPartitionsCallbackBlocked(fn func(context.Context, *Client)) GroupOpt {
18811884
return groupOpt{func(cfg *cfg) { cfg.onBlocked = fn }}
18821885
}

pkg/kgo/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func (c *consumer) waitAndAddRebalance() {
280280
for c.pollWaitState&math.MaxUint32 != 0 {
281281
if !blockedCalled {
282282
if c.cl.cfg.onBlocked != nil {
283-
c.cl.cfg.onBlocked(c.cl.ctx, c.cl)
283+
go c.cl.cfg.onBlocked(c.cl.ctx, c.cl)
284284
}
285285
blockedCalled = true
286286
}

0 commit comments

Comments
 (0)