From 27fd027d399bfa563a7904e17653764df8284ac8 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Tue, 2 Dec 2025 14:29:14 -0800 Subject: [PATCH 1/3] feat: Restructure activities and operations with generator pattern Signed-off-by: Diana Zawadzki --- new_samples/activities/README.md | 70 ++++++++++++ new_samples/activities/dynamic_workflow.go | 13 ++- new_samples/activities/generator/README.md | 23 ++++ .../activities/generator/README_specific.md | 32 ++++++ new_samples/activities/generator/generate.go | 17 +++ new_samples/activities/main.go | 20 ++++ .../parallel_pick_first_workflow.go | 71 ------------ new_samples/activities/worker.go | 101 +++++++++++++++++ new_samples/operations/README.md | 93 ++++++++++++++++ new_samples/operations/cancel_workflow.go | 39 ++++--- new_samples/operations/generator/README.md | 23 ++++ .../operations/generator/README_specific.md | 55 ++++++++++ new_samples/operations/generator/generate.go | 17 +++ new_samples/operations/main.go | 20 ++++ new_samples/operations/worker.go | 103 ++++++++++++++++++ 15 files changed, 610 insertions(+), 87 deletions(-) create mode 100644 new_samples/activities/README.md create mode 100644 new_samples/activities/generator/README.md create mode 100644 new_samples/activities/generator/README_specific.md create mode 100644 new_samples/activities/generator/generate.go create mode 100644 new_samples/activities/main.go delete mode 100644 new_samples/activities/parallel_pick_first_workflow.go create mode 100644 new_samples/activities/worker.go create mode 100644 new_samples/operations/README.md create mode 100644 new_samples/operations/generator/README.md create mode 100644 new_samples/operations/generator/README_specific.md create mode 100644 new_samples/operations/generator/generate.go create mode 100644 new_samples/operations/main.go create mode 100644 new_samples/operations/worker.go diff --git a/new_samples/activities/README.md b/new_samples/activities/README.md new file mode 100644 index 00000000..5fa5fcf1 --- /dev/null +++ b/new_samples/activities/README.md @@ -0,0 +1,70 @@ + + + +# Dynamic Activity Sample + +## Prerequisites + +0. Install Cadence CLI. See instruction [here](https://cadenceworkflow.io/docs/cli/). +1. Run the Cadence server: + 1. Clone the [Cadence](https://github.com/cadence-workflow/cadence) repository if you haven't done already: `git clone https://github.com/cadence-workflow/cadence.git` + 2. Run `docker compose -f docker/docker-compose.yml up` to start Cadence server + 3. See more details at https://github.com/uber/cadence/blob/master/README.md +2. Once everything is up and running in Docker, open [localhost:8088](localhost:8088) to view Cadence UI. +3. Register the `cadence-samples` domain: + +```bash +cadence --env development --domain cadence-samples domain register +``` + +Refresh the [domains page](http://localhost:8088/domains) from step 2 to verify `cadence-samples` is registered. + +## Steps to run sample + +Inside the folder this sample is defined, run the following command: + +```bash +go run . +``` + +This will call the main function in main.go which starts the worker, which will be execute the sample workflow code + +## Dynamic Activity Workflow + +This sample demonstrates invoking activities by string name rather than function reference. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.DynamicWorkflow \ + --input '{"message":"Cadence"}' +``` + +Verify that your workflow started and completed successfully. + +### Key Concept + +Instead of passing the function directly: +```go +workflow.ExecuteActivity(ctx, MyActivity, input) +``` + +Pass the activity name as a string: +```go +workflow.ExecuteActivity(ctx, "cadence_samples.DynamicGreetingActivity", input) +``` + +This is useful for plugin systems or configuration-driven workflows. + + +## References + +* The website: https://cadenceworkflow.io +* Cadence's server: https://github.com/uber/cadence +* Cadence's Go client: https://github.com/uber-go/cadence-client + diff --git a/new_samples/activities/dynamic_workflow.go b/new_samples/activities/dynamic_workflow.go index 35229180..36521a99 100644 --- a/new_samples/activities/dynamic_workflow.go +++ b/new_samples/activities/dynamic_workflow.go @@ -1,19 +1,25 @@ -package workflows +package main import ( "context" + "time" + "go.uber.org/cadence/activity" "go.uber.org/cadence/workflow" "go.uber.org/zap" - "time" ) +// DynamicGreetingActivityName is the registered name for the activity. +// This demonstrates how to invoke activities by string name rather than function reference. const DynamicGreetingActivityName = "cadence_samples.DynamicGreetingActivity" type dynamicWorkflowInput struct { Message string `json:"message"` } +// DynamicWorkflow demonstrates calling activities using string names for dynamic behavior. +// Instead of passing the function directly to ExecuteActivity, we pass the activity name. +// This is useful for plugin systems or configuration-driven workflows. func DynamicWorkflow(ctx workflow.Context, input dynamicWorkflowInput) (string, error) { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -25,6 +31,7 @@ func DynamicWorkflow(ctx workflow.Context, input dynamicWorkflowInput) (string, logger.Info("DynamicWorkflow started") var greetingMsg string + // Note: We pass the activity NAME (string) instead of the function reference err := workflow.ExecuteActivity(ctx, DynamicGreetingActivityName, input.Message).Get(ctx, &greetingMsg) if err != nil { logger.Error("DynamicGreetingActivity failed", zap.Error(err)) @@ -35,8 +42,10 @@ func DynamicWorkflow(ctx workflow.Context, input dynamicWorkflowInput) (string, return greetingMsg, nil } +// DynamicGreetingActivity is a simple activity that returns a greeting message. func DynamicGreetingActivity(ctx context.Context, message string) (string, error) { logger := activity.GetLogger(ctx) logger.Info("DynamicGreetingActivity started.") return "Hello, " + message, nil } + diff --git a/new_samples/activities/generator/README.md b/new_samples/activities/generator/README.md new file mode 100644 index 00000000..1da35022 --- /dev/null +++ b/new_samples/activities/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/activities/generator/README_specific.md b/new_samples/activities/generator/README_specific.md new file mode 100644 index 00000000..7aa4c1cc --- /dev/null +++ b/new_samples/activities/generator/README_specific.md @@ -0,0 +1,32 @@ +## Dynamic Activity Workflow + +This sample demonstrates invoking activities by string name rather than function reference. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.DynamicWorkflow \ + --input '{"message":"Cadence"}' +``` + +Verify that your workflow started and completed successfully. + +### Key Concept + +Instead of passing the function directly: +```go +workflow.ExecuteActivity(ctx, MyActivity, input) +``` + +Pass the activity name as a string: +```go +workflow.ExecuteActivity(ctx, "cadence_samples.DynamicGreetingActivity", input) +``` + +This is useful for plugin systems or configuration-driven workflows. + diff --git a/new_samples/activities/generator/generate.go b/new_samples/activities/generator/generate.go new file mode 100644 index 00000000..d3aaf572 --- /dev/null +++ b/new_samples/activities/generator/generate.go @@ -0,0 +1,17 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + // Define the data for Dynamic Activity sample + data := template.TemplateData{ + SampleName: "Dynamic Activity", + Workflows: []string{"DynamicWorkflow"}, + Activities: []string{"DynamicGreetingActivity"}, + } + + template.GenerateAll(data) +} + +// Implement custom generator below + diff --git a/new_samples/activities/main.go b/new_samples/activities/main.go new file mode 100644 index 00000000..58939998 --- /dev/null +++ b/new_samples/activities/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/activities/parallel_pick_first_workflow.go b/new_samples/activities/parallel_pick_first_workflow.go deleted file mode 100644 index 02b4d857..00000000 --- a/new_samples/activities/parallel_pick_first_workflow.go +++ /dev/null @@ -1,71 +0,0 @@ -package workflows - -import ( - "context" - "go.uber.org/cadence/activity" - "go.uber.org/cadence/workflow" - "time" -) - -type parallelBranchInput struct { - Message string `json:"message"` -} - -// ParallelBranchPickFirstWorkflow is a sample workflow simulating two parallel activities running -// at the same time and picking the first successful result. -func ParallelBranchPickFirstWorkflow(ctx workflow.Context) (string, error) { - logger := workflow.GetLogger(ctx) - logger.Info("ParallelBranchPickFirstWorkflow started") - - selector := workflow.NewSelector(ctx) - var firstResp string - - // Use a cancel handler to cancel all rest of other activities. - childCtx, cancelHandler := workflow.WithCancel(ctx) - ao := workflow.ActivityOptions{ - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Minute, - HeartbeatTimeout: time.Second * 20, - WaitForCancellation: true, // wait for cancellation to complete - } - childCtx = workflow.WithActivityOptions(childCtx, ao) - - // Set WaitForCancellation to true to demonstrate the cancellation to the other activities. In real world case, - // you might not care about them and could set WaitForCancellation to false (which is default value). - - // Run two activities in parallel - f1 := workflow.ExecuteActivity(childCtx, ParallelActivity, parallelBranchInput{Message: "first activity"}, time.Second*10) - f2 := workflow.ExecuteActivity(childCtx, ParallelActivity, parallelBranchInput{Message: "second activity"}, time.Second*2) - pendingFutures := []workflow.Future{f1, f2} - selector.AddFuture(f1, func(f workflow.Future) { - f.Get(ctx, &firstResp) - }).AddFuture(f2, func(f workflow.Future) { - f.Get(ctx, &firstResp) - }) - - // wait for any of the future to complete - selector.Select(ctx) - - // now if at least one future is complete, cancel all other pending futures - cancelHandler() - - // - If you want to wait for pending activities to finish after issuing cancellation - // then wait for the future to complete. - // - if you don't want to wait for completion of pending activities cancellation then you can choose to - // set WaitForCancellation to false through WithWaitForCancellation(false) - for _, f := range pendingFutures { - err := f.Get(ctx, &firstResp) - if err != nil { - return "", err - } - } - - logger.Info("ParallelBranchPickFirstWorkflow completed") - return firstResp, nil -} - -func ParallelActivity(ctx context.Context, input parallelBranchInput) (string, error) { - logger := activity.GetLogger(ctx) - logger.Info("ParallelActivity started") - return "Hello " + input.Message, nil -} diff --git a/new_samples/activities/worker.go b/new_samples/activities/worker.go new file mode 100644 index 00000000..e84d9ecd --- /dev/null +++ b/new_samples/activities/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(DynamicWorkflow, workflow.RegisterOptions{Name: "cadence_samples.DynamicWorkflow"}) + w.RegisterActivityWithOptions(DynamicGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.DynamicGreetingActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/operations/README.md b/new_samples/operations/README.md new file mode 100644 index 00000000..80f02907 --- /dev/null +++ b/new_samples/operations/README.md @@ -0,0 +1,93 @@ + + + +# Cancel Workflow Sample + +## Prerequisites + +0. Install Cadence CLI. See instruction [here](https://cadenceworkflow.io/docs/cli/). +1. Run the Cadence server: + 1. Clone the [Cadence](https://github.com/cadence-workflow/cadence) repository if you haven't done already: `git clone https://github.com/cadence-workflow/cadence.git` + 2. Run `docker compose -f docker/docker-compose.yml up` to start Cadence server + 3. See more details at https://github.com/uber/cadence/blob/master/README.md +2. Once everything is up and running in Docker, open [localhost:8088](localhost:8088) to view Cadence UI. +3. Register the `cadence-samples` domain: + +```bash +cadence --env development --domain cadence-samples domain register +``` + +Refresh the [domains page](http://localhost:8088/domains) from step 2 to verify `cadence-samples` is registered. + +## Steps to run sample + +Inside the folder this sample is defined, run the following command: + +```bash +go run . +``` + +This will call the main function in main.go which starts the worker, which will be execute the sample workflow code + +## Cancel Workflow Sample + +This sample demonstrates how to cancel a running workflow and perform cleanup operations. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.CancelWorkflow +``` + +The workflow will start an activity that heartbeats every second. + +### Cancel the Workflow + +In another terminal, cancel the workflow: + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow cancel \ + --wid +``` + +### What Happens + +1. `ActivityToBeCanceled` starts and heartbeats every second +2. When you cancel the workflow, the activity receives a cancellation signal +3. The workflow runs `CleanupActivity` in a disconnected context +4. `ActivityToBeSkipped` is never executed (skipped due to cancellation) + +### Key Concept: WaitForCancellation + +```go +ao := workflow.ActivityOptions{ + WaitForCancellation: true, // Wait for activity to acknowledge cancellation +} +``` + +When `WaitForCancellation` is true, Cadence waits for the activity to handle the cancellation before proceeding. + +### Key Concept: Disconnected Context + +```go +// When workflow is canceled, get a new disconnected context for cleanup +newCtx, _ := workflow.NewDisconnectedContext(ctx) +err := workflow.ExecuteActivity(newCtx, cleanupActivity).Get(ctx, nil) +``` + +A disconnected context allows cleanup activities to run even after the workflow is canceled. + + +## References + +* The website: https://cadenceworkflow.io +* Cadence's server: https://github.com/uber/cadence +* Cadence's Go client: https://github.com/uber-go/cadence-client + diff --git a/new_samples/operations/cancel_workflow.go b/new_samples/operations/cancel_workflow.go index 8dc9e646..846963e1 100644 --- a/new_samples/operations/cancel_workflow.go +++ b/new_samples/operations/cancel_workflow.go @@ -1,15 +1,21 @@ -package workflows +package main import ( "context" "fmt" + "time" + "go.uber.org/cadence" "go.uber.org/cadence/activity" "go.uber.org/cadence/workflow" "go.uber.org/zap" - "time" ) +// CancelWorkflow demonstrates cancellation handling in Cadence workflows. +// It shows how to: +// - Handle workflow cancellation +// - Use WaitForCancellation for graceful activity shutdown +// - Run cleanup activities using disconnected context func CancelWorkflow(ctx workflow.Context) (retError error) { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -19,7 +25,7 @@ func CancelWorkflow(ctx workflow.Context) (retError error) { } ctx = workflow.WithActivityOptions(ctx, ao) logger := workflow.GetLogger(ctx) - logger.Info("cancel workflow started") + logger.Info("CancelWorkflow started") defer func() { if cadence.IsCanceledError(retError) { @@ -32,53 +38,58 @@ func CancelWorkflow(ctx workflow.Context) (retError error) { return } retError = nil - logger.Info("Workflow completed.") + logger.Info("Workflow completed with cleanup.") } }() var result string err := workflow.ExecuteActivity(ctx, ActivityToBeCanceled).Get(ctx, &result) if err != nil && !cadence.IsCanceledError(err) { - logger.Error("Error from activityToBeCanceled", zap.Error(err)) + logger.Error("Error from ActivityToBeCanceled", zap.Error(err)) return err } - logger.Info(fmt.Sprintf("activityToBeCanceled returns %v, %v", result, err)) + logger.Info(fmt.Sprintf("ActivityToBeCanceled returns %v, %v", result, err)) // Execute activity using a canceled ctx, - // activity won't be scheduled and a cancelled error will be returned + // activity won't be scheduled and a canceled error will be returned err = workflow.ExecuteActivity(ctx, ActivityToBeSkipped).Get(ctx, nil) if err != nil && !cadence.IsCanceledError(err) { - logger.Error("Error from activityToBeSkipped", zap.Error(err)) + logger.Error("Error from ActivityToBeSkipped", zap.Error(err)) } return err } +// ActivityToBeCanceled is an activity that heartbeats until canceled. +// To cancel: use CLI 'cadence --domain cadence-samples workflow cancel --wid ' func ActivityToBeCanceled(ctx context.Context) (string, error) { logger := activity.GetLogger(ctx) - logger.Info("activity started, to cancel workflow, use CLI: 'cadence --do default wf cancel -w ' to cancel") + logger.Info("ActivityToBeCanceled started - waiting for cancellation") + logger.Info("To cancel: cadence --env development --domain cadence-samples workflow cancel --wid ") + for { select { case <-time.After(1 * time.Second): - logger.Info("heart beating...") + logger.Info("heartbeating...") activity.RecordHeartbeat(ctx, "") case <-ctx.Done(): logger.Info("context is cancelled") - // returned canceled error here so that in workflow history we can see ActivityTaskCanceled event - // or if not cancelled, return timeout error return "I am canceled by Done", ctx.Err() } } } +// CleanupActivity runs after workflow cancellation to perform cleanup. func CleanupActivity(ctx context.Context) error { logger := activity.GetLogger(ctx) - logger.Info("cleanupActivity started") + logger.Info("CleanupActivity started - performing cleanup after cancellation") return nil } +// ActivityToBeSkipped demonstrates that activities are skipped when workflow is canceled. func ActivityToBeSkipped(ctx context.Context) error { logger := activity.GetLogger(ctx) - logger.Info("this activity will be skipped due to cancellation") + logger.Info("ActivityToBeSkipped - this should not run if workflow is canceled") return nil } + diff --git a/new_samples/operations/generator/README.md b/new_samples/operations/generator/README.md new file mode 100644 index 00000000..1da35022 --- /dev/null +++ b/new_samples/operations/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/operations/generator/README_specific.md b/new_samples/operations/generator/README_specific.md new file mode 100644 index 00000000..fe01e518 --- /dev/null +++ b/new_samples/operations/generator/README_specific.md @@ -0,0 +1,55 @@ +## Cancel Workflow Sample + +This sample demonstrates how to cancel a running workflow and perform cleanup operations. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.CancelWorkflow +``` + +The workflow will start an activity that heartbeats every second. + +### Cancel the Workflow + +In another terminal, cancel the workflow: + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow cancel \ + --wid +``` + +### What Happens + +1. `ActivityToBeCanceled` starts and heartbeats every second +2. When you cancel the workflow, the activity receives a cancellation signal +3. The workflow runs `CleanupActivity` in a disconnected context +4. `ActivityToBeSkipped` is never executed (skipped due to cancellation) + +### Key Concept: WaitForCancellation + +```go +ao := workflow.ActivityOptions{ + WaitForCancellation: true, // Wait for activity to acknowledge cancellation +} +``` + +When `WaitForCancellation` is true, Cadence waits for the activity to handle the cancellation before proceeding. + +### Key Concept: Disconnected Context + +```go +// When workflow is canceled, get a new disconnected context for cleanup +newCtx, _ := workflow.NewDisconnectedContext(ctx) +err := workflow.ExecuteActivity(newCtx, cleanupActivity).Get(ctx, nil) +``` + +A disconnected context allows cleanup activities to run even after the workflow is canceled. + diff --git a/new_samples/operations/generator/generate.go b/new_samples/operations/generator/generate.go new file mode 100644 index 00000000..1cb520b4 --- /dev/null +++ b/new_samples/operations/generator/generate.go @@ -0,0 +1,17 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + // Define the data for Cancel Workflow sample + data := template.TemplateData{ + SampleName: "Cancel Workflow", + Workflows: []string{"CancelWorkflow"}, + Activities: []string{"ActivityToBeCanceled", "ActivityToBeSkipped", "CleanupActivity"}, + } + + template.GenerateAll(data) +} + +// Implement custom generator below + diff --git a/new_samples/operations/main.go b/new_samples/operations/main.go new file mode 100644 index 00000000..58939998 --- /dev/null +++ b/new_samples/operations/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/operations/worker.go b/new_samples/operations/worker.go new file mode 100644 index 00000000..4a336d91 --- /dev/null +++ b/new_samples/operations/worker.go @@ -0,0 +1,103 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(CancelWorkflow, workflow.RegisterOptions{Name: "cadence_samples.CancelWorkflow"}) + w.RegisterActivityWithOptions(ActivityToBeCanceled, activity.RegisterOptions{Name: "cadence_samples.ActivityToBeCanceled"}) + w.RegisterActivityWithOptions(ActivityToBeSkipped, activity.RegisterOptions{Name: "cadence_samples.ActivityToBeSkipped"}) + w.RegisterActivityWithOptions(CleanupActivity, activity.RegisterOptions{Name: "cadence_samples.CleanupActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} From 04acfd826b2130f316dd775e0a05e0a53c0c1a18 Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Tue, 2 Dec 2025 15:39:26 -0800 Subject: [PATCH 2/3] feat: Migrate 5 recipe samples to new_samples with generator pattern - greetings: Sequential activity execution - timer: Timeout and delayed notifications - branch: Parallel activity execution - choice: Conditional execution based on results - pickfirst: Race condition handling Signed-off-by: Diana Zawadzki --- new_samples/branch/README.md | 143 ++++++++++++++++++ new_samples/branch/branch_workflow.go | 112 ++++++++++++++ new_samples/branch/generator/README.md | 23 +++ .../branch/generator/README_specific.md | 105 +++++++++++++ new_samples/branch/generator/generate.go | 14 ++ new_samples/branch/main.go | 20 +++ new_samples/branch/worker.go | 102 +++++++++++++ new_samples/choice/README.md | 107 +++++++++++++ new_samples/choice/choice_workflow.go | 88 +++++++++++ new_samples/choice/generator/README.md | 23 +++ .../choice/generator/README_specific.md | 69 +++++++++ new_samples/choice/generator/generate.go | 14 ++ new_samples/choice/main.go | 20 +++ new_samples/choice/worker.go | 104 +++++++++++++ new_samples/greetings/README.md | 84 ++++++++++ new_samples/greetings/generator/README.md | 23 +++ .../greetings/generator/README_specific.md | 46 ++++++ new_samples/greetings/generator/generate.go | 14 ++ new_samples/greetings/greetings_workflow.go | 69 +++++++++ new_samples/greetings/main.go | 20 +++ new_samples/greetings/worker.go | 103 +++++++++++++ new_samples/pickfirst/README.md | 127 ++++++++++++++++ new_samples/pickfirst/generator/README.md | 23 +++ .../pickfirst/generator/README_specific.md | 89 +++++++++++ new_samples/pickfirst/generator/generate.go | 14 ++ new_samples/pickfirst/main.go | 20 +++ new_samples/pickfirst/pickfirst_workflow.go | 90 +++++++++++ new_samples/pickfirst/worker.go | 101 +++++++++++++ new_samples/timer/README.md | 106 +++++++++++++ new_samples/timer/generator/README.md | 23 +++ .../timer/generator/README_specific.md | 68 +++++++++ new_samples/timer/generator/generate.go | 14 ++ new_samples/timer/main.go | 20 +++ new_samples/timer/timer_workflow.go | 84 ++++++++++ new_samples/timer/worker.go | 102 +++++++++++++ 35 files changed, 2184 insertions(+) create mode 100644 new_samples/branch/README.md create mode 100644 new_samples/branch/branch_workflow.go create mode 100644 new_samples/branch/generator/README.md create mode 100644 new_samples/branch/generator/README_specific.md create mode 100644 new_samples/branch/generator/generate.go create mode 100644 new_samples/branch/main.go create mode 100644 new_samples/branch/worker.go create mode 100644 new_samples/choice/README.md create mode 100644 new_samples/choice/choice_workflow.go create mode 100644 new_samples/choice/generator/README.md create mode 100644 new_samples/choice/generator/README_specific.md create mode 100644 new_samples/choice/generator/generate.go create mode 100644 new_samples/choice/main.go create mode 100644 new_samples/choice/worker.go create mode 100644 new_samples/greetings/README.md create mode 100644 new_samples/greetings/generator/README.md create mode 100644 new_samples/greetings/generator/README_specific.md create mode 100644 new_samples/greetings/generator/generate.go create mode 100644 new_samples/greetings/greetings_workflow.go create mode 100644 new_samples/greetings/main.go create mode 100644 new_samples/greetings/worker.go create mode 100644 new_samples/pickfirst/README.md create mode 100644 new_samples/pickfirst/generator/README.md create mode 100644 new_samples/pickfirst/generator/README_specific.md create mode 100644 new_samples/pickfirst/generator/generate.go create mode 100644 new_samples/pickfirst/main.go create mode 100644 new_samples/pickfirst/pickfirst_workflow.go create mode 100644 new_samples/pickfirst/worker.go create mode 100644 new_samples/timer/README.md create mode 100644 new_samples/timer/generator/README.md create mode 100644 new_samples/timer/generator/README_specific.md create mode 100644 new_samples/timer/generator/generate.go create mode 100644 new_samples/timer/main.go create mode 100644 new_samples/timer/timer_workflow.go create mode 100644 new_samples/timer/worker.go diff --git a/new_samples/branch/README.md b/new_samples/branch/README.md new file mode 100644 index 00000000..337fb0e6 --- /dev/null +++ b/new_samples/branch/README.md @@ -0,0 +1,143 @@ + + + +# Branch Sample + +## Prerequisites + +0. Install Cadence CLI. See instruction [here](https://cadenceworkflow.io/docs/cli/). +1. Run the Cadence server: + 1. Clone the [Cadence](https://github.com/cadence-workflow/cadence) repository if you haven't done already: `git clone https://github.com/cadence-workflow/cadence.git` + 2. Run `docker compose -f docker/docker-compose.yml up` to start Cadence server + 3. See more details at https://github.com/uber/cadence/blob/master/README.md +2. Once everything is up and running in Docker, open [localhost:8088](localhost:8088) to view Cadence UI. +3. Register the `cadence-samples` domain: + +```bash +cadence --env development --domain cadence-samples domain register +``` + +Refresh the [domains page](http://localhost:8088/domains) from step 2 to verify `cadence-samples` is registered. + +## Steps to run sample + +Inside the folder this sample is defined, run the following command: + +```bash +go run . +``` + +This will call the main function in main.go which starts the worker, which will be execute the sample workflow code + +## Branch Workflow + +This sample demonstrates **parallel activity execution** - running multiple activities concurrently. + +### Start Branch Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.BranchWorkflow +``` + +### Start Parallel Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.ParallelWorkflow +``` + +### What Happens + +**BranchWorkflow** - Executes activities in parallel and waits for all: + +``` + ┌─────────────────┐ + │ BranchWorkflow │ + └────────┬────────┘ + │ + ┌─────────────┼─────────────┐ + ▼ ▼ ▼ +┌───────┐ ┌───────┐ ┌───────┐ +│Branch1│ │Branch2│ │Branch3│ +└───┬───┘ └───┬───┘ └───┬───┘ + │ │ │ + └─────────────┼───────────┘ + ▼ + Wait for all to complete +``` + +**ParallelWorkflow** - Uses `workflow.Go()` for coroutines: + +``` + ┌──────────────────┐ + │ ParallelWorkflow │ + └────────┬─────────┘ + │ + ┌──────────┴──────────┐ + ▼ ▼ + workflow.Go() workflow.Go() + │ │ + ┌────┴────┐ ┌───┴───┐ + ▼ ▼ ▼ +branch1.1 branch1.2 branch2 + │ │ │ + └────┬────┘ │ + └────────┬────────┘ + ▼ + Wait for both coroutines +``` + +### Key Concept: Parallel with Futures + +```go +var futures []workflow.Future +for i := 1; i <= totalBranches; i++ { + future := workflow.ExecuteActivity(ctx, BranchActivity, input) + futures = append(futures, future) +} +// Wait for all +for _, future := range futures { + future.Get(ctx, nil) +} +``` + +### Key Concept: Parallel with workflow.Go() + +```go +waitChannel := workflow.NewChannel(ctx) + +workflow.Go(ctx, func(ctx workflow.Context) { + // Run activities sequentially in this branch + workflow.ExecuteActivity(ctx, activity1).Get(ctx, nil) + workflow.ExecuteActivity(ctx, activity2).Get(ctx, nil) + waitChannel.Send(ctx, "done") +}) + +workflow.Go(ctx, func(ctx workflow.Context) { + // Run in parallel + workflow.ExecuteActivity(ctx, activity3).Get(ctx, nil) + waitChannel.Send(ctx, "done") +}) + +// Wait for both coroutines +for i := 0; i < 2; i++ { + waitChannel.Receive(ctx, nil) +} +``` + + +## References + +* The website: https://cadenceworkflow.io +* Cadence's server: https://github.com/uber/cadence +* Cadence's Go client: https://github.com/uber-go/cadence-client + diff --git a/new_samples/branch/branch_workflow.go b/new_samples/branch/branch_workflow.go new file mode 100644 index 00000000..a24c7cb9 --- /dev/null +++ b/new_samples/branch/branch_workflow.go @@ -0,0 +1,112 @@ +package main + +import ( + "errors" + "fmt" + "time" + + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +const totalBranches = 3 + +// BranchWorkflow demonstrates executing multiple activities in parallel using Futures. +// All branches run concurrently and we wait for all to complete. +func BranchWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + logger.Info("BranchWorkflow started") + + // Start all activities in parallel + var futures []workflow.Future + for i := 1; i <= totalBranches; i++ { + activityInput := fmt.Sprintf("branch %d of %d", i, totalBranches) + future := workflow.ExecuteActivity(ctx, BranchActivity, activityInput) + futures = append(futures, future) + } + + // Wait for all futures to complete + for i, future := range futures { + var result string + if err := future.Get(ctx, &result); err != nil { + logger.Error("Branch failed", zap.Int("branch", i+1), zap.Error(err)) + return err + } + logger.Info("Branch completed", zap.Int("branch", i+1), zap.String("result", result)) + } + + logger.Info("BranchWorkflow completed - all branches finished") + return nil +} + +// ParallelWorkflow demonstrates using workflow.Go() to run coroutines in parallel. +// Each coroutine can run multiple sequential activities. +func ParallelWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + logger.Info("ParallelWorkflow started") + + waitChannel := workflow.NewChannel(ctx) + + // First coroutine: runs two activities sequentially + workflow.Go(ctx, func(ctx workflow.Context) { + err := workflow.ExecuteActivity(ctx, BranchActivity, "branch1.1").Get(ctx, nil) + if err != nil { + logger.Error("Activity failed", zap.Error(err)) + waitChannel.Send(ctx, err.Error()) + return + } + err = workflow.ExecuteActivity(ctx, BranchActivity, "branch1.2").Get(ctx, nil) + if err != nil { + logger.Error("Activity failed", zap.Error(err)) + waitChannel.Send(ctx, err.Error()) + return + } + waitChannel.Send(ctx, "") + }) + + // Second coroutine: runs one activity + workflow.Go(ctx, func(ctx workflow.Context) { + err := workflow.ExecuteActivity(ctx, BranchActivity, "branch2").Get(ctx, nil) + if err != nil { + logger.Error("Activity failed", zap.Error(err)) + waitChannel.Send(ctx, err.Error()) + return + } + waitChannel.Send(ctx, "") + }) + + // Wait for both coroutines to complete + var errMsg string + for i := 0; i < 2; i++ { + waitChannel.Receive(ctx, &errMsg) + if errMsg != "" { + err := errors.New(errMsg) + logger.Error("Coroutine failed", zap.Error(err)) + return err + } + } + + logger.Info("ParallelWorkflow completed") + return nil +} + +// BranchActivity is a simple activity that logs and returns a result. +func BranchActivity(input string) (string, error) { + fmt.Printf("BranchActivity running with input: %s\n", input) + return "Result_" + input, nil +} + diff --git a/new_samples/branch/generator/README.md b/new_samples/branch/generator/README.md new file mode 100644 index 00000000..1da35022 --- /dev/null +++ b/new_samples/branch/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/branch/generator/README_specific.md b/new_samples/branch/generator/README_specific.md new file mode 100644 index 00000000..10acdf04 --- /dev/null +++ b/new_samples/branch/generator/README_specific.md @@ -0,0 +1,105 @@ +## Branch Workflow + +This sample demonstrates **parallel activity execution** - running multiple activities concurrently. + +### Start Branch Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.BranchWorkflow +``` + +### Start Parallel Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.ParallelWorkflow +``` + +### What Happens + +**BranchWorkflow** - Executes activities in parallel and waits for all: + +``` + ┌─────────────────┐ + │ BranchWorkflow │ + └────────┬────────┘ + │ + ┌─────────────┼─────────────┐ + ▼ ▼ ▼ +┌───────┐ ┌───────┐ ┌───────┐ +│Branch1│ │Branch2│ │Branch3│ +└───┬───┘ └───┬───┘ └───┬───┘ + │ │ │ + └─────────────┼───────────┘ + ▼ + Wait for all to complete +``` + +**ParallelWorkflow** - Uses `workflow.Go()` for coroutines: + +``` + ┌──────────────────┐ + │ ParallelWorkflow │ + └────────┬─────────┘ + │ + ┌──────────┴──────────┐ + ▼ ▼ + workflow.Go() workflow.Go() + │ │ + ┌────┴────┐ ┌───┴───┐ + ▼ ▼ ▼ +branch1.1 branch1.2 branch2 + │ │ │ + └────┬────┘ │ + └────────┬────────┘ + ▼ + Wait for both coroutines +``` + +### Key Concept: Parallel with Futures + +```go +var futures []workflow.Future +for i := 1; i <= totalBranches; i++ { + future := workflow.ExecuteActivity(ctx, BranchActivity, input) + futures = append(futures, future) +} +// Wait for all +for _, future := range futures { + future.Get(ctx, nil) +} +``` + +### Key Concept: Parallel with workflow.Go() + +```go +waitChannel := workflow.NewChannel(ctx) + +workflow.Go(ctx, func(ctx workflow.Context) { + // Run activities sequentially in this branch + workflow.ExecuteActivity(ctx, activity1).Get(ctx, nil) + workflow.ExecuteActivity(ctx, activity2).Get(ctx, nil) + waitChannel.Send(ctx, "done") +}) + +workflow.Go(ctx, func(ctx workflow.Context) { + // Run in parallel + workflow.ExecuteActivity(ctx, activity3).Get(ctx, nil) + waitChannel.Send(ctx, "done") +}) + +// Wait for both coroutines +for i := 0; i < 2; i++ { + waitChannel.Receive(ctx, nil) +} +``` + diff --git a/new_samples/branch/generator/generate.go b/new_samples/branch/generator/generate.go new file mode 100644 index 00000000..1ca15212 --- /dev/null +++ b/new_samples/branch/generator/generate.go @@ -0,0 +1,14 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Branch", + Workflows: []string{"BranchWorkflow", "ParallelWorkflow"}, + Activities: []string{"BranchActivity"}, + } + + template.GenerateAll(data) +} + diff --git a/new_samples/branch/main.go b/new_samples/branch/main.go new file mode 100644 index 00000000..58939998 --- /dev/null +++ b/new_samples/branch/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/branch/worker.go b/new_samples/branch/worker.go new file mode 100644 index 00000000..44d6ba08 --- /dev/null +++ b/new_samples/branch/worker.go @@ -0,0 +1,102 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(BranchWorkflow, workflow.RegisterOptions{Name: "cadence_samples.BranchWorkflow"}) + w.RegisterWorkflowWithOptions(ParallelWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ParallelWorkflow"}) + w.RegisterActivityWithOptions(BranchActivity, activity.RegisterOptions{Name: "cadence_samples.BranchActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/choice/README.md b/new_samples/choice/README.md new file mode 100644 index 00000000..351975f9 --- /dev/null +++ b/new_samples/choice/README.md @@ -0,0 +1,107 @@ + + + +# Choice Sample + +## Prerequisites + +0. Install Cadence CLI. See instruction [here](https://cadenceworkflow.io/docs/cli/). +1. Run the Cadence server: + 1. Clone the [Cadence](https://github.com/cadence-workflow/cadence) repository if you haven't done already: `git clone https://github.com/cadence-workflow/cadence.git` + 2. Run `docker compose -f docker/docker-compose.yml up` to start Cadence server + 3. See more details at https://github.com/uber/cadence/blob/master/README.md +2. Once everything is up and running in Docker, open [localhost:8088](localhost:8088) to view Cadence UI. +3. Register the `cadence-samples` domain: + +```bash +cadence --env development --domain cadence-samples domain register +``` + +Refresh the [domains page](http://localhost:8088/domains) from step 2 to verify `cadence-samples` is registered. + +## Steps to run sample + +Inside the folder this sample is defined, run the following command: + +```bash +go run . +``` + +This will call the main function in main.go which starts the worker, which will be execute the sample workflow code + +## Choice Workflow + +This sample demonstrates **conditional execution** - running different activities based on the result of a previous activity. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.ChoiceWorkflow +``` + +### What Happens + +``` + ┌─────────────────┐ + │ ChoiceWorkflow │ + └────────┬────────┘ + │ + ▼ + ┌─────────────────┐ + │ GetOrderActivity│ + │ (returns random │ + │ fruit order) │ + └────────┬────────┘ + │ + ┌────────────┼────────────┬────────────┐ + │ │ │ │ + ▼ ▼ ▼ ▼ + "apple" "banana" "cherry" (other) + │ │ │ │ + ▼ ▼ ▼ ▼ +┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────┐ +│ProcessA │ │ProcessB │ │ProcessC │ │Error│ +│pple │ │anana │ │herry │ │ │ +└─────────┘ └─────────┘ └─────────┘ └─────┘ +``` + +1. `GetOrderActivity` returns a random fruit (apple, banana, or cherry) +2. Based on the result, the workflow executes the corresponding activity +3. Only one processing activity runs (exclusive choice) + +### Key Concept: Conditional Branching + +```go +var orderChoice string +err := workflow.ExecuteActivity(ctx, GetOrderActivity).Get(ctx, &orderChoice) + +switch orderChoice { +case "apple": + workflow.ExecuteActivity(ctx, ProcessAppleActivity, orderChoice) +case "banana": + workflow.ExecuteActivity(ctx, ProcessBananaActivity, orderChoice) +case "cherry": + workflow.ExecuteActivity(ctx, ProcessCherryActivity, orderChoice) +default: + return errors.New("unknown order type") +} +``` + +### Real-World Use Cases + +- Order routing based on product type +- User authentication with different providers +- Document processing based on file type + + +## References + +* The website: https://cadenceworkflow.io +* Cadence's server: https://github.com/uber/cadence +* Cadence's Go client: https://github.com/uber-go/cadence-client + diff --git a/new_samples/choice/choice_workflow.go b/new_samples/choice/choice_workflow.go new file mode 100644 index 00000000..ec4d57f1 --- /dev/null +++ b/new_samples/choice/choice_workflow.go @@ -0,0 +1,88 @@ +package main + +import ( + "errors" + "fmt" + "math/rand" + "time" + + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +const ( + orderChoiceApple = "apple" + orderChoiceBanana = "banana" + orderChoiceCherry = "cherry" +) + +var orderChoices = []string{orderChoiceApple, orderChoiceBanana, orderChoiceCherry} + +// ChoiceWorkflow demonstrates conditional execution based on activity results. +// It executes different activities depending on the order type returned. +func ChoiceWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + logger.Info("ChoiceWorkflow started") + + // Get the order type + var orderChoice string + err := workflow.ExecuteActivity(ctx, GetOrderActivity).Get(ctx, &orderChoice) + if err != nil { + return err + } + logger.Info("Order received", zap.String("choice", orderChoice)) + + // Execute different activity based on order type + switch orderChoice { + case orderChoiceApple: + err = workflow.ExecuteActivity(ctx, ProcessAppleActivity, orderChoice).Get(ctx, nil) + case orderChoiceBanana: + err = workflow.ExecuteActivity(ctx, ProcessBananaActivity, orderChoice).Get(ctx, nil) + case orderChoiceCherry: + err = workflow.ExecuteActivity(ctx, ProcessCherryActivity, orderChoice).Get(ctx, nil) + default: + logger.Error("Unexpected order", zap.String("choice", orderChoice)) + return errors.New("unknown order type: " + orderChoice) + } + + if err != nil { + return err + } + + logger.Info("ChoiceWorkflow completed") + return nil +} + +// GetOrderActivity returns a random order type. +func GetOrderActivity() (string, error) { + idx := rand.Intn(len(orderChoices)) + order := orderChoices[idx] + fmt.Printf("GetOrderActivity: Order is for %s\n", order) + return order, nil +} + +// ProcessAppleActivity handles apple orders. +func ProcessAppleActivity(choice string) error { + fmt.Printf("ProcessAppleActivity: Processing %s order\n", choice) + return nil +} + +// ProcessBananaActivity handles banana orders. +func ProcessBananaActivity(choice string) error { + fmt.Printf("ProcessBananaActivity: Processing %s order\n", choice) + return nil +} + +// ProcessCherryActivity handles cherry orders. +func ProcessCherryActivity(choice string) error { + fmt.Printf("ProcessCherryActivity: Processing %s order\n", choice) + return nil +} + diff --git a/new_samples/choice/generator/README.md b/new_samples/choice/generator/README.md new file mode 100644 index 00000000..1da35022 --- /dev/null +++ b/new_samples/choice/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/choice/generator/README_specific.md b/new_samples/choice/generator/README_specific.md new file mode 100644 index 00000000..cb621a4a --- /dev/null +++ b/new_samples/choice/generator/README_specific.md @@ -0,0 +1,69 @@ +## Choice Workflow + +This sample demonstrates **conditional execution** - running different activities based on the result of a previous activity. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.ChoiceWorkflow +``` + +### What Happens + +``` + ┌─────────────────┐ + │ ChoiceWorkflow │ + └────────┬────────┘ + │ + ▼ + ┌─────────────────┐ + │ GetOrderActivity│ + │ (returns random │ + │ fruit order) │ + └────────┬────────┘ + │ + ┌────────────┼────────────┬────────────┐ + │ │ │ │ + ▼ ▼ ▼ ▼ + "apple" "banana" "cherry" (other) + │ │ │ │ + ▼ ▼ ▼ ▼ +┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────┐ +│ProcessA │ │ProcessB │ │ProcessC │ │Error│ +│pple │ │anana │ │herry │ │ │ +└─────────┘ └─────────┘ └─────────┘ └─────┘ +``` + +1. `GetOrderActivity` returns a random fruit (apple, banana, or cherry) +2. Based on the result, the workflow executes the corresponding activity +3. Only one processing activity runs (exclusive choice) + +### Key Concept: Conditional Branching + +```go +var orderChoice string +err := workflow.ExecuteActivity(ctx, GetOrderActivity).Get(ctx, &orderChoice) + +switch orderChoice { +case "apple": + workflow.ExecuteActivity(ctx, ProcessAppleActivity, orderChoice) +case "banana": + workflow.ExecuteActivity(ctx, ProcessBananaActivity, orderChoice) +case "cherry": + workflow.ExecuteActivity(ctx, ProcessCherryActivity, orderChoice) +default: + return errors.New("unknown order type") +} +``` + +### Real-World Use Cases + +- Order routing based on product type +- User authentication with different providers +- Document processing based on file type + diff --git a/new_samples/choice/generator/generate.go b/new_samples/choice/generator/generate.go new file mode 100644 index 00000000..9cb75e7e --- /dev/null +++ b/new_samples/choice/generator/generate.go @@ -0,0 +1,14 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Choice", + Workflows: []string{"ChoiceWorkflow"}, + Activities: []string{"GetOrderActivity", "ProcessAppleActivity", "ProcessBananaActivity", "ProcessCherryActivity"}, + } + + template.GenerateAll(data) +} + diff --git a/new_samples/choice/main.go b/new_samples/choice/main.go new file mode 100644 index 00000000..58939998 --- /dev/null +++ b/new_samples/choice/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/choice/worker.go b/new_samples/choice/worker.go new file mode 100644 index 00000000..331043d1 --- /dev/null +++ b/new_samples/choice/worker.go @@ -0,0 +1,104 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(ChoiceWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ChoiceWorkflow"}) + w.RegisterActivityWithOptions(GetOrderActivity, activity.RegisterOptions{Name: "cadence_samples.GetOrderActivity"}) + w.RegisterActivityWithOptions(ProcessAppleActivity, activity.RegisterOptions{Name: "cadence_samples.ProcessAppleActivity"}) + w.RegisterActivityWithOptions(ProcessBananaActivity, activity.RegisterOptions{Name: "cadence_samples.ProcessBananaActivity"}) + w.RegisterActivityWithOptions(ProcessCherryActivity, activity.RegisterOptions{Name: "cadence_samples.ProcessCherryActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/greetings/README.md b/new_samples/greetings/README.md new file mode 100644 index 00000000..b0a40fab --- /dev/null +++ b/new_samples/greetings/README.md @@ -0,0 +1,84 @@ + + + +# Greetings Sample + +## Prerequisites + +0. Install Cadence CLI. See instruction [here](https://cadenceworkflow.io/docs/cli/). +1. Run the Cadence server: + 1. Clone the [Cadence](https://github.com/cadence-workflow/cadence) repository if you haven't done already: `git clone https://github.com/cadence-workflow/cadence.git` + 2. Run `docker compose -f docker/docker-compose.yml up` to start Cadence server + 3. See more details at https://github.com/uber/cadence/blob/master/README.md +2. Once everything is up and running in Docker, open [localhost:8088](localhost:8088) to view Cadence UI. +3. Register the `cadence-samples` domain: + +```bash +cadence --env development --domain cadence-samples domain register +``` + +Refresh the [domains page](http://localhost:8088/domains) from step 2 to verify `cadence-samples` is registered. + +## Steps to run sample + +Inside the folder this sample is defined, run the following command: + +```bash +go run . +``` + +This will call the main function in main.go which starts the worker, which will be execute the sample workflow code + +## Greetings Workflow + +This sample demonstrates **sequential activity execution** - running activities one after another and passing results between them. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.GreetingsWorkflow +``` + +### What Happens + +The workflow executes three activities in sequence: + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ +│ GetGreeting() │───▶│ GetName() │───▶│ SayGreeting(g, n) │ +│ returns "Hello" │ │ returns "Cadence"│ │ returns "Hello │ +└─────────────────┘ └─────────────────┘ │ Cadence!" │ + └─────────────────────┘ +``` + +1. `GetGreetingActivity` - Returns "Hello" +2. `GetNameActivity` - Returns "Cadence" +3. `SayGreetingActivity` - Combines them into "Hello Cadence!" + +### Key Concept: Sequential Execution + +```go +// First activity +err := workflow.ExecuteActivity(ctx, GetGreetingActivity).Get(ctx, &greeting) + +// Second activity (waits for first to complete) +err = workflow.ExecuteActivity(ctx, GetNameActivity).Get(ctx, &name) + +// Third activity uses results from first two +err = workflow.ExecuteActivity(ctx, SayGreetingActivity, greeting, name).Get(ctx, &result) +``` + +Each `.Get()` call blocks until the activity completes, ensuring sequential execution. + + +## References + +* The website: https://cadenceworkflow.io +* Cadence's server: https://github.com/uber/cadence +* Cadence's Go client: https://github.com/uber-go/cadence-client + diff --git a/new_samples/greetings/generator/README.md b/new_samples/greetings/generator/README.md new file mode 100644 index 00000000..1da35022 --- /dev/null +++ b/new_samples/greetings/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/greetings/generator/README_specific.md b/new_samples/greetings/generator/README_specific.md new file mode 100644 index 00000000..5eac189c --- /dev/null +++ b/new_samples/greetings/generator/README_specific.md @@ -0,0 +1,46 @@ +## Greetings Workflow + +This sample demonstrates **sequential activity execution** - running activities one after another and passing results between them. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.GreetingsWorkflow +``` + +### What Happens + +The workflow executes three activities in sequence: + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ +│ GetGreeting() │───▶│ GetName() │───▶│ SayGreeting(g, n) │ +│ returns "Hello" │ │ returns "Cadence"│ │ returns "Hello │ +└─────────────────┘ └─────────────────┘ │ Cadence!" │ + └─────────────────────┘ +``` + +1. `GetGreetingActivity` - Returns "Hello" +2. `GetNameActivity` - Returns "Cadence" +3. `SayGreetingActivity` - Combines them into "Hello Cadence!" + +### Key Concept: Sequential Execution + +```go +// First activity +err := workflow.ExecuteActivity(ctx, GetGreetingActivity).Get(ctx, &greeting) + +// Second activity (waits for first to complete) +err = workflow.ExecuteActivity(ctx, GetNameActivity).Get(ctx, &name) + +// Third activity uses results from first two +err = workflow.ExecuteActivity(ctx, SayGreetingActivity, greeting, name).Get(ctx, &result) +``` + +Each `.Get()` call blocks until the activity completes, ensuring sequential execution. + diff --git a/new_samples/greetings/generator/generate.go b/new_samples/greetings/generator/generate.go new file mode 100644 index 00000000..93791ca6 --- /dev/null +++ b/new_samples/greetings/generator/generate.go @@ -0,0 +1,14 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Greetings", + Workflows: []string{"GreetingsWorkflow"}, + Activities: []string{"GetGreetingActivity", "GetNameActivity", "SayGreetingActivity"}, + } + + template.GenerateAll(data) +} + diff --git a/new_samples/greetings/greetings_workflow.go b/new_samples/greetings/greetings_workflow.go new file mode 100644 index 00000000..45e8f59b --- /dev/null +++ b/new_samples/greetings/greetings_workflow.go @@ -0,0 +1,69 @@ +package main + +import ( + "fmt" + "time" + + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// GreetingsWorkflow demonstrates sequential activity execution. +// It executes 3 activities in sequence, passing results from one to the next. +func GreetingsWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + logger.Info("GreetingsWorkflow started") + + // Step 1: Get greeting + var greeting string + err := workflow.ExecuteActivity(ctx, GetGreetingActivity).Get(ctx, &greeting) + if err != nil { + logger.Error("GetGreetingActivity failed", zap.Error(err)) + return err + } + logger.Info("Got greeting", zap.String("greeting", greeting)) + + // Step 2: Get name + var name string + err = workflow.ExecuteActivity(ctx, GetNameActivity).Get(ctx, &name) + if err != nil { + logger.Error("GetNameActivity failed", zap.Error(err)) + return err + } + logger.Info("Got name", zap.String("name", name)) + + // Step 3: Combine greeting and name + var result string + err = workflow.ExecuteActivity(ctx, SayGreetingActivity, greeting, name).Get(ctx, &result) + if err != nil { + logger.Error("SayGreetingActivity failed", zap.Error(err)) + return err + } + + logger.Info("Workflow completed", zap.String("result", result)) + return nil +} + +// GetGreetingActivity returns a greeting word. +func GetGreetingActivity() (string, error) { + return "Hello", nil +} + +// GetNameActivity returns a name. +func GetNameActivity() (string, error) { + return "Cadence", nil +} + +// SayGreetingActivity combines greeting and name into a full greeting message. +func SayGreetingActivity(greeting string, name string) (string, error) { + result := fmt.Sprintf("%s %s!", greeting, name) + return result, nil +} + diff --git a/new_samples/greetings/main.go b/new_samples/greetings/main.go new file mode 100644 index 00000000..58939998 --- /dev/null +++ b/new_samples/greetings/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/greetings/worker.go b/new_samples/greetings/worker.go new file mode 100644 index 00000000..0eb049e4 --- /dev/null +++ b/new_samples/greetings/worker.go @@ -0,0 +1,103 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(GreetingsWorkflow, workflow.RegisterOptions{Name: "cadence_samples.GreetingsWorkflow"}) + w.RegisterActivityWithOptions(GetGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.GetGreetingActivity"}) + w.RegisterActivityWithOptions(GetNameActivity, activity.RegisterOptions{Name: "cadence_samples.GetNameActivity"}) + w.RegisterActivityWithOptions(SayGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.SayGreetingActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/pickfirst/README.md b/new_samples/pickfirst/README.md new file mode 100644 index 00000000..492ca244 --- /dev/null +++ b/new_samples/pickfirst/README.md @@ -0,0 +1,127 @@ + + + +# Pick First Sample + +## Prerequisites + +0. Install Cadence CLI. See instruction [here](https://cadenceworkflow.io/docs/cli/). +1. Run the Cadence server: + 1. Clone the [Cadence](https://github.com/cadence-workflow/cadence) repository if you haven't done already: `git clone https://github.com/cadence-workflow/cadence.git` + 2. Run `docker compose -f docker/docker-compose.yml up` to start Cadence server + 3. See more details at https://github.com/uber/cadence/blob/master/README.md +2. Once everything is up and running in Docker, open [localhost:8088](localhost:8088) to view Cadence UI. +3. Register the `cadence-samples` domain: + +```bash +cadence --env development --domain cadence-samples domain register +``` + +Refresh the [domains page](http://localhost:8088/domains) from step 2 to verify `cadence-samples` is registered. + +## Steps to run sample + +Inside the folder this sample is defined, run the following command: + +```bash +go run . +``` + +This will call the main function in main.go which starts the worker, which will be execute the sample workflow code + +## Pick First Workflow + +This sample demonstrates **race condition handling** - running multiple activities in parallel and using the result from whichever completes first. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.PickFirstWorkflow +``` + +### What Happens + +``` + ┌──────────────────┐ + │ PickFirstWorkflow│ + └────────┬─────────┘ + │ + ┌──────────┴──────────┐ + ▼ ▼ +┌─────────────┐ ┌─────────────┐ +│ RaceActivity│ │ RaceActivity│ +│ (2 seconds) │ │ (10 seconds)│ +└──────┬──────┘ └──────┬──────┘ + │ │ + ▼ │ + Completes first! │ + │ │ + ▼ ▼ + Use result CANCELLED +``` + +1. Two activities start in parallel with different durations +2. The first one to complete "wins" +3. All other pending activities are cancelled +4. Workflow uses the winner's result + +### Key Concept: Selector with Cancellation + +```go +childCtx, cancelHandler := workflow.WithCancel(ctx) + +// Start activities in parallel +f1 := workflow.ExecuteActivity(childCtx, RaceActivity, 0, 2*time.Second) +f2 := workflow.ExecuteActivity(childCtx, RaceActivity, 1, 10*time.Second) + +selector := workflow.NewSelector(ctx) +selector.AddFuture(f1, func(f workflow.Future) { + f.Get(ctx, &result) +}) +selector.AddFuture(f2, func(f workflow.Future) { + f.Get(ctx, &result) +}) + +// Wait for first to complete +selector.Select(ctx) + +// Cancel all others +cancelHandler() +``` + +### Key Concept: Activity Cancellation Handling + +```go +func RaceActivity(ctx context.Context, ...) (string, error) { + for { + activity.RecordHeartbeat(ctx, "status") + + select { + case <-ctx.Done(): + // We've been cancelled + return "cancelled", ctx.Err() + default: + // Continue working + } + } +} +``` + +### Real-World Use Cases + +- Multi-provider API calls (use fastest response) +- Redundant service calls for reliability +- Load balancing with failover + + +## References + +* The website: https://cadenceworkflow.io +* Cadence's server: https://github.com/uber/cadence +* Cadence's Go client: https://github.com/uber-go/cadence-client + diff --git a/new_samples/pickfirst/generator/README.md b/new_samples/pickfirst/generator/README.md new file mode 100644 index 00000000..1da35022 --- /dev/null +++ b/new_samples/pickfirst/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/pickfirst/generator/README_specific.md b/new_samples/pickfirst/generator/README_specific.md new file mode 100644 index 00000000..a5d23e61 --- /dev/null +++ b/new_samples/pickfirst/generator/README_specific.md @@ -0,0 +1,89 @@ +## Pick First Workflow + +This sample demonstrates **race condition handling** - running multiple activities in parallel and using the result from whichever completes first. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.PickFirstWorkflow +``` + +### What Happens + +``` + ┌──────────────────┐ + │ PickFirstWorkflow│ + └────────┬─────────┘ + │ + ┌──────────┴──────────┐ + ▼ ▼ +┌─────────────┐ ┌─────────────┐ +│ RaceActivity│ │ RaceActivity│ +│ (2 seconds) │ │ (10 seconds)│ +└──────┬──────┘ └──────┬──────┘ + │ │ + ▼ │ + Completes first! │ + │ │ + ▼ ▼ + Use result CANCELLED +``` + +1. Two activities start in parallel with different durations +2. The first one to complete "wins" +3. All other pending activities are cancelled +4. Workflow uses the winner's result + +### Key Concept: Selector with Cancellation + +```go +childCtx, cancelHandler := workflow.WithCancel(ctx) + +// Start activities in parallel +f1 := workflow.ExecuteActivity(childCtx, RaceActivity, 0, 2*time.Second) +f2 := workflow.ExecuteActivity(childCtx, RaceActivity, 1, 10*time.Second) + +selector := workflow.NewSelector(ctx) +selector.AddFuture(f1, func(f workflow.Future) { + f.Get(ctx, &result) +}) +selector.AddFuture(f2, func(f workflow.Future) { + f.Get(ctx, &result) +}) + +// Wait for first to complete +selector.Select(ctx) + +// Cancel all others +cancelHandler() +``` + +### Key Concept: Activity Cancellation Handling + +```go +func RaceActivity(ctx context.Context, ...) (string, error) { + for { + activity.RecordHeartbeat(ctx, "status") + + select { + case <-ctx.Done(): + // We've been cancelled + return "cancelled", ctx.Err() + default: + // Continue working + } + } +} +``` + +### Real-World Use Cases + +- Multi-provider API calls (use fastest response) +- Redundant service calls for reliability +- Load balancing with failover + diff --git a/new_samples/pickfirst/generator/generate.go b/new_samples/pickfirst/generator/generate.go new file mode 100644 index 00000000..a71db3eb --- /dev/null +++ b/new_samples/pickfirst/generator/generate.go @@ -0,0 +1,14 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Pick First", + Workflows: []string{"PickFirstWorkflow"}, + Activities: []string{"RaceActivity"}, + } + + template.GenerateAll(data) +} + diff --git a/new_samples/pickfirst/main.go b/new_samples/pickfirst/main.go new file mode 100644 index 00000000..58939998 --- /dev/null +++ b/new_samples/pickfirst/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/pickfirst/pickfirst_workflow.go b/new_samples/pickfirst/pickfirst_workflow.go new file mode 100644 index 00000000..82db56e4 --- /dev/null +++ b/new_samples/pickfirst/pickfirst_workflow.go @@ -0,0 +1,90 @@ +package main + +import ( + "context" + "fmt" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// PickFirstWorkflow demonstrates race condition handling. +// It executes activities in parallel and uses the result of the first to complete. +func PickFirstWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("PickFirstWorkflow started") + + // Create cancellable context for all activities + childCtx, cancelHandler := workflow.WithCancel(ctx) + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + WaitForCancellation: true, // Wait for cancellation to complete + } + childCtx = workflow.WithActivityOptions(childCtx, ao) + + selector := workflow.NewSelector(ctx) + var firstResponse string + + // Start two activities with different durations + // Activity 0: takes 2 seconds (will win) + // Activity 1: takes 10 seconds (will be cancelled) + f1 := workflow.ExecuteActivity(childCtx, RaceActivity, 0, time.Second*2) + f2 := workflow.ExecuteActivity(childCtx, RaceActivity, 1, time.Second*10) + pendingFutures := []workflow.Future{f1, f2} + + selector.AddFuture(f1, func(f workflow.Future) { + f.Get(ctx, &firstResponse) + }).AddFuture(f2, func(f workflow.Future) { + f.Get(ctx, &firstResponse) + }) + + // Wait for first to complete + selector.Select(ctx) + logger.Info("First activity completed", zap.String("result", firstResponse)) + + // Cancel all other pending activities + cancelHandler() + + // Wait for all activities to acknowledge cancellation + for _, f := range pendingFutures { + f.Get(ctx, nil) + } + + logger.Info("PickFirstWorkflow completed") + return nil +} + +// RaceActivity simulates an activity that takes a specified duration. +// It heartbeats every second and checks for cancellation. +func RaceActivity(ctx context.Context, branchID int, duration time.Duration) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("RaceActivity started", zap.Int("branch", branchID), zap.Duration("duration", duration)) + + elapsed := time.Duration(0) + for elapsed < duration { + time.Sleep(time.Second) + elapsed += time.Second + + // Heartbeat to check for cancellation + activity.RecordHeartbeat(ctx, fmt.Sprintf("branch %d: %v elapsed", branchID, elapsed)) + + select { + case <-ctx.Done(): + // Activity was cancelled + msg := fmt.Sprintf("Branch %d cancelled after %v", branchID, elapsed) + logger.Info(msg) + return msg, ctx.Err() + default: + // Continue working + } + } + + msg := fmt.Sprintf("Branch %d completed in %v", branchID, duration) + logger.Info(msg) + return msg, nil +} + diff --git a/new_samples/pickfirst/worker.go b/new_samples/pickfirst/worker.go new file mode 100644 index 00000000..8b691355 --- /dev/null +++ b/new_samples/pickfirst/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(PickFirstWorkflow, workflow.RegisterOptions{Name: "cadence_samples.PickFirstWorkflow"}) + w.RegisterActivityWithOptions(RaceActivity, activity.RegisterOptions{Name: "cadence_samples.RaceActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/timer/README.md b/new_samples/timer/README.md new file mode 100644 index 00000000..920e50b4 --- /dev/null +++ b/new_samples/timer/README.md @@ -0,0 +1,106 @@ + + + +# Timer Sample + +## Prerequisites + +0. Install Cadence CLI. See instruction [here](https://cadenceworkflow.io/docs/cli/). +1. Run the Cadence server: + 1. Clone the [Cadence](https://github.com/cadence-workflow/cadence) repository if you haven't done already: `git clone https://github.com/cadence-workflow/cadence.git` + 2. Run `docker compose -f docker/docker-compose.yml up` to start Cadence server + 3. See more details at https://github.com/uber/cadence/blob/master/README.md +2. Once everything is up and running in Docker, open [localhost:8088](localhost:8088) to view Cadence UI. +3. Register the `cadence-samples` domain: + +```bash +cadence --env development --domain cadence-samples domain register +``` + +Refresh the [domains page](http://localhost:8088/domains) from step 2 to verify `cadence-samples` is registered. + +## Steps to run sample + +Inside the folder this sample is defined, run the following command: + +```bash +go run . +``` + +This will call the main function in main.go which starts the worker, which will be execute the sample workflow code + +## Timer Workflow + +This sample demonstrates **timer usage** for timeouts and delayed notifications. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.TimerWorkflow \ + --input '5000000000' +``` + +The input is the processing threshold in nanoseconds (5 seconds = 5000000000). + +### What Happens + +``` +┌──────────────────────────────────────────────────────────────┐ +│ TimerWorkflow │ +│ │ +│ ┌─────────────────┐ ┌─────────────────┐ │ +│ │ OrderProcessing │ │ Timer (5s) │ │ +│ │ (random 0-10s) │ │ │ │ +│ └────────┬────────┘ └────────┬────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ If completes first: If fires first: │ +│ Cancel timer Send notification email │ +│ Wait for processing │ +└──────────────────────────────────────────────────────────────┘ +``` + +1. Starts a long-running `OrderProcessingActivity` (takes random 0-10 seconds) +2. Starts a timer for the threshold duration +3. **If processing finishes first**: Timer is cancelled +4. **If timer fires first**: Sends notification email, then waits for processing + +### Key Concept: Timer with Cancellation + +```go +childCtx, cancelHandler := workflow.WithCancel(ctx) + +// Start processing +f := workflow.ExecuteActivity(ctx, orderProcessingActivity) +selector.AddFuture(f, func(f workflow.Future) { + processingDone = true + cancelHandler() // Cancel the timer if processing completes +}) + +// Start timer +timerFuture := workflow.NewTimer(childCtx, threshold) +selector.AddFuture(timerFuture, func(f workflow.Future) { + if !processingDone { + workflow.ExecuteActivity(ctx, sendEmailActivity) + } +}) +``` + +### Real-World Use Cases + +- Order processing with SLA monitoring +- Payment processing with timeout alerts +- API calls with fallback mechanisms + + +## References + +* The website: https://cadenceworkflow.io +* Cadence's server: https://github.com/uber/cadence +* Cadence's Go client: https://github.com/uber-go/cadence-client + diff --git a/new_samples/timer/generator/README.md b/new_samples/timer/generator/README.md new file mode 100644 index 00000000..1da35022 --- /dev/null +++ b/new_samples/timer/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/timer/generator/README_specific.md b/new_samples/timer/generator/README_specific.md new file mode 100644 index 00000000..f746d1c8 --- /dev/null +++ b/new_samples/timer/generator/README_specific.md @@ -0,0 +1,68 @@ +## Timer Workflow + +This sample demonstrates **timer usage** for timeouts and delayed notifications. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.TimerWorkflow \ + --input '5000000000' +``` + +The input is the processing threshold in nanoseconds (5 seconds = 5000000000). + +### What Happens + +``` +┌──────────────────────────────────────────────────────────────┐ +│ TimerWorkflow │ +│ │ +│ ┌─────────────────┐ ┌─────────────────┐ │ +│ │ OrderProcessing │ │ Timer (5s) │ │ +│ │ (random 0-10s) │ │ │ │ +│ └────────┬────────┘ └────────┬────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ If completes first: If fires first: │ +│ Cancel timer Send notification email │ +│ Wait for processing │ +└──────────────────────────────────────────────────────────────┘ +``` + +1. Starts a long-running `OrderProcessingActivity` (takes random 0-10 seconds) +2. Starts a timer for the threshold duration +3. **If processing finishes first**: Timer is cancelled +4. **If timer fires first**: Sends notification email, then waits for processing + +### Key Concept: Timer with Cancellation + +```go +childCtx, cancelHandler := workflow.WithCancel(ctx) + +// Start processing +f := workflow.ExecuteActivity(ctx, orderProcessingActivity) +selector.AddFuture(f, func(f workflow.Future) { + processingDone = true + cancelHandler() // Cancel the timer if processing completes +}) + +// Start timer +timerFuture := workflow.NewTimer(childCtx, threshold) +selector.AddFuture(timerFuture, func(f workflow.Future) { + if !processingDone { + workflow.ExecuteActivity(ctx, sendEmailActivity) + } +}) +``` + +### Real-World Use Cases + +- Order processing with SLA monitoring +- Payment processing with timeout alerts +- API calls with fallback mechanisms + diff --git a/new_samples/timer/generator/generate.go b/new_samples/timer/generator/generate.go new file mode 100644 index 00000000..380f0ac3 --- /dev/null +++ b/new_samples/timer/generator/generate.go @@ -0,0 +1,14 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Timer", + Workflows: []string{"TimerWorkflow"}, + Activities: []string{"OrderProcessingActivity", "SendEmailActivity"}, + } + + template.GenerateAll(data) +} + diff --git a/new_samples/timer/main.go b/new_samples/timer/main.go new file mode 100644 index 00000000..58939998 --- /dev/null +++ b/new_samples/timer/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/timer/timer_workflow.go b/new_samples/timer/timer_workflow.go new file mode 100644 index 00000000..161fef98 --- /dev/null +++ b/new_samples/timer/timer_workflow.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "math/rand" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// TimerWorkflow demonstrates using timers for timeouts and delayed notifications. +// It starts a long-running process and sends a notification if it takes too long. +func TimerWorkflow(ctx workflow.Context, processingTimeThreshold time.Duration) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + logger.Info("TimerWorkflow started", zap.Duration("threshold", processingTimeThreshold)) + + // Create a cancellable context for the timer + childCtx, cancelHandler := workflow.WithCancel(ctx) + selector := workflow.NewSelector(ctx) + + // Track if processing is done + var processingDone bool + + // Start the order processing activity + f := workflow.ExecuteActivity(ctx, OrderProcessingActivity) + selector.AddFuture(f, func(f workflow.Future) { + processingDone = true + // Cancel the timer since processing completed + cancelHandler() + logger.Info("Processing completed, timer cancelled") + }) + + // Start a timer that fires if processing takes too long + timerFuture := workflow.NewTimer(childCtx, processingTimeThreshold) + selector.AddFuture(timerFuture, func(f workflow.Future) { + if !processingDone { + // Processing not done when timer fires, send notification + logger.Info("Timer fired - processing taking too long, sending notification") + workflow.ExecuteActivity(ctx, SendEmailActivity).Get(ctx, nil) + } + }) + + // Wait for either timer or processing to complete first + selector.Select(ctx) + + // If timer fired first, still wait for processing to complete + if !processingDone { + selector.Select(ctx) + } + + logger.Info("TimerWorkflow completed") + return nil +} + +// OrderProcessingActivity simulates a long-running order processing operation. +// Processing time is random between 0-10 seconds. +func OrderProcessingActivity(ctx context.Context) error { + logger := activity.GetLogger(ctx) + logger.Info("OrderProcessingActivity started") + + // Simulate random processing time + timeNeededToProcess := time.Second * time.Duration(rand.Intn(10)) + time.Sleep(timeNeededToProcess) + + logger.Info("OrderProcessingActivity completed", zap.Duration("duration", timeNeededToProcess)) + return nil +} + +// SendEmailActivity sends a notification email when processing takes too long. +func SendEmailActivity(ctx context.Context) error { + logger := activity.GetLogger(ctx) + logger.Info("SendEmailActivity: Sending notification - processing is taking longer than expected") + return nil +} + diff --git a/new_samples/timer/worker.go b/new_samples/timer/worker.go new file mode 100644 index 00000000..aa716cbf --- /dev/null +++ b/new_samples/timer/worker.go @@ -0,0 +1,102 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(TimerWorkflow, workflow.RegisterOptions{Name: "cadence_samples.TimerWorkflow"}) + w.RegisterActivityWithOptions(OrderProcessingActivity, activity.RegisterOptions{Name: "cadence_samples.OrderProcessingActivity"}) + w.RegisterActivityWithOptions(SendEmailActivity, activity.RegisterOptions{Name: "cadence_samples.SendEmailActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} From d0152f891970993ae8f4e993b9754cca459e6c2e Mon Sep 17 00:00:00 2001 From: Diana Zawadzki Date: Tue, 2 Dec 2025 16:05:38 -0800 Subject: [PATCH 3/3] fix: Verify all samples run end-to-end and fix heartbeat timeout bug Ran all 7 samples against Cadence server to verify they work: - greetings, timer, branch, choice, pickfirst, activities, operations Discovered bug: Activities without heartbeating were failing with ActivityTaskTimedOut (HEARTBEAT) because HeartbeatTimeout was set but the activities never called activity.RecordHeartbeat(). Fix: Removed HeartbeatTimeout from simple activities (greetings, branch, choice) that complete quickly and don't need heartbeating. Signed-off-by: Diana Zawadzki --- new_samples/branch/branch_workflow.go | 2 -- new_samples/choice/choice_workflow.go | 1 - new_samples/greetings/greetings_workflow.go | 1 - 3 files changed, 4 deletions(-) diff --git a/new_samples/branch/branch_workflow.go b/new_samples/branch/branch_workflow.go index a24c7cb9..133e892f 100644 --- a/new_samples/branch/branch_workflow.go +++ b/new_samples/branch/branch_workflow.go @@ -17,7 +17,6 @@ func BranchWorkflow(ctx workflow.Context) error { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, StartToCloseTimeout: time.Minute, - HeartbeatTimeout: time.Second * 20, } ctx = workflow.WithActivityOptions(ctx, ao) @@ -52,7 +51,6 @@ func ParallelWorkflow(ctx workflow.Context) error { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, StartToCloseTimeout: time.Minute, - HeartbeatTimeout: time.Second * 20, } ctx = workflow.WithActivityOptions(ctx, ao) diff --git a/new_samples/choice/choice_workflow.go b/new_samples/choice/choice_workflow.go index ec4d57f1..6e2598be 100644 --- a/new_samples/choice/choice_workflow.go +++ b/new_samples/choice/choice_workflow.go @@ -24,7 +24,6 @@ func ChoiceWorkflow(ctx workflow.Context) error { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, StartToCloseTimeout: time.Minute, - HeartbeatTimeout: time.Second * 20, } ctx = workflow.WithActivityOptions(ctx, ao) diff --git a/new_samples/greetings/greetings_workflow.go b/new_samples/greetings/greetings_workflow.go index 45e8f59b..0c57aa5f 100644 --- a/new_samples/greetings/greetings_workflow.go +++ b/new_samples/greetings/greetings_workflow.go @@ -14,7 +14,6 @@ func GreetingsWorkflow(ctx workflow.Context) error { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, StartToCloseTimeout: time.Minute, - HeartbeatTimeout: time.Second * 20, } ctx = workflow.WithActivityOptions(ctx, ao)