11---
22Description : " "
3- date : " 2025-11-14 "
3+ date : " 2025-12-03 "
44lastmod : " "
55tags : []
66title : ' Eino: Workflow 编排框架'
@@ -11,7 +11,16 @@ weight: 3
1111
1212是一套编排的 API,与 Graph API 在架构上处于同一层:
1313
14- <a href =" /img/eino/workflow_api_layer.png " target =" _blank " ><img src =" /img/eino/workflow_api_layer.png " width =" 100% " /></a >
14+ ``` mermaid
15+ flowchart LR
16+ E[Eino compose engine]
17+ G[Graph API]
18+ W[Workflow API]
19+ C[Chain API]
20+ E --> G
21+ E --> W
22+ G --> C
23+ ```
1524
1625本质特点是:
1726
@@ -75,40 +84,40 @@ START -> node -> END
7584<a href =" /img/eino/workflow_simple.png " target =" _blank " ><img src =" /img/eino/workflow_simple.png " width =" 100% " /></a >
7685
7786``` go
78- _ // creates and invokes a simple workflow with only a Lambda node._
79- _ // Since all field mappings are ALL to ALL mappings_
80- _ // (by using AddInput without field mappings),_
81- _ // this simple workflow is equivalent to a Graph: START -> lambda -> END._
87+ // creates and invokes a simple workflow with only a Lambda node.
88+ // Since all field mappings are ALL to ALL mappings
89+ // (by using AddInput without field mappings),
90+ // this simple workflow is equivalent to a Graph: START -> lambda -> END.
8291func main () {
83- _ // create a Workflow, just like creating a Graph_
84- _ _wf := compose.NewWorkflow [int , string ]()
92+ // create a Workflow, just like creating a Graph
93+ wf := compose.NewWorkflow [int , string ]()
8594
86- _ // add a lambda node to the Workflow, just like adding the lambda to a Graph_
87- _ _wf .AddLambdaNode (" lambda" , compose.InvokableLambda (
95+ // add a lambda node to the Workflow, just like adding the lambda to a Graph
96+ wf .AddLambdaNode (" lambda" , compose.InvokableLambda (
8897 func (ctx context.Context , in int ) (string , error ) {
8998 return strconv.Itoa (in), nil
9099 })).
91- _ // add an input to this lambda node from START._
92- _ // this means mapping all output of START to the input of the lambda._
93- _ // the effect of AddInput is to set both a control dependency_
94- _ // and a data dependency._
95- _ _AddInput (compose._START_ )
96-
97- _ // obtain the compose.END of the workflow for method chaining_
98- _ _wf .End ().
99- _ // add an input to compose.END,_
100- _ // which means 'using ALL output of lambda node as output of END'._
101- _ _AddInput (" lambda" )
102-
103- _ // compile the Workflow, just like compiling a Graph_
104- _ _run , err := wf.Compile (context.Background ())
100+ // add an input to this lambda node from START.
101+ // this means mapping all output of START to the input of the lambda.
102+ // the effect of AddInput is to set both a control dependency
103+ // and a data dependency.
104+ AddInput (compose.START )
105+
106+ // obtain the compose.END of the workflow for method chaining
107+ wf .End ().
108+ // add an input to compose.END,
109+ // which means 'using ALL output of lambda node as output of END'.
110+ AddInput (" lambda" )
111+
112+ // compile the Workflow, just like compiling a Graph
113+ run , err := wf.Compile (context.Background ())
105114 if err != nil {
106115 logs.Errorf (" workflow compile error: %v " , err)
107116 return
108117 }
109118
110- _ // invoke the Workflow, just like invoking a Graph_
111- _ _result , err := run.Invoke (context.Background (), 1 )
119+ // invoke the Workflow, just like invoking a Graph
120+ result , err := run.Invoke (context.Background (), 1 )
112121 if err != nil {
113122 logs.Errorf (" workflow run err: %v " , err)
114123 return
@@ -269,15 +278,15 @@ func main() {
269278 wf := compose.NewWorkflow [calculator, int ]()
270279
271280 wf.AddLambdaNode (" adder" , compose.InvokableLambda (adder)).
272- AddInput (compose._START_ , compose.FromField (" Add" ))
281+ AddInput (compose.START , compose.FromField (" Add" ))
273282
274283 wf.AddLambdaNode (" mul" , compose.InvokableLambda (multiplier)).
275284 AddInput (" adder" , compose.ToField (" A" )).
276- AddInputWithOptions (compose._START_ , []*compose.FieldMapping {compose.MapFields (" Multiply" , " B" )},
277- _ // use WithNoDirectDependency to declare a 'data-only' dependency,_
278- _ // in this case, START node's execution status will not determine whether 'mul' node can execute._
279- _ // START node only passes one field of its output to 'mul' node._
280- _ _compose .WithNoDirectDependency ())
285+ AddInputWithOptions (compose.START , []*compose.FieldMapping {compose.MapFields (" Multiply" , " B" )},
286+ // use WithNoDirectDependency to declare a 'data-only' dependency,
287+ // in this case, START node's execution status will not determine whether 'mul' node can execute.
288+ // START node only passes one field of its output to 'mul' node.
289+ compose .WithNoDirectDependency ())
281290
282291 wf.End ().AddInput (" mul" )
283292
@@ -315,8 +324,8 @@ func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMa
315324``` go
316325func WithNoDirectDependency () WorkflowAddInputOpt {
317326 return func (opt *workflowAddInputOpts) {
318- opt.noDirectDependency = _true_
319- _ _ }
327+ opt.noDirectDependency = true
328+ }
320329}
321330```
322331
@@ -412,13 +421,13 @@ func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode {
412421在上面的例子中,我们用与 Graph API 几乎完全相同的方式添加了一个 branch:
413422
414423``` go
415- _ // add a branch just like adding branch in Graph._
424+ // add a branch just like adding branch in Graph.
416425 wf.AddBranch (" b1" , compose.NewGraphBranch (func (ctx context .Context , in float64 ) (string , error ) {
417426 if in > 5.0 {
418- return compose._END_ , nil
427+ return compose.END , nil
419428 }
420429 return " b2" , nil
421- }, map [string ]bool {compose._END_ : _true_ , " b2" : _true_ }))
430+ }, map [string ]bool {compose.END : true , " b2" : true }))
422431```
423432
424433branch 语义与 Graph 的 AllPredecessor 模式下的 branch 语义相同:
@@ -476,25 +485,25 @@ func main() {
476485 wf := compose.NewWorkflow [float64 , map [string ]float64 ]()
477486
478487 wf.AddLambdaNode (" b1" , compose.InvokableLambda (bidder)).
479- AddInput (compose._START_ , compose.ToField (" Price" )).
480- _ // set 'Budget' field to 3.0 for b1_
481- _ _SetStaticValue ([]string {" Budget" }, 3.0 )
488+ AddInput (compose.START , compose.ToField (" Price" )).
489+ // set 'Budget' field to 3.0 for b1
490+ SetStaticValue ([]string {" Budget" }, 3.0 )
482491
483- _ // add a branch just like adding branch in Graph._
484- _ _wf .AddBranch (" b1" , compose.NewGraphBranch (func (ctx context.Context , in float64 ) (string , error ) {
492+ // add a branch just like adding branch in Graph.
493+ wf .AddBranch (" b1" , compose.NewGraphBranch (func (ctx context.Context , in float64 ) (string , error ) {
485494 if in > 5.0 {
486- return compose._END_ , nil
495+ return compose.END , nil
487496 }
488497 return " b2" , nil
489- }, map [string ]bool {compose._END_ : _true_ , " b2" : _true_ }))
498+ }, map [string ]bool {compose.END : true , " b2" : true }))
490499
491500 wf.AddLambdaNode (" b2" , compose.InvokableLambda (bidder)).
492- _ // b2 executes strictly after b1, but does not rely on b1's output,_
493- _ // which means b2 depends on b1, but no data passing between them._
494- _ _AddDependency (" b1" ).
495- AddInputWithOptions (compose._START_ , []*compose.FieldMapping {compose.ToField (" Price" )}, compose.WithNoDirectDependency ()).
496- _ // set 'Budget' field to 4.0 for b2_
497- _ _SetStaticValue ([]string {" Budget" }, 4.0 )
501+ // b2 executes strictly after b1, but does not rely on b1's output,
502+ // which means b2 depends on b1, but no data passing between them.
503+ AddDependency (" b1" ).
504+ AddInputWithOptions (compose.START , []*compose.FieldMapping {compose.ToField (" Price" )}, compose.WithNoDirectDependency ()).
505+ // set 'Budget' field to 4.0 for b2
506+ SetStaticValue ([]string {" Budget" }, 4.0 )
498507
499508 wf.End ().AddInput (" b1" , compose.ToField (" bidder1" )).
500509 AddInput (" b2" , compose.ToField (" bidder2" ))
@@ -544,25 +553,25 @@ func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode {
544553完成后的代码:
545554
546555``` go
547- _ // demonstrates the stream field mapping ability of eino workflow._
548- _ // It's modified from 2_field_mapping._
556+ // demonstrates the stream field mapping ability of eino workflow.
557+ // It's modified from 2_field_mapping.
549558func main () {
550559 type counter struct {
551- FullStr string _ // exported because we will do field mapping for this field_
552- _ _SubStr string _ // exported because we will do field mapping for this field_
553- _ _ }
560+ FullStr string // exported because we will do field mapping for this field
561+ SubStr string // exported because we will do field mapping for this field
562+ }
554563
555- _ // wordCounter is a transformable lambda function that_
556- _ // count occurrences of SubStr within FullStr, for each trunk._
557- _ _wordCounter := func (ctx context.Context , c *schema.StreamReader [counter]) (
564+ // wordCounter is a transformable lambda function that
565+ // count occurrences of SubStr within FullStr, for each trunk.
566+ wordCounter := func (ctx context.Context , c *schema.StreamReader [counter]) (
558567 *schema.StreamReader [int ], error ) {
559568 var subStr , cachedStr string
560569 return schema.StreamReaderWithConvert (c, func (co counter) (int , error ) {
561570 if len (co.SubStr ) > 0 {
562- _ // static values will not always come in the first chunk,_
563- _ // so before the static value (SubStr) comes in,_
564- _ // we need to cache the full string_
565- _ _subStr = co.SubStr
571+ // static values will not always come in the first chunk,
572+ // so before the static value (SubStr) comes in,
573+ // we need to cache the full string
574+ subStr = co.SubStr
566575 fullStr := cachedStr + co.FullStr
567576 cachedStr = " "
568577 return strings.Count (fullStr, subStr), nil
@@ -576,48 +585,48 @@ _ _subStr = co.SubStr
576585 }), nil
577586 }
578587
579- _ // create a workflow just like a Graph_
580- _ _wf := compose.NewWorkflow [*schema.Message , map [string ]int ]()
588+ // create a workflow just like a Graph
589+ wf := compose.NewWorkflow [*schema.Message , map [string ]int ]()
581590
582- _ // add lambda c1 just like in Graph_
583- _ _wf .AddLambdaNode (" c1" , compose.TransformableLambda (wordCounter)).
584- AddInput (compose._START_ , _ // add an input from START, specifying 2 field mappings_
585- _ // map START's Message's Content field to lambda c1's FullStr field_
586- _ _compose .MapFields (" Content" , " FullStr" )).
587- _ // we can set static values even if the input will be stream_
588- _ _SetStaticValue ([]string {" SubStr" }, " o" )
591+ // add lambda c1 just like in Graph
592+ wf .AddLambdaNode (" c1" , compose.TransformableLambda (wordCounter)).
593+ AddInput (compose.START , // add an input from START, specifying 2 field mappings
594+ // map START's Message's Content field to lambda c1's FullStr field
595+ compose .MapFields (" Content" , " FullStr" )).
596+ // we can set static values even if the input will be stream
597+ SetStaticValue ([]string {" SubStr" }, " o" )
589598
590- _ // add lambda c2 just like in Graph_
591- _ _wf .AddLambdaNode (" c2" , compose.TransformableLambda (wordCounter)).
592- AddInput (compose._START_ , _ // add an input from START, specifying 2 field mappings_
593- _ // map START's Message's ReasoningContent field to lambda c1's FullStr field_
594- _ _compose .MapFields (" ReasoningContent" , " FullStr" )).
599+ // add lambda c2 just like in Graph
600+ wf .AddLambdaNode (" c2" , compose.TransformableLambda (wordCounter)).
601+ AddInput (compose.START , // add an input from START, specifying 2 field mappings
602+ // map START's Message's ReasoningContent field to lambda c1's FullStr field
603+ compose .MapFields (" ReasoningContent" , " FullStr" )).
595604 SetStaticValue ([]string {" SubStr" }, " o" )
596605
597- wf.End (). _ // Obtain the compose.END for method chaining_
598- _ // add an input from c1,_
599- _ // mapping full output of c1 to the map key 'content_count'_
600- _ _AddInput (" c1" , compose.ToField (" content_count" )).
601- _ // also add an input from c2,_
602- _ // mapping full output of c2 to the map key 'reasoning_content_count'_
603- _ _AddInput (" c2" , compose.ToField (" reasoning_content_count" ))
606+ wf.End (). // Obtain the compose.END for method chaining
607+ // add an input from c1,
608+ // mapping full output of c1 to the map key 'content_count'
609+ AddInput (" c1" , compose.ToField (" content_count" )).
610+ // also add an input from c2,
611+ // mapping full output of c2 to the map key 'reasoning_content_count'
612+ AddInput (" c2" , compose.ToField (" reasoning_content_count" ))
604613
605- _ // compile the workflow just like compiling a Graph_
606- _ _run , err := wf.Compile (context.Background ())
614+ // compile the workflow just like compiling a Graph
615+ run , err := wf.Compile (context.Background ())
607616 if err != nil {
608617 logs.Errorf (" workflow compile error: %v " , err)
609618 return
610619 }
611620
612- _ // call the workflow using Transform just like calling a Graph with Transform_
613- _ _result , err := run.Transform (context.Background (),
621+ // call the workflow using Transform just like calling a Graph with Transform
622+ result , err := run.Transform (context.Background (),
614623 schema.StreamReaderFromArray ([]*schema.Message {
615624 {
616- Role: schema._Assistant_ ,
625+ Role: schema.Assistant ,
617626 ReasoningContent: " I need to say something meaningful" ,
618627 },
619628 {
620- Role: schema._Assistant_ ,
629+ Role: schema.Assistant ,
621630 Content: " Hello world!" ,
622631 },
623632 }))
0 commit comments