Skip to content

Commit a39d10c

Browse files
authored
Defer updating workflow completion metrics until completion accepted by server (#2742)
Fixes #1590
1 parent b657faa commit a39d10c

File tree

10 files changed

+367
-90
lines changed

10 files changed

+367
-90
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java

Lines changed: 1 addition & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
import com.google.common.annotations.VisibleForTesting;
44
import com.google.protobuf.InvalidProtocolBufferException;
55
import com.google.protobuf.util.Timestamps;
6-
import com.uber.m3.tally.Scope;
76
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
87
import io.temporal.api.common.v1.Payloads;
98
import io.temporal.api.history.v1.HistoryEvent;
@@ -14,14 +13,12 @@
1413
import io.temporal.api.update.v1.Input;
1514
import io.temporal.api.update.v1.Request;
1615
import io.temporal.failure.CanceledFailure;
17-
import io.temporal.internal.common.FailureUtils;
1816
import io.temporal.internal.common.ProtobufTimeUtils;
1917
import io.temporal.internal.common.UpdateMessage;
2018
import io.temporal.internal.statemachines.WorkflowStateMachines;
2119
import io.temporal.internal.sync.SignalHandlerInfo;
2220
import io.temporal.internal.sync.UpdateHandlerInfo;
2321
import io.temporal.internal.worker.WorkflowExecutionException;
24-
import io.temporal.worker.MetricsType;
2522
import io.temporal.worker.NonDeterministicException;
2623
import io.temporal.workflow.HandlerUnfinishedPolicy;
2724
import java.util.List;
@@ -63,16 +60,13 @@ final class ReplayWorkflowExecutor {
6360

6461
private final ReplayWorkflowContextImpl context;
6562

66-
private final Scope metricsScope;
67-
6863
public ReplayWorkflowExecutor(
6964
ReplayWorkflow workflow,
7065
WorkflowStateMachines workflowStateMachines,
7166
ReplayWorkflowContextImpl context) {
7267
this.workflow = workflow;
7368
this.workflowStateMachines = workflowStateMachines;
7469
this.context = context;
75-
this.metricsScope = context.getMetricsScope();
7670
}
7771

7872
public void eventLoop() {
@@ -131,12 +125,8 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) {
131125

132126
if (context.isCancelRequested()) {
133127
workflowStateMachines.cancelWorkflow();
134-
metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1);
135128
} else if (failure != null) {
136129
workflowStateMachines.failWorkflow(failure.getFailure());
137-
if (!FailureUtils.isBenignApplicationFailure(failure.getFailure())) {
138-
metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
139-
}
140130
} else {
141131
ContinueAsNewWorkflowExecutionCommandAttributes attributes =
142132
context.getContinueAsNewOnCompletion();
@@ -152,23 +142,17 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) {
152142
// This way attributes will need to be carried over in the mutable state and the flow
153143
// generally will be aligned with the flow of other commands.
154144
workflowStateMachines.continueAsNewWorkflow(attributes);
155-
156-
// TODO Issue #1590
157-
metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1);
158145
} else {
159146
Optional<Payloads> workflowOutput = workflow.getOutput();
160147
workflowStateMachines.completeWorkflow(workflowOutput);
161-
162-
// TODO Issue #1590
163-
metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1);
164148
}
165149
}
166150

167151
com.uber.m3.util.Duration d =
168152
ProtobufTimeUtils.toM3Duration(
169153
Timestamps.fromMillis(System.currentTimeMillis()),
170154
Timestamps.fromMillis(context.getRunStartedTimestampMillis()));
171-
metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
155+
workflowStateMachines.setPostCompletionEndToEndLatency(d);
172156
}
173157

174158
public void handleWorkflowExecutionCancelRequested(HistoryEvent event) {

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -185,6 +185,21 @@ public WorkflowTaskResult handleWorkflowTask(
185185
if (workflow.getWorkflowContext() != null) {
186186
result.setVersioningBehavior(workflow.getWorkflowContext().getVersioningBehavior());
187187
}
188+
// Setup post-completion metrics to be applied after task response accepted
189+
String postCompleteCounter = workflowStateMachines.getPostCompletionMetricCounter();
190+
com.uber.m3.util.Duration postCompleteLatency =
191+
workflowStateMachines.getPostCompletionEndToEndLatency();
192+
if (postCompleteCounter != null || postCompleteLatency != null) {
193+
result.setApplyPostCompletionMetrics(
194+
() -> {
195+
if (postCompleteCounter != null) {
196+
metricsScope.counter(postCompleteCounter).inc(1);
197+
}
198+
if (postCompleteLatency != null) {
199+
metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(postCompleteLatency);
200+
}
201+
});
202+
}
188203
return result.build();
189204
} finally {
190205
lock.unlock();

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -252,7 +252,8 @@ private Result createCompletedWFTRequest(
252252
null,
253253
null,
254254
result.isFinalCommand(),
255-
eventIdSetHandle);
255+
eventIdSetHandle,
256+
result.getApplyPostCompletionMetrics());
256257
}
257258

258259
private Result failureToWFTResult(
@@ -275,7 +276,8 @@ private Result failureToWFTResult(
275276
.setFailure(((WorkflowExecutionException) e).getFailure()))
276277
.build())
277278
.build();
278-
return new WorkflowTaskHandler.Result(workflowType, response, null, null, null, false, null);
279+
return new WorkflowTaskHandler.Result(
280+
workflowType, response, null, null, null, false, null, null);
279281
}
280282

281283
WorkflowExecution execution = workflowTask.getWorkflowExecution();
@@ -316,7 +318,7 @@ private Result failureToWFTResult(
316318
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE);
317319
}
318320
return new WorkflowTaskHandler.Result(
319-
workflowType, null, failedRequest.build(), null, null, false, null);
321+
workflowType, null, failedRequest.build(), null, null, false, null, null);
320322
}
321323

322324
private Result createDirectQueryResult(
@@ -346,6 +348,7 @@ private Result createDirectQueryResult(
346348
queryCompletedRequest.build(),
347349
null,
348350
false,
351+
null,
349352
null);
350353
}
351354

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ public static final class Builder {
2525
private String writeSdkName;
2626
private String writeSdkVersion;
2727
private VersioningBehavior versioningBehavior;
28+
private Runnable applyPostCompletionMetrics;
2829

2930
public Builder setCommands(List<Command> commands) {
3031
this.commands = commands;
@@ -76,6 +77,11 @@ public Builder setVersioningBehavior(VersioningBehavior versioningBehavior) {
7677
return this;
7778
}
7879

80+
public Builder setApplyPostCompletionMetrics(Runnable applyPostCompletionMetrics) {
81+
this.applyPostCompletionMetrics = applyPostCompletionMetrics;
82+
return this;
83+
}
84+
7985
public WorkflowTaskResult build() {
8086
return new WorkflowTaskResult(
8187
commands == null ? Collections.emptyList() : commands,
@@ -87,7 +93,8 @@ public WorkflowTaskResult build() {
8793
sdkFlags == null ? Collections.emptyList() : sdkFlags,
8894
writeSdkName,
8995
writeSdkVersion,
90-
versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior);
96+
versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior,
97+
applyPostCompletionMetrics);
9198
}
9299
}
93100

@@ -101,6 +108,7 @@ public WorkflowTaskResult build() {
101108
private final String writeSdkName;
102109
private final String writeSdkVersion;
103110
private final VersioningBehavior versioningBehavior;
111+
private final Runnable applyPostCompletionMetrics;
104112

105113
private WorkflowTaskResult(
106114
List<Command> commands,
@@ -112,7 +120,8 @@ private WorkflowTaskResult(
112120
List<Integer> sdkFlags,
113121
String writeSdkName,
114122
String writeSdkVersion,
115-
VersioningBehavior versioningBehavior) {
123+
VersioningBehavior versioningBehavior,
124+
Runnable applyPostCompletionMetrics) {
116125
this.commands = commands;
117126
this.messages = messages;
118127
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
@@ -126,6 +135,7 @@ private WorkflowTaskResult(
126135
this.writeSdkName = writeSdkName;
127136
this.writeSdkVersion = writeSdkVersion;
128137
this.versioningBehavior = versioningBehavior;
138+
this.applyPostCompletionMetrics = applyPostCompletionMetrics;
129139
}
130140

131141
public List<Command> getCommands() {
@@ -168,4 +178,8 @@ public String getWriteSdkVersion() {
168178
public VersioningBehavior getVersioningBehavior() {
169179
return versioningBehavior;
170180
}
181+
182+
public Runnable getApplyPostCompletionMetrics() {
183+
return applyPostCompletionMetrics;
184+
}
171185
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010
import com.google.common.base.Preconditions;
1111
import com.google.common.base.Strings;
1212
import com.google.protobuf.Any;
13+
import com.uber.m3.util.Duration;
1314
import io.temporal.api.command.v1.*;
1415
import io.temporal.api.common.v1.*;
1516
import io.temporal.api.enums.v1.EventType;
@@ -26,6 +27,7 @@
2627
import io.temporal.internal.sync.WorkflowThread;
2728
import io.temporal.internal.worker.LocalActivityResult;
2829
import io.temporal.serviceclient.Version;
30+
import io.temporal.worker.MetricsType;
2931
import io.temporal.worker.NonDeterministicException;
3032
import io.temporal.worker.WorkflowImplementationOptions;
3133
import io.temporal.workflow.ChildWorkflowCancellationType;
@@ -187,6 +189,9 @@ enum HandleEventStatus {
187189
*/
188190
private boolean shouldSkipUpsertVersionSA = false;
189191

192+
private String postCompletionMetricCounter;
193+
private com.uber.m3.util.Duration postCompletionEndToEndLatency;
194+
190195
public WorkflowStateMachines(
191196
StatesMachinesCallback callbacks,
192197
GetSystemInfoResponse.Capabilities capabilities,
@@ -873,6 +878,18 @@ public long getLastStartedEventId() {
873878
return lastWFTStartedEventId;
874879
}
875880

881+
public String getPostCompletionMetricCounter() {
882+
return postCompletionMetricCounter;
883+
}
884+
885+
public Duration getPostCompletionEndToEndLatency() {
886+
return postCompletionEndToEndLatency;
887+
}
888+
889+
public void setPostCompletionEndToEndLatency(Duration postCompletionEndToEndLatency) {
890+
this.postCompletionEndToEndLatency = postCompletionEndToEndLatency;
891+
}
892+
876893
/**
877894
* @param attributes attributes used to schedule an activity
878895
* @param callback completion callback
@@ -1112,11 +1129,15 @@ public void upsertMemo(Memo memo) {
11121129
public void completeWorkflow(Optional<Payloads> workflowOutput) {
11131130
checkEventLoopExecuting();
11141131
CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
1132+
postCompletionMetricCounter = MetricsType.WORKFLOW_COMPLETED_COUNTER;
11151133
}
11161134

11171135
public void failWorkflow(Failure failure) {
11181136
checkEventLoopExecuting();
11191137
FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
1138+
if (!FailureUtils.isBenignApplicationFailure(failure)) {
1139+
postCompletionMetricCounter = MetricsType.WORKFLOW_FAILED_COUNTER;
1140+
}
11201141
}
11211142

11221143
public void cancelWorkflow() {
@@ -1125,11 +1146,13 @@ public void cancelWorkflow() {
11251146
CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
11261147
commandSink,
11271148
stateMachineSink);
1149+
postCompletionMetricCounter = MetricsType.WORKFLOW_CANCELED_COUNTER;
11281150
}
11291151

11301152
public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
11311153
checkEventLoopExecuting();
11321154
ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1155+
postCompletionMetricCounter = MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER;
11331156
}
11341157

11351158
public boolean isReplaying() {

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTaskHandler.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ final class Result {
2222
private final RpcRetryOptions requestRetryOptions;
2323
private final boolean completionCommand;
2424
private final Functions.Proc1<Long> resetEventIdHandle;
25+
private final Runnable applyPostCompletionMetrics;
2526

2627
public Result(
2728
String workflowType,
@@ -30,14 +31,16 @@ public Result(
3031
RespondQueryTaskCompletedRequest queryCompleted,
3132
RpcRetryOptions requestRetryOptions,
3233
boolean completionCommand,
33-
Functions.Proc1<Long> resetEventIdHandle) {
34+
Functions.Proc1<Long> resetEventIdHandle,
35+
Runnable applyPostCompletionMetrics) {
3436
this.workflowType = workflowType;
3537
this.taskCompleted = taskCompleted;
3638
this.taskFailed = taskFailed;
3739
this.queryCompleted = queryCompleted;
3840
this.requestRetryOptions = requestRetryOptions;
3941
this.completionCommand = completionCommand;
4042
this.resetEventIdHandle = resetEventIdHandle;
43+
this.applyPostCompletionMetrics = applyPostCompletionMetrics;
4144
}
4245

4346
public RespondWorkflowTaskCompletedRequest getTaskCompleted() {
@@ -67,6 +70,10 @@ public Functions.Proc1<Long> getResetEventIdHandle() {
6770
return (arg) -> {};
6871
}
6972

73+
public Runnable getApplyPostCompletionMetrics() {
74+
return applyPostCompletionMetrics;
75+
}
76+
7077
@Override
7178
public String toString() {
7279
return "Result{"

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -467,6 +467,11 @@ public void handle(WorkflowTask task) throws Exception {
467467
result.getRequestRetryOptions(),
468468
workflowTypeScope);
469469
}
470+
471+
// Apply post-completion metrics only if runnable present and the above succeeded
472+
if (result.getApplyPostCompletionMetrics() != null) {
473+
result.getApplyPostCompletionMetrics().run();
474+
}
470475
} catch (GrpcMessageTooLargeException e) {
471476
// Only fail workflow task on the first attempt, subsequent failures of the same
472477
// workflow task should timeout.

0 commit comments

Comments
 (0)