Skip to content

Commit ec36595

Browse files
authored
Merge pull request #1175 from carsonip/source-loop-fetch
kgo: source fetch loop checks for context before fetch
2 parents 0fef7aa + b7ce3d5 commit ec36595

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

pkg/kgo/source.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,14 @@ func (s *source) loopFetch() {
788788
s.fetchState.hardFinish()
789789
return
790790
case doneFetch := <-canFetch:
791+
// Perform another last minute best-effort check on session context
792+
// to avoid calling fetch with a canceled context just because it passed
793+
// the above selects by chance due to pseudo-random select behavior.
794+
if session.ctx.Err() != nil {
795+
doneFetch <- struct{}{}
796+
s.fetchState.hardFinish()
797+
return
798+
}
791799
again = s.fetchState.maybeFinish(s.fetch(session, doneFetch))
792800
}
793801
}

0 commit comments

Comments
 (0)