Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Timestamps;
import com.uber.m3.tally.Scope;
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.history.v1.HistoryEvent;
Expand All @@ -14,14 +13,12 @@
import io.temporal.api.update.v1.Input;
import io.temporal.api.update.v1.Request;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.common.FailureUtils;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.UpdateMessage;
import io.temporal.internal.statemachines.WorkflowStateMachines;
import io.temporal.internal.sync.SignalHandlerInfo;
import io.temporal.internal.sync.UpdateHandlerInfo;
import io.temporal.internal.worker.WorkflowExecutionException;
import io.temporal.worker.MetricsType;
import io.temporal.worker.NonDeterministicException;
import io.temporal.workflow.HandlerUnfinishedPolicy;
import java.util.List;
Expand Down Expand Up @@ -63,16 +60,13 @@ final class ReplayWorkflowExecutor {

private final ReplayWorkflowContextImpl context;

private final Scope metricsScope;

public ReplayWorkflowExecutor(
ReplayWorkflow workflow,
WorkflowStateMachines workflowStateMachines,
ReplayWorkflowContextImpl context) {
this.workflow = workflow;
this.workflowStateMachines = workflowStateMachines;
this.context = context;
this.metricsScope = context.getMetricsScope();
}

public void eventLoop() {
Expand Down Expand Up @@ -131,12 +125,8 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) {

if (context.isCancelRequested()) {
workflowStateMachines.cancelWorkflow();
metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These completion counters are now tracked inside workflow state machines object

} else if (failure != null) {
workflowStateMachines.failWorkflow(failure.getFailure());
if (!FailureUtils.isBenignApplicationFailure(failure.getFailure())) {
metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
}
} else {
ContinueAsNewWorkflowExecutionCommandAttributes attributes =
context.getContinueAsNewOnCompletion();
Expand All @@ -152,23 +142,17 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) {
// This way attributes will need to be carried over in the mutable state and the flow
// generally will be aligned with the flow of other commands.
workflowStateMachines.continueAsNewWorkflow(attributes);

// TODO Issue #1590
metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1);
} else {
Optional<Payloads> workflowOutput = workflow.getOutput();
workflowStateMachines.completeWorkflow(workflowOutput);

// TODO Issue #1590
metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1);
}
}

com.uber.m3.util.Duration d =
ProtobufTimeUtils.toM3Duration(
Timestamps.fromMillis(System.currentTimeMillis()),
Timestamps.fromMillis(context.getRunStartedTimestampMillis()));
metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
workflowStateMachines.setPostCompletionEndToEndLatency(d);
}

public void handleWorkflowExecutionCancelRequested(HistoryEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ public WorkflowTaskResult handleWorkflowTask(
if (workflow.getWorkflowContext() != null) {
result.setVersioningBehavior(workflow.getWorkflowContext().getVersioningBehavior());
}
// Setup post-completion metrics to be applied after task response accepted
String postCompleteCounter = workflowStateMachines.getPostCompletionMetricCounter();
com.uber.m3.util.Duration postCompleteLatency =
workflowStateMachines.getPostCompletionEndToEndLatency();
if (postCompleteCounter != null || postCompleteLatency != null) {
result.setApplyPostCompletionMetrics(
() -> {
if (postCompleteCounter != null) {
metricsScope.counter(postCompleteCounter).inc(1);
}
if (postCompleteLatency != null) {
metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(postCompleteLatency);
}
});
}
return result.build();
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ private Result createCompletedWFTRequest(
null,
null,
result.isFinalCommand(),
eventIdSetHandle);
eventIdSetHandle,
result.getApplyPostCompletionMetrics());
}

private Result failureToWFTResult(
Expand All @@ -275,7 +276,8 @@ private Result failureToWFTResult(
.setFailure(((WorkflowExecutionException) e).getFailure()))
.build())
.build();
return new WorkflowTaskHandler.Result(workflowType, response, null, null, null, false, null);
return new WorkflowTaskHandler.Result(
workflowType, response, null, null, null, false, null, null);
}

WorkflowExecution execution = workflowTask.getWorkflowExecution();
Expand Down Expand Up @@ -316,7 +318,7 @@ private Result failureToWFTResult(
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE);
}
return new WorkflowTaskHandler.Result(
workflowType, null, failedRequest.build(), null, null, false, null);
workflowType, null, failedRequest.build(), null, null, false, null, null);
}

private Result createDirectQueryResult(
Expand Down Expand Up @@ -346,6 +348,7 @@ private Result createDirectQueryResult(
queryCompletedRequest.build(),
null,
false,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static final class Builder {
private String writeSdkName;
private String writeSdkVersion;
private VersioningBehavior versioningBehavior;
private Runnable applyPostCompletionMetrics;

public Builder setCommands(List<Command> commands) {
this.commands = commands;
Expand Down Expand Up @@ -76,6 +77,11 @@ public Builder setVersioningBehavior(VersioningBehavior versioningBehavior) {
return this;
}

public Builder setApplyPostCompletionMetrics(Runnable applyPostCompletionMetrics) {
this.applyPostCompletionMetrics = applyPostCompletionMetrics;
return this;
}

public WorkflowTaskResult build() {
return new WorkflowTaskResult(
commands == null ? Collections.emptyList() : commands,
Expand All @@ -87,7 +93,8 @@ public WorkflowTaskResult build() {
sdkFlags == null ? Collections.emptyList() : sdkFlags,
writeSdkName,
writeSdkVersion,
versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior);
versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior,
applyPostCompletionMetrics);
}
}

Expand All @@ -101,6 +108,7 @@ public WorkflowTaskResult build() {
private final String writeSdkName;
private final String writeSdkVersion;
private final VersioningBehavior versioningBehavior;
private final Runnable applyPostCompletionMetrics;

private WorkflowTaskResult(
List<Command> commands,
Expand All @@ -112,7 +120,8 @@ private WorkflowTaskResult(
List<Integer> sdkFlags,
String writeSdkName,
String writeSdkVersion,
VersioningBehavior versioningBehavior) {
VersioningBehavior versioningBehavior,
Runnable applyPostCompletionMetrics) {
this.commands = commands;
this.messages = messages;
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
Expand All @@ -126,6 +135,7 @@ private WorkflowTaskResult(
this.writeSdkName = writeSdkName;
this.writeSdkVersion = writeSdkVersion;
this.versioningBehavior = versioningBehavior;
this.applyPostCompletionMetrics = applyPostCompletionMetrics;
}

public List<Command> getCommands() {
Expand Down Expand Up @@ -168,4 +178,8 @@ public String getWriteSdkVersion() {
public VersioningBehavior getVersioningBehavior() {
return versioningBehavior;
}

public Runnable getApplyPostCompletionMetrics() {
return applyPostCompletionMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.uber.m3.util.Duration;
import io.temporal.api.command.v1.*;
import io.temporal.api.common.v1.*;
import io.temporal.api.enums.v1.EventType;
Expand All @@ -26,6 +27,7 @@
import io.temporal.internal.sync.WorkflowThread;
import io.temporal.internal.worker.LocalActivityResult;
import io.temporal.serviceclient.Version;
import io.temporal.worker.MetricsType;
import io.temporal.worker.NonDeterministicException;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.ChildWorkflowCancellationType;
Expand Down Expand Up @@ -187,6 +189,9 @@ enum HandleEventStatus {
*/
private boolean shouldSkipUpsertVersionSA = false;

private String postCompletionMetricCounter;
private com.uber.m3.util.Duration postCompletionEndToEndLatency;

public WorkflowStateMachines(
StatesMachinesCallback callbacks,
GetSystemInfoResponse.Capabilities capabilities,
Expand Down Expand Up @@ -873,6 +878,18 @@ public long getLastStartedEventId() {
return lastWFTStartedEventId;
}

public String getPostCompletionMetricCounter() {
return postCompletionMetricCounter;
}

public Duration getPostCompletionEndToEndLatency() {
return postCompletionEndToEndLatency;
}

public void setPostCompletionEndToEndLatency(Duration postCompletionEndToEndLatency) {
this.postCompletionEndToEndLatency = postCompletionEndToEndLatency;
}

/**
* @param attributes attributes used to schedule an activity
* @param callback completion callback
Expand Down Expand Up @@ -1112,11 +1129,15 @@ public void upsertMemo(Memo memo) {
public void completeWorkflow(Optional<Payloads> workflowOutput) {
checkEventLoopExecuting();
CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
postCompletionMetricCounter = MetricsType.WORKFLOW_COMPLETED_COUNTER;
}

public void failWorkflow(Failure failure) {
checkEventLoopExecuting();
FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
if (!FailureUtils.isBenignApplicationFailure(failure)) {
postCompletionMetricCounter = MetricsType.WORKFLOW_FAILED_COUNTER;
}
}

public void cancelWorkflow() {
Expand All @@ -1125,11 +1146,13 @@ public void cancelWorkflow() {
CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
commandSink,
stateMachineSink);
postCompletionMetricCounter = MetricsType.WORKFLOW_CANCELED_COUNTER;
}

public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
checkEventLoopExecuting();
ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
postCompletionMetricCounter = MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER;
}

public boolean isReplaying() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ final class Result {
private final RpcRetryOptions requestRetryOptions;
private final boolean completionCommand;
private final Functions.Proc1<Long> resetEventIdHandle;
private final Runnable applyPostCompletionMetrics;

public Result(
String workflowType,
Expand All @@ -30,14 +31,16 @@ public Result(
RespondQueryTaskCompletedRequest queryCompleted,
RpcRetryOptions requestRetryOptions,
boolean completionCommand,
Functions.Proc1<Long> resetEventIdHandle) {
Functions.Proc1<Long> resetEventIdHandle,
Runnable applyPostCompletionMetrics) {
this.workflowType = workflowType;
this.taskCompleted = taskCompleted;
this.taskFailed = taskFailed;
this.queryCompleted = queryCompleted;
this.requestRetryOptions = requestRetryOptions;
this.completionCommand = completionCommand;
this.resetEventIdHandle = resetEventIdHandle;
this.applyPostCompletionMetrics = applyPostCompletionMetrics;
}

public RespondWorkflowTaskCompletedRequest getTaskCompleted() {
Expand Down Expand Up @@ -67,6 +70,10 @@ public Functions.Proc1<Long> getResetEventIdHandle() {
return (arg) -> {};
}

public Runnable getApplyPostCompletionMetrics() {
return applyPostCompletionMetrics;
}

@Override
public String toString() {
return "Result{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ public void handle(WorkflowTask task) throws Exception {
result.getRequestRetryOptions(),
workflowTypeScope);
}

// Apply post-completion metrics only if runnable present and the above succeeded
if (result.getApplyPostCompletionMetrics() != null) {
result.getApplyPostCompletionMetrics().run();
}
} catch (GrpcMessageTooLargeException e) {
// Only fail workflow task on the first attempt, subsequent failures of the same
// workflow task should timeout.
Expand Down
Loading
Loading