.. image:: https://pypip.in/d/workflow/badge.png
   :target: https://pypi.python.org/pypi/workflow/

About
=====

I was looking for a workflow engine some time ago, and there weren't many for
Python. Google turns up quite a few, but:

* they are Plone or Django or Project-X specific,
* I found them too complicated (non-intuitive),
* or abandoned,
* or any combination of the above...

So I created my own workflow engine (alas) -- and it works quite well,
so I haven't looked for a better alternative since.

Workflow is a Finite State Machine with memory. It is used to execute a
set of methods in a specified order.

Here is a simple example of a workflow configuration:

.. code-block:: text

    translate_token,
    ]

You can probably guess what the processing pipeline does with tokens -- the
whole task consists of four steps, and the whole configuration is just stored
as a Python list. Every task is implemented as a function that takes two
objects:

* the currently processed object
* the workflow engine instance

Example:

.. code-block:: python

    def next_token(obj, eng):
        eng.continueNextToken()

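The engine itself can be pictured as little more than a loop over objects and tasks. Here is a minimal, self-contained sketch of that idea (the names ``run_pipeline``, ``lowercase``, and ``strip_punct`` are illustrative, not part of the workflow API):

```python
# Minimal sketch of the engine idea (hypothetical names, not the real
# workflow API): run a flat list of task functions over each object.

def run_pipeline(tasks, objects):
    """Apply every task, in order, to every object."""
    for obj in objects:
        for task in tasks:
            task(obj, None)  # a real engine would pass itself as `eng`

# Two toy tasks operating on a dict "token"
def lowercase(obj, eng):
    obj["text"] = obj["text"].lower()

def strip_punct(obj, eng):
    obj["text"] = obj["text"].strip(".,!?")

tokens = [{"text": "Hello!"}, {"text": "WORLD."}]
run_pipeline([lowercase, strip_punct], tokens)
print([t["text"] for t in tokens])  # ['hello', 'world']
```

A real engine adds memory and control flow on top of this loop, but the execution model is the same.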
There are NO explicit states, conditions, or transitions -- the job of the
engine is simply to run the tasks one after another. It is the
responsibility of each task to tell the engine what is going to happen
next: whether to continue, stop, jump back, jump forward, and a few other
options.

This is actually a *feature*: I knew that there would be a lot of possible
exceptions and transition states to implement for NLP processing, and I also
wanted to keep the workflow engine simple and fast -- but it has disadvantages:
you can make more errors, and the workflow engine will not warn you.

The workflow module comes with many patterns that can be used directly in the
definition of the pipeline, such as IF, IF_NOT, PARALLEL_SPLIT, and others.

The individual tasks can then influence the whole pipeline; the available
"commands" are:

.. code-block:: text

    eng.stopProcessing     # stops the current workflow
    eng.haltProcessing     # halts the workflow (can be used for nested wf engines)
    eng.continueNextToken  # can be called many levels deep, jumps up to the next token
    eng.jumpTokenForward   # skips the next object and continues with the one after
    eng.jumpTokenBack      # returns back, starts processing again
    eng.jumpCallForward    # in one loop [call, call...] jumps x steps forward
    eng.jumpCallBack       # in one loop [call, call...] jumps x steps back
    eng.breakFromThisLoop  # breaks from this loop, but does not stop processing

Consider this example of a task:

.. code-block:: python

    def if_else(call):
        def inner_call(obj, eng):
            if call(obj, eng):         # if True, continue processing
                eng.jumpCallForward(1)
            else:                      # else, skip the next step
                eng.jumpCallForward(2)
        return inner_call

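To make the jump semantics concrete, here is a toy, self-contained interpreter for the if_else pattern (``StubEngine`` and ``run`` are hypothetical stand-ins, not the real engine):

```python
# Toy interpreter for the if_else pattern (stub engine, not the real
# workflow API): a jump of N moves the instruction pointer by N.

class StubEngine:
    def __init__(self):
        self.offset = 1  # default: fall through to the next task
    def jumpCallForward(self, n):
        self.offset = n

def run(tasks, obj):
    i = 0
    while i < len(tasks):
        eng = StubEngine()
        tasks[i](obj, eng)
        i += eng.offset

def if_else(call):
    def inner_call(obj, eng):
        if call(obj, eng):
            eng.jumpCallForward(1)   # run the "true" branch (next task)
        else:
            eng.jumpCallForward(2)   # skip it
    return inner_call

trace = []
tasks = [
    if_else(lambda obj, eng: obj["ok"]),
    lambda obj, eng: trace.append("true-branch"),
    lambda obj, eng: trace.append("always"),
]
run(tasks, {"ok": False})
print(trace)  # ['always']  -- the true-branch was skipped
```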
We can then write a *workflow definition* like:

.. code-block:: text

    if_else(stage_submission),
    [
        [if_else(fulltext_available),  # this will be run only when fulltext was uploaded during form submission
            [extract_metadata, populate_empty_fields],
            []],                      # do nothing
        [if_else(check_for_duplicates),
            [stop_processing],
            [synchronize_fields, replace_values]],
        check_mandatory_fields,
    ],
    [
        check_mandatory_fields,       # this will run only for the 'review' stage
        check_preferred_values,
        save_record
    ]

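The nested lists above act as sub-workflows. A minimal sketch of that idea, ignoring jumps (the ``run_nested`` helper is hypothetical, not part of the package):

```python
# Toy recursive runner for nested task lists (illustrative only; the
# real engine also supports jumps, which this sketch omits).
def run_nested(tasks, obj, eng=None):
    for task in tasks:
        if isinstance(task, (list, tuple)):
            run_nested(task, obj, eng)  # a nested list is a sub-workflow
        else:
            task(obj, eng)

trace = []
workflow = [
    lambda obj, eng: trace.append("a"),
    [lambda obj, eng: trace.append("b1"),
     lambda obj, eng: trace.append("b2")],
    lambda obj, eng: trace.append("c"),
]
run_nested(workflow, {})
print(trace)  # ['a', 'b1', 'b2', 'c']
```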
Tasks
-----

Tasks are simple Python functions. We can enforce rules (not done yet!) in
a Pythonic way using pydoc conventions; consider this:

.. code-block:: python

    def check_duplicate(obj, eng):
        """
        This task checks if the uploaded fulltext is a duplicate.
        @type obj: InspireGeneralForm
        @precondition: obj.paths[]
            list of paths to uploaded files
        @postcondition: obj.fulltext[]
            list containing txt for the extracted document
          obj.duplicateids[]
            list of ids of Inspire records that contain a duplicate of this document
        @raise: stopProcessing on error
        @return: True if duplicate found
        """
        ...

Using the pydoc conventions, we can tell the workflow engine what types of
arguments are acceptable, what the expected outcome is, and what happens
after the task finishes. And let's say there will be a testing framework
that runs the workflow pipeline with fake arguments and tests all
sorts of conditions. So the configuration is not cluttered with the possible
states and transitions, developers can focus on the implementation
of the individual tasks, and site admins get a good understanding of
what each task is supposed to do -- the description of the task will be
displayed through the web GUI.

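Such enforcement is not implemented yet, but here is a sketch of how the pydoc-style fields could be read back from a task (the ``docstring_fields`` helper is purely illustrative):

```python
# Sketch of how such enforcement might work (not implemented in the
# package): pull the @-fields out of a task's docstring, line by line.
import re

def docstring_fields(func):
    """Return a dict mapping pydoc-style @field names to their text."""
    fields = {}
    for line in (func.__doc__ or "").splitlines():
        m = re.match(r"\s*@(\w+)\s*:?\s*(.*)", line)
        if m:
            fields.setdefault(m.group(1), []).append(m.group(2).strip())
    return fields

def check_duplicate(obj, eng):
    """
    This task checks if the uploaded fulltext is a duplicate.
    @type obj: InspireGeneralForm
    @return: True if duplicate found
    """

fields = docstring_fields(check_duplicate)
print(fields["type"])    # ['obj: InspireGeneralForm']
print(fields["return"])  # ['True if duplicate found']
```

A testing framework could then compare these declared pre/postconditions against what the task actually did to a fake object.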
Some examples
-------------

Here are some examples of workflow patterns (images are from
http://www.yawlfoundation.org) and their implementation in
Python. This should give you an idea that the workflow engine remains very
simple, and that by supplying special functions we can implement different
patterns.

.. image:: http://www.yawlfoundation.org/images/patterns/basic_ps.jpg

This pattern is called Parallel split (as tasks B, C, D are all started in
parallel after task A). It could be implemented like this:

.. code-block:: python

    import thread  # Python 2 low-level threading module

    def PARALLEL_SPLIT(*args):
        """
        Tasks A, B, C, D... are all started in parallel.
        @attention: tasks A, B, C, D... are not addressable; you can't
            jump to them (they are invisible to the workflow engine),
            though you can jump inside the branches
        @attention: tasks B, C, D... will be running on their own
            once you have started them, and we are not waiting for
            them to finish; the workflow will continue executing other
            tasks while B, C, D... might still be running
        @attention: a new engine is spawned for each branch of code;
            all operations work as expected, but mind that the branches
            only know about themselves -- they don't see the other tasks
            outside. They are passed the object, but not the old workflow
            engine object
        @postcondition: eng object will contain a lock (to be used
            by threads)
        """
        def _parallel_split(obj, eng, calls):
            lock = thread.allocate_lock()
            eng.setVar('lock', lock)
            for func in calls:
                new_eng = duplicate_engine_instance(eng)
                new_eng.setWorkflow([lambda o, e: e.setVar('lock', lock), func])
                thread.start_new_thread(new_eng.process, ([obj], ))

        return lambda o, e: _parallel_split(o, e, args)

And it is used like this:

.. code-block:: python

    from workflow.patterns import PARALLEL_SPLIT
    from my_module_x import task_a, task_b, task_c, task_d

    [
        task_a,
        PARALLEL_SPLIT(task_b, task_c, task_d)
    ]

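As a self-contained illustration of the same pattern using the modern ``threading`` module (all names here are illustrative, not the workflow API):

```python
# Standalone sketch of the parallel-split idea with the threading
# module (illustrative names, not the workflow API).
import threading

def parallel_split(*branches):
    """Return a task that fires off every branch in its own thread.

    The threads are returned so a caller *may* wait on them, but the
    pattern itself does not: the main workflow continues immediately.
    """
    def _task(obj, eng):
        threads = [threading.Thread(target=b, args=(obj, eng)) for b in branches]
        for t in threads:
            t.start()
        return threads
    return _task

# demo: three branches each record their result into a shared dict
results = {}
def make_branch(name):
    def branch(obj, eng):
        results[name] = obj["value"] * 2
    return branch

task = parallel_split(make_branch("b"), make_branch("c"), make_branch("d"))
threads = task({"value": 21}, None)
for t in threads:      # only the demo waits, to inspect the results
    t.join()
print(sorted(results))  # ['b', 'c', 'd']
print(results["b"])     # 42
```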
Arbitrary cycle(s)
------------------

.. image:: http://www.yawlfoundation.org/images/patterns/struc_arb.jpg

This is just for your amusement (and to see how complicated it looks in the
configuration).

.. code-block:: text

    [
        ...,  # here some conditional start
        task_a,
        task_b,
        task_c,
        if_else(some_test),
        [task_d, [if_else(some_test),
                  lambda obj, eng: eng.jumpCallBack(-6),  # jump back to task_a
                  some_other_task,
                 ]],
        [some_other_task],
        ...
    ]

.. admonition:: TODO

    Jumping back and forward is obviously dangerous and tedious
    (depending on the actual configuration); we need a better solution.

Synchronization
---------------

.. image:: http://www.yawlfoundation.org/images/patterns/basic_synch.jpg

After the execution of task B, task C, and task D, task E can be executed
(I will present the threaded version, as the sequential version would be dead simple).

.. code-block:: python

    def SYNCHRONIZE(*args, **kwargs):
        """
        After the execution of task B, task C, and task D, task E can be executed.
        @var *args: args can be a mix of callables and lists of callables;
            the simplest situation comes when you pass a list of callables --
            they will simply be executed in parallel.
            But if you pass a list of callables (a branch of callables),
            which is potentially a new workflow, we will first create a
            workflow engine with the workflows, and execute the branch in it
        @attention: you should never jump out of the synchronized branches
        """
        timeout = MAX_TIMEOUT
        if 'timeout' in kwargs:
            timeout = kwargs['timeout']

        if len(args) < 2:
            raise Exception('You must pass at least two callables')

        def _synchronize(obj, eng):
            queue = MyTimeoutQueue()
            # spawn a pool of threads, and pass them the queue instance
            for i in range(len(args) - 1):
                t = MySpecialThread(queue)
                t.setDaemon(True)
                t.start()

            for func in args[0:-1]:
                if isinstance(func, (list, tuple)):
                    new_eng = duplicate_engine_instance(eng)
                    new_eng.setWorkflow(func)
                    queue.put(lambda: new_eng.process([obj]))
                else:
                    queue.put(lambda: func(obj, eng))

            # wait on the queue until everything has been processed
            queue.join_with_timeout(timeout)

            # run the last func
            args[-1](obj, eng)

        _synchronize.__name__ = 'SYNCHRONIZE'
        return _synchronize

Configuration (i.e. what the admins would write):

.. code-block:: text

    from workflow.patterns import SYNCHRONIZE
    from my_module_x import task_a, task_b, task_c, task_d

    [
        SYNCHRONIZE(task_b, task_c, task_d, task_a)
    ]

Documentation
=============

Documentation is readable at http://workflow.readthedocs.org or can be built using Sphinx: ::

    pip install Sphinx
    python setup.py build_sphinx

Installation
============

Workflow is on PyPI, so all you need is: ::

    pip install workflow

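The helper classes in the SYNCHRONIZE sketch above (``MyTimeoutQueue``, ``MySpecialThread``, ``duplicate_engine_instance``) are not defined here; the same synchronization idea can be expressed self-containedly with the standard library (illustrative names, not the workflow API):

```python
# Standalone sketch of the synchronization pattern with the standard
# library: run all but the last task in worker threads, wait for them,
# then run the final task.
import queue
import threading

def synchronize(*tasks):
    def _synchronize(obj, eng):
        q = queue.Queue()
        for task in tasks[:-1]:
            q.put(task)

        def worker():
            while True:
                try:
                    task = q.get_nowait()
                except queue.Empty:
                    return
                task(obj, eng)
                q.task_done()

        for _ in range(len(tasks) - 1):
            threading.Thread(target=worker, daemon=True).start()

        q.join()             # wait until every queued task has finished
        tasks[-1](obj, eng)  # only then run the final task
    return _synchronize

# demo: b, c, d fill in parts of the object; e sums them up
def make_task(name, value):
    def task(obj, eng):
        obj[name] = value
    return task

def task_e(obj, eng):
    obj["total"] = obj["b"] + obj["c"] + obj["d"]

obj = {}
synchronize(make_task("b", 1), make_task("c", 2), make_task("d", 3), task_e)(obj, None)
print(obj["total"])  # 6
```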
Testing
=======

Running the test suite is as simple as:

.. code-block:: console

    $ python setup.py test

on Windows, you may want to do:

.. code-block:: console

    $ python setup.py test --pytest-args=tests

or, to also show code coverage:

.. code-block:: console

    $ ./run-tests.sh

TODO
====

.. admonition:: TODO

    There already exists a web-based GUI for constructing workflows -- publish it!

    Fix the bin/run-workflow.py script for executing workflows.

    Explain how workflows can be saved, organized, and embedded.