12 changes: 10 additions & 2 deletions cmd/sharddistributor-canary/main.go
@@ -73,9 +73,17 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
 		fx.Provide(zap.NewDevelopment),
 		fx.Provide(log.NewLogger),
 
-		// Start the YARPC dispatcher
-		fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
+		// We use Decorate instead of Invoke because we want to start and stop the dispatcher
+		// at the correct time: it must start before all of its dependents are started and
+		// stop only after all of them have been stopped.
+		// Decorate gives fx enough information to schedule the dispatcher's start and stop
+		// at the right points in the lifecycle.
+		//
+		// This ordering is critical because the executors need to be able to send a final
+		// "drain" request to the shard distributor before the application is stopped.
+		fx.Decorate(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) *yarpc.Dispatcher {
 			lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
+			return dispatcher
 		}),
 
 		// Include the canary module
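For context on the lifecycle ordering the new comment relies on: in fx, a hook appended while a value is being decorated is registered before the hooks of anything that later consumes that value, so it starts first and stops last. The sketch below is a minimal illustration of that ordering, not the canary's actual wiring; `Server`, `Worker`, and their constructors are hypothetical stand-ins for the YARPC dispatcher and its dependents.

```go
package main

import (
	"context"
	"fmt"

	"go.uber.org/fx"
)

// Server stands in for the YARPC dispatcher; Worker stands in for a component
// that depends on it. Both are hypothetical and exist only to show ordering.
type Server struct{}

func (s *Server) Start() { fmt.Println("server started") }
func (s *Server) Stop()  { fmt.Println("server stopped") }

type Worker struct{}

func NewWorker(lc fx.Lifecycle, s *Server) *Worker {
	lc.Append(fx.Hook{
		OnStart: func(context.Context) error { fmt.Println("worker started"); return nil },
		OnStop:  func(context.Context) error { fmt.Println("worker stopped"); return nil },
	})
	return &Worker{}
}

func main() {
	app := fx.New(
		fx.Provide(func() *Server { return &Server{} }),
		// The decorator runs before any consumer of *Server is built, so the hook it
		// appends is registered first: the server starts before the worker and stops
		// only after the worker has stopped.
		fx.Decorate(func(lc fx.Lifecycle, s *Server) *Server {
			lc.Append(fx.StartStopHook(s.Start, s.Stop))
			return s
		}),
		fx.Provide(NewWorker),
		fx.Invoke(func(*Worker) {}), // force construction of the graph
	)

	ctx := context.Background()
	_ = app.Start(ctx) // prints: server started, worker started
	_ = app.Stop(ctx)  // prints: worker stopped, server stopped
}
```

The PR's comment is making essentially this point: with Invoke, the hook's position in the start/stop order is not tied to the dispatcher's place in the dependency graph, while with Decorate it is.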
25 changes: 21 additions & 4 deletions service/sharddistributor/client/executorclient/clientimpl.go
@@ -34,7 +34,8 @@ const (
 )
 
 const (
-	heartbeatJitterCoeff = 0.1 // 10% jitter
+	heartbeatJitterCoeff     = 0.1 // 10% jitter
+	drainingHeartbeatTimeout = 5 * time.Second
 )
 
 type managedProcessor[SP ShardProcessor] struct {
@@ -191,12 +192,14 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			e.logger.Info("shard distributorexecutor context done, stopping")
+			e.logger.Info("shard distributor executor context done, stopping")
 			e.stopShardProcessors()
+			e.sendDrainingHeartbeat()
 			return
 		case <-e.stopC:
-			e.logger.Info("shard distributorexecutor stopped")
+			e.logger.Info("shard distributor executor stopped")
 			e.stopShardProcessors()
+			e.sendDrainingHeartbeat()
 			return
 		case <-heartBeatTimer.Chan():
 			heartBeatTimer.Reset(backoff.JitDuration(e.heartBeatInterval, heartbeatJitterCoeff))
@@ -269,6 +272,10 @@ func (e *executorImpl[SP]) updateShardAssignmentMetered(ctx context.Context, sha
 }
 
 func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[string]*types.ShardAssignment, migrationMode types.MigrationMode, err error) {
+	return e.sendHeartbeat(ctx, types.ExecutorStatusACTIVE)
+}
+
+func (e *executorImpl[SP]) sendHeartbeat(ctx context.Context, status types.ExecutorStatus) (map[string]*types.ShardAssignment, types.MigrationMode, error) {
 	// Fill in the shard status reports
 	shardStatusReports := make(map[string]*types.ShardStatusReport)
 	e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
@@ -289,7 +296,7 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
 	request := &types.ExecutorHeartbeatRequest{
 		Namespace:          e.namespace,
 		ExecutorID:         e.executorID,
-		Status:             types.ExecutorStatusACTIVE,
+		Status:             status,
 		ShardStatusReports: shardStatusReports,
 		Metadata:           e.metadata.Get(),
 	}
@@ -314,6 +321,16 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
 	return response.ShardAssignments, response.MigrationMode, nil
 }
 
+func (e *executorImpl[SP]) sendDrainingHeartbeat() {
+	ctx, cancel := context.WithTimeout(context.Background(), drainingHeartbeatTimeout)
+	defer cancel()
+
+	_, _, err := e.sendHeartbeat(ctx, types.ExecutorStatusDRAINING)
+	if err != nil {
+		e.logger.Error("failed to send draining heartbeat", tag.Error(err))
+	}
+}
+
 func (e *executorImpl[SP]) updateShardAssignment(ctx context.Context, shardAssignments map[string]*types.ShardAssignment) {
 	wg := sync.WaitGroup{}
 
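Taken together, the clientimpl.go changes make the executor stop its shard processors on shutdown and then report itself as draining with one last heartbeat, sent on a fresh context with a 5-second timeout because the loop's own context may already be cancelled. The sketch below shows that shutdown pattern in isolation, under simplified assumptions: `Status`, `Client`, `Executor`, and the stub transport are hypothetical stand-ins for the real types, and the per-shard status reports, jittered heartbeat timer, and YARPC client are all omitted.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Status and Client are simplified stand-ins for the real heartbeat types.
type Status string

const (
	StatusActive   Status = "ACTIVE"
	StatusDraining Status = "DRAINING"
)

type Client interface {
	Heartbeat(ctx context.Context, status Status) error
}

const drainingHeartbeatTimeout = 5 * time.Second

type Executor struct {
	client Client
	stopC  chan struct{}
}

// run sends periodic ACTIVE heartbeats and, on shutdown, one final DRAINING heartbeat.
func (e *Executor) run(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// The real executor also stops its shard processors before draining.
			e.sendDrainingHeartbeat()
			return
		case <-e.stopC:
			e.sendDrainingHeartbeat()
			return
		case <-ticker.C:
			if err := e.client.Heartbeat(ctx, StatusActive); err != nil {
				fmt.Println("heartbeat failed:", err)
			}
		}
	}
}

// sendDrainingHeartbeat uses a fresh context with its own timeout because the loop's
// context may already be cancelled by the time the executor shuts down.
func (e *Executor) sendDrainingHeartbeat() {
	ctx, cancel := context.WithTimeout(context.Background(), drainingHeartbeatTimeout)
	defer cancel()
	if err := e.client.Heartbeat(ctx, StatusDraining); err != nil {
		fmt.Println("failed to send draining heartbeat:", err)
	}
}

// stubClient just logs; a real client would call the shard distributor over YARPC.
type stubClient struct{}

func (stubClient) Heartbeat(_ context.Context, s Status) error {
	fmt.Println("heartbeat:", s)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 650*time.Millisecond)
	defer cancel()

	e := &Executor{client: stubClient{}, stopC: make(chan struct{})}
	e.run(ctx, 200*time.Millisecond) // a few ACTIVE heartbeats, then DRAINING when ctx expires
}
```

The final DRAINING status presumably lets the shard distributor reassign the executor's shards without waiting for its heartbeats to expire, which is why the dispatcher in main.go has to outlive the executor.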