-
Notifications
You must be signed in to change notification settings - Fork 867
feat: Heartbeat shard statistics #7431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 19 commits
2de12d8
5d95067
6e57536
595d320
d9ba54d
32d2ecd
b624a00
aad7b2e
6360f8a
1536d0a
f316fbf
4524da9
126f725
cc53f68
733bbcb
6816b8e
f97e0cf
513e88c
9833525
0332fe5
d5a13d9
634bc02
812e854
b9813e7
dfb7448
36ec08f
38a6e81
af733e6
a52e86f
abfc80e
df0feaf
8546a26
dde87ef
415e80c
c67d5c3
9ffcefb
cc769bf
8c22663
5ac3c5d
3973b82
3830d5e
443c0b1
9d159e7
18e63b7
e08a286
08eb635
f63664a
10e2ffa
8c6b0c8
158e030
dd45ff0
05e0d1d
e0779ec
9546f24
db70702
481f9c6
f754dd6
3366828
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ import ( | |
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "math" | ||
| "strconv" | ||
| "time" | ||
|
|
||
|
|
@@ -153,9 +154,108 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec | |
| if err != nil { | ||
| return fmt.Errorf("record heartbeat: %w", err) | ||
| } | ||
|
|
||
| s.recordShardStatistics(ctx, namespace, executorID, request.ReportedShards) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (s *executorStoreImpl) recordShardStatistics(ctx context.Context, namespace, executorID string, reported map[string]*types.ShardStatusReport) { | ||
| if len(reported) == 0 { | ||
| return | ||
| } | ||
|
|
||
| now := s.timeSource.Now().Unix() | ||
|
|
||
| for shardID, report := range reported { | ||
| if report == nil { | ||
| s.logger.Warn("empty report; skipping EWMA update", | ||
| tag.ShardNamespace(namespace), | ||
| tag.ShardExecutor(executorID), | ||
| tag.ShardKey(shardID), | ||
| ) | ||
| continue | ||
| } | ||
|
|
||
| load := report.ShardLoad | ||
| if math.IsNaN(load) || math.IsInf(load, 0) { | ||
| s.logger.Warn( | ||
| "invalid shard load reported; skipping EWMA update", | ||
| tag.ShardNamespace(namespace), | ||
| tag.ShardExecutor(executorID), | ||
| tag.ShardKey(shardID), | ||
| ) | ||
| continue | ||
| } | ||
|
|
||
| shardStatsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) | ||
| if err != nil { | ||
| s.logger.Warn( | ||
| "failed to build shard statistics key from heartbeat", | ||
| tag.ShardNamespace(namespace), | ||
| tag.ShardExecutor(executorID), | ||
| tag.ShardKey(shardID), | ||
| tag.Error(err), | ||
| ) | ||
| continue | ||
| } | ||
|
|
||
| statsResp, err := s.client.Get(ctx, shardStatsKey) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we are performing a get operation for each shard for each heartbeat and I am not sure etcd is handling this |
||
| if err != nil { | ||
| s.logger.Warn( | ||
| "failed to read shard statistics for heartbeat update", | ||
| tag.ShardNamespace(namespace), | ||
| tag.ShardExecutor(executorID), | ||
| tag.ShardKey(shardID), | ||
| tag.Error(err), | ||
| ) | ||
| continue | ||
| } | ||
|
|
||
| var stats store.ShardStatistics | ||
| if len(statsResp.Kvs) > 0 { | ||
| err := common.DecompressAndUnmarshal(statsResp.Kvs[0].Value, &stats) | ||
| if err != nil { | ||
| s.logger.Warn( | ||
| "failed to unmarshal shard statistics for heartbeat update", | ||
| tag.ShardNamespace(namespace), | ||
| tag.ShardExecutor(executorID), | ||
| tag.ShardKey(shardID), | ||
| tag.Error(err), | ||
| ) | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| // Update smoothed load via EWMA. | ||
| stats.SmoothedLoad = ewmaSmoothedLoad(stats.SmoothedLoad, load, stats.LastUpdateTime, now) | ||
| stats.LastUpdateTime = now | ||
|
|
||
| payload, err := json.Marshal(stats) | ||
| if err != nil { | ||
| s.logger.Warn( | ||
| "failed to marshal shard statistics after heartbeat", | ||
| tag.ShardNamespace(namespace), | ||
| tag.ShardExecutor(executorID), | ||
| tag.ShardKey(shardID), | ||
| tag.Error(err), | ||
| ) | ||
| continue | ||
| } | ||
|
|
||
| _, err = s.client.Put(ctx, shardStatsKey, string(payload)) | ||
|
||
| if err != nil { | ||
| s.logger.Warn( | ||
| "failed to persist shard statistics from heartbeat", | ||
| tag.ShardNamespace(namespace), | ||
| tag.ShardExecutor(executorID), | ||
| tag.ShardKey(shardID), | ||
| tag.Error(err), | ||
| ) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // GetHeartbeat retrieves the last known heartbeat state for a single executor. | ||
| func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, executorID string) (*store.HeartbeatState, *store.AssignedState, error) { | ||
| // The prefix for all keys related to a single executor. | ||
|
|
@@ -717,3 +817,13 @@ func (s *executorStoreImpl) applyShardStatisticsUpdates(ctx context.Context, nam | |
| } | ||
| } | ||
| } | ||
|
|
||
| func ewmaSmoothedLoad(prev, current float64, lastUpdate, now int64) float64 { | ||
| const tauSeconds = 30.0 // smaller = more responsive, larger = smoother | ||
| if lastUpdate <= 0 || tauSeconds <= 0 { | ||
| return current | ||
| } | ||
| dt := max(now-lastUpdate, 0) | ||
| alpha := 1 - math.Exp(-float64(dt)/tauSeconds) | ||
| return (1-alpha)*prev + alpha*current | ||
| } | ||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's discuss if this should happen syncrhronously when we record heartbeat