Chapter 02 工作流
在上一章中,我们提到了工作流是girlfriend的核心。本章我们来详细介绍工作流的各个组成部分。
首先是结构性组件,它们执行具体的工作并控制流程的走向,就像编程语言中的条件和循环语句一样:
- Job - 任务执行的具体单元,拥有很多子类,Job是工作流中的主力队员,它可以执行插件,也可以执行某个函数,可以循环执行,也可以按照不同的方式异步执行。
- Decision - 决策单元,通过目前的上下文来决定任务走向。
- Fork - 分支单元,并不是逻辑判断产生的分支,而是可以开启一个或多个新的进程/线程/协程来执行子工作流,子工作流会继承父工作流的上下文,有点跟操作系统fork一个新进程类似。
- Join - Fork单元的结束单元,与Fork总是成对出现,汇集处理Fork执行的结果,往往会阻塞父工作流进程直到所有fork出的子工作流完成为止,类似于操作系统中父进程wait子进程的概念。
- End - 结束节点,当工作流执行到该节点时,意味着整个工作流都结束了。End节点会携带工作流执行的成功/失败状态,如果成功,还会返回工作流最后执行的结果值。如果异常,则会携带相关的异常信息和堆栈信息。
以上就是workflow的结构性组件了,除了这些结构性的组件之外,还包括以下辅助性质的组件:
- Context - 上下文对象,用于在上述各个结构组件之间传递数据。
- Listener - 事件监听器,girlfriend将工作流执行的过程划分为一系列的生命周期事件,比如工作流启动、准备进入新的工作单元、工作单元运行结束、发生异常、工作流执行完毕等等,用户可以用监听器对这些事件按自己的需要作出灵活的处理。
让我们来看一个简单的工作流,将学生基本信息和成绩两组数据包装成Table对象,并将这两个Table对象inner join成一个新的Table,先在终端中将它们像MySQL客户端那样的表格格式输出,然后再导出包含三个Sheet的Excel文件:
score_workflow.py
# coding: utf-8
from girlfriend.data.table import TableWrapper
from girlfriend.workflow.gfworkflow import Job
from girlfriend.plugin.excel import SheetW
class Student(object):
def __init__(self, id_, name, gender, birth):
self.id, self.name = id_, name
self.gender, self.birth = gender, birth
students = [
Student(1, u"小明", u"男", "1989-11-07"),
Student(2, u"小红", u"女", "1989-08-10"),
Student(3, u"小华", u"男", "1990-02-01")
]
score_data = [
[1, u"小明", 102, 110, 130],
[2, u"小红", 110, 98, 121],
[3, u"小华", 98, 120, 130],
]
def _init_tables(context):
context["student_table"] = TableWrapper(
name=u"学生表",
titles=(
"id", u"编号",
"name", u"姓名",
"gender", u"性别",
"birth", u"生日"
)
)(students)
context["score_table"] = TableWrapper(
name=u"成绩表",
titles=(
"id", u"编号",
"name", u"姓名",
"chinese", u"语文",
"english", u"英语",
"math", u"数学"
)
)(score_data)
workflow = (
Job(
name="init_table",
caller=_init_tables
),
Job(
name="join_table",
plugin="join_table",
args={
"way": "inner",
"left": "student_table",
"right": "score_table",
"on": "id=id",
"fields": ("l.id", "l.name", "l.gender", "l.birth",
"r.chinese", "r.math", "r.english"),
"name": u"学生成绩表"
}
),
Job(
name="print_table",
plugin="print_table",
args=["$student_table", "$score_table", "$join_table.result"]
),
Job(
name="write_excel",
plugin="write_excel",
args=(
u"学生成绩.xlsx",
[
SheetW("join_table.result", sheet_handler=None),
SheetW("student_table"),
SheetW("score_table")
]
),
),
)
将上面的文件保存后,命令行中执行gf_workflow -m score_workflow.py
即可看到输出结果,当前目录下也会有一个"学生成绩.xlsx"的Excel文件。
这段代码的重点在于workflow这个元组,其中定义了要依次执行的工作单元,每个单元都是通过唯一的name属性来标识。第一个工作单元是调用函数_init_tables
,将不同格式的序列统一包装成了Table对象,并保存在context中,context的行为与字典是类似的,通过它为后续的操作传递数据;第二个工作单元是调用join_table插件,这个插件可以把两个table用inner、left、right三种方式join起来,这里采用了inner join的方式,left和right参数分别指定要join的表,on指定join条件,即左表的id字段等于右表的id字段,fields参数指定了join后表格拥有的列,比如,l.id表示左表的id;第三个单元执行print_table插件,print_table插件可以一口气打印出多个table对象;最后一个单元就是导出excel了,每个SheetW对象表示workbook中的一个Sheet,Table对象可以直接转换为Excel sheet,Table的name属性作为sheet的名称,Table的标题会自动作为表格的第一行。
后续我们会详细介绍这些插件的高级用法,比如为Excel添加图表和样式、更多的表格转化操作等,这里只是先了解一下工作流的结构。
Job
Job有很多的种类,在这里我们先来介绍最为主要的 girlfriend.workflow.gfworkflow.Job
,该类型的Job可以接受两种方式来运行,一种是通过caller
参数来指定要运行的函数,一种是通过plugin
参数来指定要运行的插件名,两者都是通过args参数来指定函数或者插件运行时所需要的参数,args可以是列表、元组或者字典。caller函数的第一个参数必须接受注入的上下文对象,尽管有时可能用不到。
>>> from girlfriend.workflow.gfworkflow import Job, Context
>>> job = Job(name="test", caller=lambda ctx, a, b: a + b, args=(1,2))
>>> ctx = Context()
>>> job.execute(ctx)
3
>>> ctx["test.result"]
3
如上所示,你可以通过execute方法单独运行一个Job,execute会返回caller或者插件的计算结果,同时这个结果也会被自动写入上下文,key的形式为单元名称.result
,方便在工作流中被其它的工作单元所引用。
可以使用字典来构建key-arg形式的参数:
>>> job = Job(name="test", caller=lambda ctx, a, b: a + b, args={"a":1, "b":2})
>>> job.execute(ctx)
3
除了在参数中可以使用像整数、字符串这样固定的值之外,你还可以引用上下文中的对象作为参数:
>>> Job(name="test", caller=lambda ctx, a, b: a + b, args=(1,2)).execute(ctx)
3
>>> Job(name="test2", caller=lambda ctx, a, b: a * b, args=(3, "$test.result")).execute(ctx)
9
注意第二个Job的第二个参数$test.result
,在参数中出现以$符号开头的字符串会自动去引用上下文中的变量。那要是我确实有个字符串是以$开头怎么办?那就需要使用两个$符号来进行转义了:
>>> Job(name="connect_string", caller=lambda ctx, word: "Hello, " + word, args=["$$Sam"]).execute(ctx)
'Hello, $Sam'
args除了接收列表、元组和字典外,还可以接受函数:
>>> def myargs(ctx):
... return (1, ctx["a"])
...
>>> ctx = Context()
>>> ctx["a"] = 2
>>> Job(name="add", caller=lambda ctx, a, b: a + b, args=myargs).execute(ctx)
3
这在一些需要根据当前工作流运行状况去动态生成参数的场景是非常有用的。另外,如果函数中包含了yield语句,那么Job将会循环执行,每执行一次yield,就会以这次生成的数据为参数执行一次Job:
>>> def yield_args(ctx):
... for i in xrange(0, 10):
... yield (i, ctx["a"])
...
>>> Job(name="add", caller=lambda ctx, a, b: a + b, args=yield_args).execute(ctx)
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
>>> ctx["add.result"]
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
上面的add Job总共执行了十次,execute方法会把每次执行的结果自动收集到一个列表中。
在工作流中,Job是默认一个接一个的执行的,如果你想跳过中间的某一个,那么可以指定一个goto参数,参数的值为接下来想要执行的单元,可以是Job,也可以是Decision、Fork或者是End。
test_goto.py
from girlfriend.workflow.gfworkflow import (
Workflow,
Job,
)
def caller(ctx, num):
if "a" not in ctx:
ctx["a"] = num
else:
ctx["a"] += num
return ctx["a"]
workflow = (
Job(
name="job1",
caller=caller,
args=(1,),
goto="job3"
),
Job(
name="job2",
args=(2,),
caller=caller,
),
Job(
name="job3",
args=(3,),
caller=caller
)
)
end = Workflow(workflow).execute()
print end.result
这个脚本不必通过gf_workflow工具运行,直接可以通过python ./test_goto.py
执行。最后输出的结果是4,而不是6,因为job1直接跳到了job3,中间的job2没有运行。通常我们不必在Job当中指定goto参数,而是通过Decision单元来决定工作流的走向。
关于Job,就先介绍这些,还有很多种,像是ConcurrentJob、ConcurrentForeachJob、BufferingJob等会在后续的章节中详细介绍。
Context
在上面的例子中,我们已经接触到了上下文对象。当一个工作流开始执行时,就会自动创建一个工作流对象,工作流结束时,如果不是持久存储或者是依赖于其它组件(可以扩展基于Redis或者MySQL的上下文对象,用于分布式场景),上下文对象就会被销毁了,另外上下文对象可以继承另一个上下文对象的值,这在Fork子工作流的时候非常有用。在传递数据方面,Context对象跟字典表现的并无二致,可以使用get获取一个值,可以使用in来判断某个键是否存在,也可以用del来删除一个值。但是除了传递数据,Context还有一个重要的作用,那就是集成了一些辅助工具:可以从上下文对象中获取上层的logger或者获取一个插件,这样就可以在caller中也能调用插件了,弥补有时候插件功能的一些不足。
Context中的属性:
属性 | 说明 |
---|---|
config | 配置对象 |
logger | 这是工作流中公用的logger,如果你有特殊业务需求,需要单独打日志,那么需要自己构建logger |
current_unit | 当前运行的单元名称,这个通常用于监听器 |
current_unittype | 当前运行的单元类型 |
parrent | 父上下文,用于子工作流中获取父上下文相关数据 |
thread_id | 当前线程标识,通常用于Fork |
args | 运行时参数,后续介绍 |
使用上下文中的日志工具
import logging
from girlfriend.workflow.gfworkflow import (
Workflow,
Job,
)
logger = logging.getLogger('test_logger')
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler('test.log')
logger.addHandler(file_handler)
def caller(ctx, num):
if "a" not in ctx:
ctx["a"] = num
else:
ctx["a"] += num
ctx.logger.info("current a is {}".format(ctx["a"]))
return ctx["a"]
workflow = (
Job(
name="job1",
caller=caller,
args=(1,),
goto="job3"
),
Job(
name="job2",
args=(2,),
caller=caller,
),
Job(
name="job3",
args=(3,),
caller=caller
)
)
end = Workflow(workflow, logger=logger).execute()
print end.result
通过context.logger,就可以使用在Workflow构造函数中所指定的日志对象了。无论是自定义插件,还是使用caller函数,如果想输出日志,那么通常情况不必再去自己创建logger,使用上下文中的logger即可,并且通过这样做,日志的输出位置和形式可以由最终用户决定。
Decision
Decision单元要比Job单元简单的多,它主要是用来决定程序的走向问题。它只包含了一个单元名称和一个接受上下文参数的函数,通过执行这个函数的返回值来动态决定下一步工作流的走向,下面是一个Decision的使用示例,根据用户在命令行指定的参数,来决定是否执行生成Excel的单元:
test_decision.py
# coding: utf-8
import argparse
from girlfriend.workflow.gfworkflow import (
Job,
Decision,
)
from girlfriend.data.table import TableWrapper
from girlfriend.plugin.excel import SheetW
cmd_parser = argparse.ArgumentParser()
cmd_parser.add_argument(
"--gen-excel", dest="gen_excel",
action="store_true", help=u"是否生成Excel")
def _init_table(ctx):
return TableWrapper(
name=u"students",
titles=("id", u"编号", "name", u"姓名", "grade", u"年级")
)([(1, "Sam", 1), (2, "Jack", 2), (3, "James", 3)])
def workflow(options):
return (
Job(name="init_table", caller=_init_table),
Job(name="print_table", args=["$init_table.result"]),
Decision(
"need_write_excel",
lambda ctx: "write_excel" if options.gen_excel else "end"
),
Job(name="write_excel", args=(
"students.xlsx", [SheetW("init_table.result")]))
)
运行这个工作流:
gf_workflow -m test_decision.py
或者
gf_workflow -m test_decision.py --gen-excel
gf_workflow工具会自动完成cmd_parser的解析工作,这里只要声明工作流所需要的命令行参数就可以了,gf_workflow会把解析后的参数注入到workflow函数,这样就可以让Decision节点基于命令行参数动态的做决策,如果指定了gen-excel选项,那么会执行write_excel单元,否则的话就结束,"end"是一个特殊值,如果将goto或者Decision的返回结果设置为end,那么工作流会立即终止,所以不要指定任何单元的名称为"end"。
- 这里的print_table和write_excel两个Job都没有指定plugin属性,如果不指定plugin,那么将会自动把单元名称作为插件名。
Fork / Join
我们将在并发相关的章节来介绍Fork Join,这里先不去介绍。
End
End节点表示工作流的结束,通常不需要显式指定,系统会自动在工作单元的末尾加入End节点,或者通过将goto设置为"end"以及通过Decision返回"end"来结束执行。工作流引擎在执行完毕之后会将End节点返回,这时End节点中包含了工作流运行的成功失败状态:
test_end.py
import traceback
from girlfriend.workflow.protocol import End
from girlfriend.workflow.gfworkflow import Job, Workflow
workflow = (
Job(name="test_1", caller=lambda ctx: 1 + 1),
Job(name="test_2", caller=lambda ctx: 1 / 0),
)
end = Workflow(workflow).execute()
print "\n==== end status ===="
print "Error happened: ", end.status == End.STATUS_ERROR_HAPPENED
print "Exc type: ", end.exc_type
print "Exc value: ", end.exc_value
print "Traceback: \n", traceback.print_tb(end.tb)
这里只有一个发生错误的Job,通过 python ./test_end.py
来执行,就可以看到Workflow在执行的时候并不会raise出异常,而是终止工作流并返回一个ErrorEnd对象,该对象里面包含了异常的完整信息。
在不发生错误的情况下,End通过result属性返回最后一个单元的执行结果,但是如果工作流是用于计算场景的,想对最后的返回结果进行一些加工,那么可以显式指定一个End,通过execute参数注册回调函数来对最终结果进行加工:
from girlfriend.workflow.protocol import OkEnd
from girlfriend.workflow.gfworkflow import Job, Workflow
workflow = (
Job(name="test_1", caller=lambda ctx: 2),
OkEnd(name="myend", execute=lambda ctx: 3)
)
end = Workflow(workflow).execute()
print "==== end status ===="
print "Result: ", end.result
可以看到最终的执行结果是3而不是2。
Workflow
我们前面已经多次接触了Workflow对象,通过接受一个列表或者元组定义的单元序列,调用其中的execute方法就可以执行并返回结果。
Workflow是工作流的执行引擎,除了维护必要的配置信息、插件管理器和工作单元序列之外,没有其余的额外状态,也就是说通常你可以把一个Workflow对象应用于并发环境中。
除了工作流序列,Workflow的构造函数还接受很多其他的参数,比如上下文工厂,一般情况下,我们并不需要去直接构造Workflow对象,而是通过一些Facade性质的工具比如gf_workflow来运行工作流,这些工具封装了在不同环境下构建Workflow对象的细节。
这里需要重点说明的是execute方法,它接受三个可选的参数:
args:girlfriend将工作单元的参数分为两大类,一类是模板参数,另一类是运行时参数,我们上面看到的跟Job紧密绑定在一起的都是模板参数,它们往往是固定的形式;而通过execute方法指定的参数可以随着每次运行而改变。
start_point:运行的起始点,默认将工作序列中的第一个单元作为起始点,如果你不希望这么做,可以通过start_point参数来指定想要的起始点。
end_point:结束单元,同样,你可以在这里指定工作流运行的最后一个单元,不过只能是Job或者Join类型。
运行时参数
下面我们通过一个示例来说明运行时参数的作用,假设我们将girlfriend运行在web.py环境之中,通过web请求来执行工作流:
import web
from girlfriend.workflow.gfworkflow import Workflow, Job
workflow = Workflow((
Job("test_1", caller=lambda ctx, a: 1 + a),
Job("test_2", caller=lambda ctx, a, b: a * b, args={
"a": 3
})
))
class Test(object):
def GET(self):
param = web.input()
end = workflow.execute(args={
"test_1": [int(param.get("a", 0))],
"test_2": {"b": int(param.get("b", 0))}
})
return end.result
app = web.application(("/test", Test))
app.run()
这样我们就可以即时的通过请求来向工作流传递参数,而不必每次都重新生成工作流序列,运行时参数接受列表、元组和字典形式,需要注意的是,运行时参数的类型必须与模板参数的类型一致(如果有模板参数的话),否则会出错。如果是序列类型的参数,girlfriend会直接使用运行时参数替换模板参数,如果是字典参数,那么会基于模板参数执行一个update操作,用运行时参数覆盖已经存在的值。在实践中需要注意以上两种不同的参数替换行为。
Listener
通过继承girlfriend.workflow.protocol.AbstractListener
类来构建监听器,选择覆盖以下方法来监听对应的事件:
方法 | 接受参数 | 说明 |
---|---|---|
on_start | context | 在工作流正式之前执行该方法,可以在此做一些准备工作 |
on_unit_start | context | 在执行一个工作单元之前会调用该方法 |
on_unit_finish | context | 在调用一个工作单元完毕后会调用该方法 |
on_error | context, exc_type, exc_valye, traceback | 当有错误发生时执行该方法 |
on_finish | context | 整个工作流运行完毕时调用该方法,需要注意的是因为错误结束时不会调用 |
不必实现全部的方法,只选择实现感兴趣的事件即可。通过Workflow对象的add_listener方法来注册监听器:
workflow.add_listener(listener_a)
workflow.add_listener(listener_b)
...
事件发生时,监听器会按照被注册的顺序依次执行。
需要特别的注意的是,add_listener可以接受一个监听器类作为参数,也可以接受一个具体的对象作为参数,但这两者的生命周期是有不同的,当接受一个类作为参数时,每次执行工作流,都会创建一个新的监听器;如果接受一个对象作为参数时,每次执行工作流,这个对象是持久不变的。这在并发场景时尤为重要,如果你的监听器是线程安全的,你可以放心传递一个已经构建好的对象,如果不是线程安全,或者其中维护了一系列的状态,那么请传递监听器类,由执行引擎在每次运行时自动构建,当然,这个监听器类的构造方法需要是无参的。