Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ func (s *executorStoreImpl) Subscribe(ctx context.Context, namespace string) (<-

func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, request store.AssignShardsRequest, guard store.GuardFunc) error {
var ops []clientv3.Op
var opsElse []clientv3.Op
var comparisons []clientv3.Cmp
comparisonMaps := make(map[string]int64)

statsUpdates, err := s.prepareShardStatisticsUpdates(ctx, namespace, request.NewState.ShardAssignments)
if err != nil {
Expand All @@ -363,10 +365,12 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
// Add a comparison to ensure the executor's assigned state hasn't changed
// This prevents deleting an executor that just received a shard assignment
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", expectedModRevision))
comparisonMaps[executorStateKey] = expectedModRevision

// Delete all keys for this executor
executorPrefix := etcdkeys.BuildExecutorIDPrefix(s.prefix, namespace, executorID)
ops = append(ops, clientv3.OpDelete(executorPrefix, clientv3.WithPrefix()))
opsElse = append(opsElse, clientv3.OpGet(executorStateKey))
}

// 2. Prepare operations to update executor states and shard ownership,
Expand All @@ -386,6 +390,8 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
ops = append(ops, clientv3.OpPut(executorStateKey, string(compressedValue)))

comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision))
comparisonMaps[executorStateKey] = state.ModRevision
opsElse = append(opsElse, clientv3.OpGet(executorStateKey))
}

if len(ops) == 0 {
Expand All @@ -405,10 +411,11 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,

// 4. Create a nested transaction operation. This allows us to add our own 'If' (comparisons)
// and 'Then' (ops) logic that will only execute if the outer guard's 'If' condition passes.
// The ELSE operations read back the current state of every compared key, so that on failure we can identify which comparison did not hold.
nestedTxnOp := clientv3.OpTxn(
comparisons, // Our IF conditions
ops, // Our THEN operations
nil, // Our ELSE operations
opsElse, // Our ELSE operations
)

// 5. Add the nested transaction to the guarded transaction's THEN clause and commit.
Expand All @@ -429,10 +436,18 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
if len(txnResp.Responses) == 0 {
return fmt.Errorf("unexpected empty response from transaction")
}

nestedResp := txnResp.Responses[0].GetResponseTxn()
if !nestedResp.Succeeded {
// This means our revision checks failed.
return fmt.Errorf("%w: transaction failed, a shard may have been concurrently assigned", store.ErrVersionConflict)
failingRevisionString := ""
for _, keyValue := range nestedResp.Responses[0].GetResponseRange().Kvs {
expectedValue, ok := comparisonMaps[string(keyValue.Key)]
if !ok || expectedValue != keyValue.ModRevision {
failingRevisionString = failingRevisionString + fmt.Sprintf("{ key: %s, expected:%v, actual: %v }", string(keyValue.Key), expectedValue, keyValue.ModRevision)
}
}
return fmt.Errorf("%w: transaction failed, a shard may have been concurrently assigned, %v", store.ErrVersionConflict, failingRevisionString)
}

// Apply shard statistics updates outside the main transaction to stay within etcd's max operations per txn.
Expand Down
Loading