77 changes: 15 additions & 62 deletions pkg/kgo/consumer.go
@@ -1007,7 +1007,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
} else { // else we guarded it
c.unguardSessionChange(session)
}
loadOffsets.loadWithSession(session, "loading offsets in new session from assign") // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow
loadOffsets.loadWithSession(session)

// If we started a new session or if we unguarded, we have one
// worker. This one worker allowed us to safely add our load
@@ -1287,8 +1287,6 @@ func (c *consumer) doOnMetadataUpdate() {
case c.g != nil:
c.g.findNewAssignments()
}

go c.loadSession().doOnMetadataUpdate()
}

go func() {
@@ -1301,23 +1299,6 @@ func (c *consumer) doOnMetadataUpdate() {
}
}

func (s *consumerSession) doOnMetadataUpdate() {
if s == nil || s == noConsumerSession { // no session started yet
return
}

s.listOrEpochMu.Lock()
defer s.listOrEpochMu.Unlock()

if s.listOrEpochMetaCh == nil {
return // nothing waiting to load epochs / offsets
}
select {
case s.listOrEpochMetaCh <- struct{}{}:
default:
}
}

type offsetLoadMap map[string]map[int32]offsetLoad

// offsetLoad is effectively an Offset, but also includes a potential replica
Expand Down Expand Up @@ -1465,20 +1446,11 @@ func (l *listOrEpochLoads) mergeFrom(src listOrEpochLoads) {

func (l listOrEpochLoads) isEmpty() bool { return len(l.List) == 0 && len(l.Epoch) == 0 }

func (l listOrEpochLoads) loadWithSession(s *consumerSession, why string) {
if !l.isEmpty() {
s.incWorker()
go s.listOrEpoch(l, false, why)
}
}

func (l listOrEpochLoads) loadWithSessionNow(s *consumerSession, why string) bool {
func (l listOrEpochLoads) loadWithSession(s *consumerSession) {
if !l.isEmpty() {
s.incWorker()
go s.listOrEpoch(l, true, why)
return true
go s.listOrEpoch(l, false)
}
return false
}

// A consumer session is responsible for an era of fetching records for a set
@@ -1517,7 +1489,6 @@ type consumerSession struct {
// assignPartitions).
listOrEpochMu sync.Mutex
listOrEpochLoadsWaiting listOrEpochLoads
listOrEpochMetaCh chan struct{} // non-nil if Loads is non-nil, signalled on meta update
listOrEpochLoadsLoading listOrEpochLoads
}

@@ -1769,7 +1740,7 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
// This function is responsible for issuing ListOffsets or
// OffsetForLeaderEpoch. These requests' responses are only handled within
// the context of a consumer session.
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) {
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, isReload bool) {
defer s.decWorker()

// It is possible for a metadata update to try to migrate partition
@@ -1781,36 +1752,34 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool,
return
}

wait := true
if immediate {
s.c.cl.triggerUpdateMetadataNow(why)
} else {
wait = s.c.cl.triggerUpdateMetadata(false, why) // avoid trigger if within refresh interval
}

s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during meta update into one
s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during reload backoff into one
if !s.listOrEpochLoadsWaiting.isEmpty() {
s.listOrEpochLoadsWaiting.mergeFrom(waiting)
s.listOrEpochMu.Unlock()
return
}
s.listOrEpochLoadsWaiting = waiting
s.listOrEpochMetaCh = make(chan struct{}, 1)
s.listOrEpochMu.Unlock()

if wait {
// If this is a reload, we wait a bit to collect any other loads that
// are failing around the same time / new loads.
//
// We rely on the client list/epoch sharder to handle purging the cached
// metadata on any request / topic / partition error.
if isReload {
after := time.NewTimer(5 * time.Second)
select {
case <-s.ctx.Done():
after.Stop()
return
case <-s.listOrEpochMetaCh:
case <-after.C:
}
}

s.listOrEpochMu.Lock()
loading := s.listOrEpochLoadsWaiting
s.listOrEpochLoadsLoading.mergeFrom(loading)
s.listOrEpochLoadsWaiting = listOrEpochLoads{}
s.listOrEpochMetaCh = nil
s.listOrEpochMu.Unlock()

brokerLoads := s.mapLoadsToBrokers(loading)
@@ -1834,23 +1803,7 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool,
defer func() {
if !reloads.isEmpty() {
s.incWorker()
go func() {
// Before we dec our worker, we must add the
// reloads back into the session's waiting loads.
// Doing so allows a concurrent stopSession to
// track the waiting loads, whereas if we did not
// add things back to the session, we could abandon
// loading these offsets and have a stuck cursor.
defer s.decWorker()
defer reloads.loadWithSession(s, "reload offsets from load failure")
after := time.NewTimer(time.Second)
defer after.Stop()
select {
case <-after.C:
case <-s.ctx.Done():
return
}
}()
go s.listOrEpoch(reloads, true)
}
}()

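For reference, the consumer.go changes replace the old per-session metadata-wakeup channel with a flat reload backoff: concurrent reloads collapse into one waiting set, the first waiter sleeps a fixed interval, and then a single list/epoch pass is issued for everything that accumulated. A minimal standalone sketch of that pattern, using illustrative names (reloader, pending, reload) rather than kgo API:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// reloader stands in for the consumerSession fields guarded by listOrEpochMu.
type reloader struct {
	mu      sync.Mutex
	pending map[string]int64 // partition -> offset, a stand-in for listOrEpochLoads
}

// reload mirrors the shape of listOrEpoch with isReload=true: if another
// goroutine is already waiting out the backoff, merge into its pending set
// and return; otherwise become the waiter, sleep the backoff, and load
// everything that accumulated in the meantime with one request.
func (r *reloader) reload(ctx context.Context, waiting map[string]int64, backoff time.Duration) {
	r.mu.Lock()
	if len(r.pending) > 0 { // somebody is already waiting; collapse into them
		for tp, o := range waiting {
			r.pending[tp] = o
		}
		r.mu.Unlock()
		return
	}
	r.pending = waiting
	r.mu.Unlock()

	select {
	case <-ctx.Done():
		return
	case <-time.After(backoff): // collect other failing loads during the backoff
	}

	r.mu.Lock()
	loading := r.pending
	r.pending = nil
	r.mu.Unlock()

	fmt.Printf("issuing one ListOffsets/OffsetForLeaderEpoch for %d partitions\n", len(loading))
}

func main() {
	r := &reloader{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		part := fmt.Sprintf("topic-%d", i)
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.reload(context.Background(), map[string]int64{part: 0}, 100*time.Millisecond)
		}()
	}
	wg.Wait()
}

In the diff above, the backoff is five seconds, the waiter is covered by the session's worker count, and a failed load retries by calling listOrEpoch again with isReload set.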
5 changes: 2 additions & 3 deletions pkg/kgo/metadata.go
@@ -136,20 +136,19 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string)
cl.metawait.c.Broadcast()
}

func (cl *Client) triggerUpdateMetadata(must bool, why string) bool {
func (cl *Client) triggerUpdateMetadata(must bool, why string) {
if !must {
cl.metawait.mu.Lock()
defer cl.metawait.mu.Unlock()
if time.Since(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
return false
return
}
}

select {
case cl.updateMetadataCh <- why:
default:
}
return true
}

func (cl *Client) triggerUpdateMetadataNow(why string) {
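triggerUpdateMetadata no longer reports whether it actually fired, because the only caller that used the bool (the old wait-for-metadata path in listOrEpoch) is gone. A minimal sketch of the rate-limited, coalescing trigger pattern the function implements, with illustrative names (trigger, maybeTrigger, updateCh) rather than kgo API:

package main

import (
	"fmt"
	"sync"
	"time"
)

type trigger struct {
	mu         sync.Mutex
	lastUpdate time.Time     // when the last metadata update finished
	minAge     time.Duration // stand-in for metadataMinAge
	updateCh   chan string   // buffered; carries the reason for the update
}

func (t *trigger) maybeTrigger(must bool, why string) {
	if !must {
		t.mu.Lock()
		tooSoon := time.Since(t.lastUpdate) < t.minAge
		t.mu.Unlock()
		if tooSoon {
			return // a recent update already covers us; do nothing
		}
	}
	select {
	case t.updateCh <- why: // wake the metadata loop
	default: // an update is already queued; coalesce with it
	}
}

func main() {
	t := &trigger{
		minAge:     time.Second,
		lastUpdate: time.Now(),
		updateCh:   make(chan string, 1),
	}
	t.maybeTrigger(false, "fetch error") // dropped: within minAge
	t.maybeTrigger(true, "forced")       // queued regardless of age
	fmt.Println(<-t.updateCh)            // prints "forced"
}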
14 changes: 5 additions & 9 deletions pkg/kgo/source.go
@@ -1023,16 +1023,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
// reload offsets *always* triggers a metadata update.
if updateWhy != nil {
why := updateWhy.reason(fmt.Sprintf("fetch had inner topic errors from broker %d", s.nodeID))
// loadWithSessionNow triggers a metadata update IF there are
// offsets to reload. If there are no offsets to reload, we
// trigger one here.
if !reloadOffsets.loadWithSessionNow(consumerSession, why) {
if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
s.cl.triggerUpdateMetadata(false, why)
} else {
s.cl.triggerUpdateMetadataNow(why)
}
if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
s.cl.triggerUpdateMetadata(false, why)
} else {
s.cl.triggerUpdateMetadataNow(why)
}
reloadOffsets.loadWithSession(consumerSession)
}

if fetch.hasErrorsOrRecords() {
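The reworked block above always reloads offsets through the session and separately decides how hard to push metadata: "unknown topic" style errors get the lazy, rate-limited trigger, while anything else forces an immediate refresh. A rough standalone sketch of that decision, assuming stand-in error values and helpers (errorsAreOnly, handleFetchErrors) rather than kgo's updateWhy/kerr types:

package main

import (
	"errors"
	"fmt"
)

var (
	errUnknownTopicOrPartition = errors.New("UNKNOWN_TOPIC_OR_PARTITION")
	errUnknownTopicID          = errors.New("UNKNOWN_TOPIC_ID")
	errNotLeaderOrFollower     = errors.New("NOT_LEADER_OR_FOLLOWER")
)

// errorsAreOnly reports whether every error in errs is one of allowed; a
// loose stand-in for the updateWhy.isOnly checks in the diff above.
func errorsAreOnly(errs []error, allowed ...error) bool {
	if len(errs) == 0 {
		return false
	}
	for _, err := range errs {
		ok := false
		for _, a := range allowed {
			if errors.Is(err, a) {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

func handleFetchErrors(errs []error) {
	if errorsAreOnly(errs, errUnknownTopicOrPartition, errUnknownTopicID) {
		fmt.Println("lazy metadata trigger (respects metadataMinAge)")
	} else {
		fmt.Println("immediate metadata trigger")
	}
	fmt.Println("reload offsets within the current consumer session")
}

func main() {
	handleFetchErrors([]error{errUnknownTopicOrPartition})
	handleFetchErrors([]error{errNotLeaderOrFollower})
}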
2 changes: 1 addition & 1 deletion pkg/kgo/topics_and_partitions.go
@@ -957,5 +957,5 @@ func (css *consumerSessionStopper) maybeRestart() {
}
session := css.cl.consumer.startNewSession(css.tpsPrior)
defer session.decWorker()
css.reloadOffsets.loadWithSession(session, "resuming reload offsets after session stopped for cursor migrating in metadata")
css.reloadOffsets.loadWithSession(session)
}
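The one-line change above leans on loadWithSession's worker accounting: the load bumps the session's worker count before spawning its goroutine, so a stopping session waits for in-flight loads instead of abandoning them and leaving a stuck cursor. A minimal sketch of that idea, using a WaitGroup as a stand-in for the real incWorker/decWorker bookkeeping (session, loadWithSession, stop are illustrative names):

package main

import (
	"fmt"
	"sync"
	"time"
)

type session struct{ workers sync.WaitGroup }

func (s *session) loadWithSession(loads []string) {
	if len(loads) == 0 {
		return // nothing to do, mirrors the isEmpty check
	}
	s.workers.Add(1) // incWorker: the session cannot fully stop while this load runs
	go func() {
		defer s.workers.Done() // decWorker once the list/epoch work is handled
		time.Sleep(10 * time.Millisecond)
		fmt.Println("loaded", loads)
	}()
}

func (s *session) stop() {
	s.workers.Wait() // a stopping session waits for in-flight loads
	fmt.Println("session stopped")
}

func main() {
	s := &session{}
	s.loadWithSession([]string{"topic/0", "topic/1"})
	s.stop()
}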