Skip to content

Commit 2698ed0

Browse files
authored
Merge pull request #1185 from twmb/meta-less
kgo: avoid updating metadata before issuing ListOffsets / OffsetForLeaderEpoch
2 parents b077119 + 88696a4 commit 2698ed0

File tree

4 files changed

+23
-75
lines changed

4 files changed

+23
-75
lines changed

pkg/kgo/consumer.go

Lines changed: 15 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
10071007
} else { // else we guarded it
10081008
c.unguardSessionChange(session)
10091009
}
1010-
loadOffsets.loadWithSession(session, "loading offsets in new session from assign") // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow
1010+
loadOffsets.loadWithSession(session)
10111011

10121012
// If we started a new session or if we unguarded, we have one
10131013
// worker. This one worker allowed us to safely add our load
@@ -1287,8 +1287,6 @@ func (c *consumer) doOnMetadataUpdate() {
12871287
case c.g != nil:
12881288
c.g.findNewAssignments()
12891289
}
1290-
1291-
go c.loadSession().doOnMetadataUpdate()
12921290
}
12931291

12941292
go func() {
@@ -1301,23 +1299,6 @@ func (c *consumer) doOnMetadataUpdate() {
13011299
}
13021300
}
13031301

1304-
func (s *consumerSession) doOnMetadataUpdate() {
1305-
if s == nil || s == noConsumerSession { // no session started yet
1306-
return
1307-
}
1308-
1309-
s.listOrEpochMu.Lock()
1310-
defer s.listOrEpochMu.Unlock()
1311-
1312-
if s.listOrEpochMetaCh == nil {
1313-
return // nothing waiting to load epochs / offsets
1314-
}
1315-
select {
1316-
case s.listOrEpochMetaCh <- struct{}{}:
1317-
default:
1318-
}
1319-
}
1320-
13211302
type offsetLoadMap map[string]map[int32]offsetLoad
13221303

13231304
// offsetLoad is effectively an Offset, but also includes a potential replica
@@ -1465,20 +1446,11 @@ func (l *listOrEpochLoads) mergeFrom(src listOrEpochLoads) {
14651446

14661447
func (l listOrEpochLoads) isEmpty() bool { return len(l.List) == 0 && len(l.Epoch) == 0 }
14671448

1468-
func (l listOrEpochLoads) loadWithSession(s *consumerSession, why string) {
1469-
if !l.isEmpty() {
1470-
s.incWorker()
1471-
go s.listOrEpoch(l, false, why)
1472-
}
1473-
}
1474-
1475-
func (l listOrEpochLoads) loadWithSessionNow(s *consumerSession, why string) bool {
1449+
func (l listOrEpochLoads) loadWithSession(s *consumerSession) {
14761450
if !l.isEmpty() {
14771451
s.incWorker()
1478-
go s.listOrEpoch(l, true, why)
1479-
return true
1452+
go s.listOrEpoch(l, false)
14801453
}
1481-
return false
14821454
}
14831455

14841456
// A consumer session is responsible for an era of fetching records for a set
@@ -1517,7 +1489,6 @@ type consumerSession struct {
15171489
// assignPartitions).
15181490
listOrEpochMu sync.Mutex
15191491
listOrEpochLoadsWaiting listOrEpochLoads
1520-
listOrEpochMetaCh chan struct{} // non-nil if Loads is non-nil, signalled on meta update
15211492
listOrEpochLoadsLoading listOrEpochLoads
15221493
}
15231494

@@ -1769,7 +1740,7 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
17691740
// This function is responsible for issuing ListOffsets or
17701741
// OffsetForLeaderEpoch. These requests's responses are only handled within
17711742
// the context of a consumer session.
1772-
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) {
1743+
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, isReload bool) {
17731744
defer s.decWorker()
17741745

17751746
// It is possible for a metadata update to try to migrate partition
@@ -1781,36 +1752,34 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool,
17811752
return
17821753
}
17831754

1784-
wait := true
1785-
if immediate {
1786-
s.c.cl.triggerUpdateMetadataNow(why)
1787-
} else {
1788-
wait = s.c.cl.triggerUpdateMetadata(false, why) // avoid trigger if within refresh interval
1789-
}
1790-
1791-
s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during meta update into one
1755+
s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during reload backoff into one
17921756
if !s.listOrEpochLoadsWaiting.isEmpty() {
17931757
s.listOrEpochLoadsWaiting.mergeFrom(waiting)
17941758
s.listOrEpochMu.Unlock()
17951759
return
17961760
}
17971761
s.listOrEpochLoadsWaiting = waiting
1798-
s.listOrEpochMetaCh = make(chan struct{}, 1)
17991762
s.listOrEpochMu.Unlock()
18001763

1801-
if wait {
1764+
// If this is a reload, we wait a bit to collect any other loads that
1765+
// are failing around the same time / new loads.
1766+
//
1767+
// We rely on the client list/epoch sharder to handle purging the cached
1768+
// metadata on any request / topic / partition error.
1769+
if isReload {
1770+
after := time.NewTimer(5 * time.Second)
18021771
select {
18031772
case <-s.ctx.Done():
1773+
after.Stop()
18041774
return
1805-
case <-s.listOrEpochMetaCh:
1775+
case <-after.C:
18061776
}
18071777
}
18081778

18091779
s.listOrEpochMu.Lock()
18101780
loading := s.listOrEpochLoadsWaiting
18111781
s.listOrEpochLoadsLoading.mergeFrom(loading)
18121782
s.listOrEpochLoadsWaiting = listOrEpochLoads{}
1813-
s.listOrEpochMetaCh = nil
18141783
s.listOrEpochMu.Unlock()
18151784

18161785
brokerLoads := s.mapLoadsToBrokers(loading)
@@ -1834,23 +1803,7 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool,
18341803
defer func() {
18351804
if !reloads.isEmpty() {
18361805
s.incWorker()
1837-
go func() {
1838-
// Before we dec our worker, we must add the
1839-
// reloads back into the session's waiting loads.
1840-
// Doing so allows a concurrent stopSession to
1841-
// track the waiting loads, whereas if we did not
1842-
// add things back to the session, we could abandon
1843-
// loading these offsets and have a stuck cursor.
1844-
defer s.decWorker()
1845-
defer reloads.loadWithSession(s, "reload offsets from load failure")
1846-
after := time.NewTimer(time.Second)
1847-
defer after.Stop()
1848-
select {
1849-
case <-after.C:
1850-
case <-s.ctx.Done():
1851-
return
1852-
}
1853-
}()
1806+
go s.listOrEpoch(reloads, true)
18541807
}
18551808
}()
18561809

pkg/kgo/metadata.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,20 +136,19 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string)
136136
cl.metawait.c.Broadcast()
137137
}
138138

139-
func (cl *Client) triggerUpdateMetadata(must bool, why string) bool {
139+
func (cl *Client) triggerUpdateMetadata(must bool, why string) {
140140
if !must {
141141
cl.metawait.mu.Lock()
142142
defer cl.metawait.mu.Unlock()
143143
if time.Since(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
144-
return false
144+
return
145145
}
146146
}
147147

148148
select {
149149
case cl.updateMetadataCh <- why:
150150
default:
151151
}
152-
return true
153152
}
154153

155154
func (cl *Client) triggerUpdateMetadataNow(why string) {

pkg/kgo/source.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,16 +1023,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
10231023
// reload offsets *always* triggers a metadata update.
10241024
if updateWhy != nil {
10251025
why := updateWhy.reason(fmt.Sprintf("fetch had inner topic errors from broker %d", s.nodeID))
1026-
// loadWithSessionNow triggers a metadata update IF there are
1027-
// offsets to reload. If there are no offsets to reload, we
1028-
// trigger one here.
1029-
if !reloadOffsets.loadWithSessionNow(consumerSession, why) {
1030-
if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
1031-
s.cl.triggerUpdateMetadata(false, why)
1032-
} else {
1033-
s.cl.triggerUpdateMetadataNow(why)
1034-
}
1026+
if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
1027+
s.cl.triggerUpdateMetadata(false, why)
1028+
} else {
1029+
s.cl.triggerUpdateMetadataNow(why)
10351030
}
1031+
reloadOffsets.loadWithSession(consumerSession)
10361032
}
10371033

10381034
if fetch.hasErrorsOrRecords() {

pkg/kgo/topics_and_partitions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -957,5 +957,5 @@ func (css *consumerSessionStopper) maybeRestart() {
957957
}
958958
session := css.cl.consumer.startNewSession(css.tpsPrior)
959959
defer session.decWorker()
960-
css.reloadOffsets.loadWithSession(session, "resuming reload offsets after session stopped for cursor migrating in metadata")
960+
css.reloadOffsets.loadWithSession(session)
961961
}

0 commit comments

Comments
 (0)