Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*c

func (wc *workflowEnvironmentImpl) UpsertMemo(memoMap map[string]interface{}) error {
// This has to be used in WorkflowEnvironment implementations instead of in Workflow for testsuite mock purpose.
memo, err := validateAndSerializeMemo(memoMap, wc.dataConverter)
memo, err := validateAndSerializeMemo(memoMap, wc.dataConverter, wc)
if err != nil {
return err
}
Expand Down Expand Up @@ -520,11 +520,11 @@ func mergeMemo(current, upsert *commonpb.Memo) *commonpb.Memo {
return current
}

func validateAndSerializeMemo(memoMap map[string]interface{}, dc converter.DataConverter) (*commonpb.Memo, error) {
func validateAndSerializeMemo(memoMap map[string]interface{}, dc converter.DataConverter, accessor memoFlagAccessor) (*commonpb.Memo, error) {
if len(memoMap) == 0 {
return nil, errMemoNotSet
}
return getWorkflowMemo(memoMap, dc)
return getWorkflowMemo(memoMap, dc, accessor)
}

func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) {
Expand All @@ -540,7 +540,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
if params.WorkflowID == "" {
params.WorkflowID = wc.workflowInfo.currentRunID + "_" + wc.GenerateSequenceID()
}
memo, err := getWorkflowMemo(params.Memo, wc.dataConverter)
memo, err := getWorkflowMemo(params.Memo, wc.dataConverter, wc)
if err != nil {
if wc.sdkFlags.tryUse(SDKFlagChildWorkflowErrorExecution, !wc.isReplay) {
startedHandler(WorkflowExecution{}, &ChildWorkflowExecutionAlreadyStartedError{})
Expand Down
8 changes: 5 additions & 3 deletions internal/internal_event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ func Test_MergeSearchAttributes(t *testing.T) {

func Test_ValidateAndSerializeMemo(t *testing.T) {
t.Parallel()
_, err := validateAndSerializeMemo(nil, nil)
_, err := validateAndSerializeMemo(nil, nil, nil)
require.EqualError(t, err, "memo is empty")

attr := map[string]interface{}{
"JustKey": make(chan int),
}
_, err = validateAndSerializeMemo(attr, nil)
_, err = validateAndSerializeMemo(attr, nil, nil)
require.EqualError(
t,
err,
Expand All @@ -229,7 +229,7 @@ func Test_ValidateAndSerializeMemo(t *testing.T) {
attr = map[string]interface{}{
"key": 1,
}
memo, err := validateAndSerializeMemo(attr, nil)
memo, err := validateAndSerializeMemo(attr, nil, nil)
require.NoError(t, err)
require.Equal(t, 1, len(memo.Fields))
var resp int
Expand All @@ -244,6 +244,8 @@ func Test_UpsertMemo(t *testing.T) {
env := &workflowEnvironmentImpl{
commandsHelper: helper,
workflowInfo: GetWorkflowInfo(ctx),
sdkFlags: newSDKFlags(nil),
dataConverter: converter.GetDefaultDataConverter(),
}
helper.setCurrentWorkflowTaskStartedEventID(4)
err := env.UpsertMemo(nil)
Expand Down
15 changes: 14 additions & 1 deletion internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ const (
// SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost
// when the Default path is blocked.
SDKFlagBlockedSelectorSignalReceive = 5
SDKFlagUnknown = math.MaxUint32
// SDKFlagMemoUserDCEncode will use the user data converter when encoding a memo. If user data converter fails,
// we will fallback onto the default data converter. If the default DC fails, the user DC error will be returned.
SDKFlagMemoUserDCEncode = 6
Copy link
Member

@cretz cretz Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my confirmation (didn't review whole PR or anything), this flag will now be set for any workflow that uses UpsertMemo or ExecuteChildWorkflow but not for any other workflows? But for now this is off by default unless the MEMO_USER_DC_ENCODE env var is set? And I presume one day we'll just turn it on for all users (i.e. change var memoUserDCEncode = os.Getenv("MEMO_USER_DC_ENCODE") != "" to os.Getenv("MEMO_USER_DC_ENCODE") != "false" or something)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this flag will now be set for any workflow that uses UpsertMemo or ExecuteChildWorkflow but not for any other workflows

What do you mean "not for any other workflows"? My understanding is this should be set for all workflows, regardless of if they use UpsertMemo or ExecuteChildWorkflow.

And I presume one day we'll just turn it on for all users (i.e. change var memoUserDCEncode = os.Getenv("MEMO_USER_DC_ENCODE") != "" to os.Getenv("MEMO_USER_DC_ENCODE") != "false" or something)?

Correct, one day we will flip the switch to turn it on by default. iirc server needs a version or 2 to fall back onto, so we want to wait a bit before flipping the switch on a new SDK flag

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean "not for any other workflows"? My understanding is this should be set for all workflows, regardless of if they use UpsertMemo or ExecuteChildWorkflow.

When I looked at when .TryUse(SDKFlagMemoUserDCEncode) was invoked, it was only in those two situations, but I may be misreading

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh no I think you're right, but does it matter if we set the SDK flag if our workflow isn't exercising Memo code? I wanna say no. The flag is primarily for replaying and ensuring this fix doesn't cause NDE with pre-fix histories

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter, in fact I think it's better to not have the flag for everyone, just confirming that is the case.

SDKFlagUnknown = math.MaxUint32
)

// unblockSelectorSignal exists to allow us to configure the default behavior of
// SDKFlagBlockedSelectorSignalReceive. This is primarily useful with tests.
var unblockSelectorSignal = os.Getenv("UNBLOCK_SIGNAL_SELECTOR") != ""

// memoUserDCEncode exists to allow us to configure the default behavior of
// SDKFlagMemoUserDCEncode. This is primarily useful with tests.
var memoUserDCEncode = os.Getenv("MEMO_USER_DC_ENCODE") != ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says this is useful for tests, but I don't see this used in the SDK anywhere. Why did we need to add this?

Copy link
Contributor Author

@yuandrew yuandrew Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's set and used in SetMemoUserDCEncode(), so we can defer setting it back to its previous flag

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as well as configuring the default behavior for tests, so we can validate both the old and new behavior


func sdkFlagFromUint(value uint32) sdkFlag {
switch value {
case uint32(SDKFlagUnset):
Expand All @@ -50,6 +57,8 @@ func sdkFlagFromUint(value uint32) sdkFlag {
return SDKPriorityUpdateHandling
case uint32(SDKFlagBlockedSelectorSignalReceive):
return SDKFlagBlockedSelectorSignalReceive
case uint32(SDKFlagMemoUserDCEncode):
return SDKFlagMemoUserDCEncode
default:
return SDKFlagUnknown
}
Expand Down Expand Up @@ -130,3 +139,7 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag {
func SetUnblockSelectorSignal(unblockSignal bool) {
unblockSelectorSignal = unblockSignal
}

func SetMemoUserDCEncode(userEncode bool) {
memoUserDCEncode = userEncode
}
9 changes: 7 additions & 2 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche
return nil, err
}

memo, err := getWorkflowMemo(in.Options.Memo, dataConverter)
memo, err := getWorkflowMemo(in.Options.Memo, dataConverter, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -875,11 +875,16 @@ func encodeScheduleWorkflowMemo(dc converter.DataConverter, input map[string]int
}

memo := make(map[string]*commonpb.Payload)
if dc == nil {
dc = converter.GetDefaultDataConverter()
}

useUserDC := shouldUseMemoUserDataConverter(nil)
for k, v := range input {
if enc, ok := v.(*commonpb.Payload); ok {
memo[k] = enc
} else {
memoBytes, err := converter.GetDefaultDataConverter().ToPayload(v)
memoBytes, err := encodeMemoValue(v, dc, useUserDC)
if err != nil {
return nil, fmt.Errorf("encode workflow memo error: %v", err.Error())
}
Expand Down
107 changes: 107 additions & 0 deletions internal/internal_schedule_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
iconverter "go.temporal.io/sdk/internal/converter"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -219,3 +220,109 @@ func (s *scheduleClientTestSuite) TestIteratorError() {
s.Nil(event)
s.NotNil(err)
}

func (s *scheduleClientTestSuite) TestCreateScheduleWorkflowMemoDataConverter() {
testFn := func() {
dc := iconverter.NewTestDataConverter()
s.client = NewServiceClient(s.service, nil, ClientOptions{DataConverter: dc})

memo := map[string]interface{}{
"testMemo": "memo value",
}
wf := func(ctx Context) string { panic("this is just a stub") }

options := ScheduleOptions{
ID: scheduleID,
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
Memo: memo,
},
}
createResp := &workflowservice.CreateScheduleResponse{}
s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).
Do(func(_ interface{}, req *workflowservice.CreateScheduleRequest, _ ...interface{}) {
startWorkflow := req.Schedule.Action.GetStartWorkflow()
encoding := string(startWorkflow.Memo.Fields["testMemo"].Metadata[converter.MetadataEncoding])
if memoUserDCEncode {
s.Equal("binary/gob", encoding)
} else {
s.Equal("json/plain", encoding)
}
})

_, err := s.client.ScheduleClient().Create(context.Background(), options)
s.NoError(err)
}
s.T().Run("old behavior", func(t *testing.T) {
previousFlag := memoUserDCEncode
SetMemoUserDCEncode(false)
defer SetMemoUserDCEncode(previousFlag)
testFn()
})
s.T().Run("new behavior", func(t *testing.T) {
previousFlag := memoUserDCEncode
SetMemoUserDCEncode(true)
defer SetMemoUserDCEncode(previousFlag)
testFn()
})

}

func (s *scheduleClientTestSuite) TestCreateScheduleWorkflowMemoUserAndDefaultConverterFail() {
testFn := func() {
dc := failingMemoDataConverter{
delegate: converter.GetDefaultDataConverter(),
}
s.client = NewServiceClient(s.service, nil, ClientOptions{DataConverter: dc})

memo := map[string]interface{}{
"testMemo": make(chan int),
}
wf := func(ctx Context) string { panic("this is just a stub") }

options := ScheduleOptions{
ID: scheduleID,
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
Memo: memo,
},
}

s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Times(0)

_, err := s.client.ScheduleClient().Create(context.Background(), options)
s.Error(err)
if memoUserDCEncode {
s.ErrorContains(err, "failingMemoDataConverter memo encoding failed")
} else {
s.ErrorContains(err, "unsupported type: chan int")
}
}

s.T().Run("old behavior", func(t *testing.T) {
previousFlag := memoUserDCEncode
SetMemoUserDCEncode(false)
defer SetMemoUserDCEncode(previousFlag)
testFn()
})
s.T().Run("new behavior", func(t *testing.T) {
previousFlag := memoUserDCEncode
SetMemoUserDCEncode(true)
defer SetMemoUserDCEncode(previousFlag)
testFn()
})
}
55 changes: 49 additions & 6 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,15 +1629,58 @@ func (workflowRun *workflowRunImpl) follow(
return workflowRun.GetWithOptions(ctx, valuePtr, options)
}

func getWorkflowMemo(input map[string]interface{}, dc converter.DataConverter) (*commonpb.Memo, error) {
type memoFlagAccessor interface {
TryUse(flag sdkFlag) bool
GetFlag(flag sdkFlag) bool
}

func encodeMemoValue(value interface{}, dc converter.DataConverter, useUserDC bool) (*commonpb.Payload, error) {
if useUserDC {
payload, dcErr := dc.ToPayload(value)
if dcErr == nil {
return payload, nil
}

payload, err := converter.GetDefaultDataConverter().ToPayload(value)

// If fallback default data converter fails, return original user data converter error
if err != nil {
return nil, dcErr
}
return payload, nil
}
payload, err := converter.GetDefaultDataConverter().ToPayload(value)
if err != nil {
return nil, err
}
return payload, nil
}

func shouldUseMemoUserDataConverter(accessor memoFlagAccessor) bool {
if accessor == nil {
return memoUserDCEncode
}

if memoUserDCEncode {
return accessor.TryUse(SDKFlagMemoUserDCEncode)
}

return accessor.GetFlag(SDKFlagMemoUserDCEncode)
}

func getWorkflowMemo(input map[string]interface{}, dc converter.DataConverter, accessor memoFlagAccessor) (*commonpb.Memo, error) {
if input == nil {
return nil, nil
}

memo := make(map[string]*commonpb.Payload)
if dc == nil {
dc = converter.GetDefaultDataConverter()
}

memo := make(map[string]*commonpb.Payload, len(input))
useUserDC := shouldUseMemoUserDataConverter(accessor)
for k, v := range input {
// TODO (shtin): use dc here???
memoBytes, err := converter.GetDefaultDataConverter().ToPayload(v)
memoBytes, err := encodeMemoValue(v, dc, useUserDC)
if err != nil {
return nil, fmt.Errorf("encode workflow memo error: %v", err.Error())
}
Expand Down Expand Up @@ -1699,7 +1742,7 @@ func (w *workflowClientInterceptor) createStartWorkflowRequest(
return nil, err
}

memo, err := getWorkflowMemo(in.Options.Memo, dataConverter)
memo, err := getWorkflowMemo(in.Options.Memo, dataConverter, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2080,7 +2123,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow(
return nil, err
}

memo, err := getWorkflowMemo(in.Options.Memo, dataConverter)
memo, err := getWorkflowMemo(in.Options.Memo, dataConverter, nil)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading