Chapter 18 上下文持久化与中断恢复
假设我们正在执行一个数据可视化的任务,我们需要从数据库中读取数据,然后再进行各种聚合计算,最后将这些计算结果呈现到可视化图表上。读取数据和对数据进行计算需要花费很长的时间,然而,就在计算的最后一步,出现了ZeroDivisionError,整个工作流无法继续前进。于是我们只能停下来,定位到错误代码进行修正,修正完毕之后,再重新开始计算……因为这个小小的错误,之前的时间都浪费了。
如果我们在每个工作流单元执行完毕时,能够把当前上下文和执行进度保存起来,那么我们是不是就可以利用这些来恢复工作流,而不需要做这种重复的等待呢?
嗯,girlfriend做了这种支持。
使用Listener来对上下文进行持久化
前面我们提到,girlfriend可以通过监听器对工作流的执行事件进行监听,因此,通过编写Listener对上下文进行持久化最好不过了,有两大好处:
- 完全不侵入工作流引擎的具体执行。
- 扩展性好,只需要监听感兴趣的事件来捕获上下文数据,具体的持久化方式(比如文件方式、数据库方式等等)随你而定。
girlfreidn目前内置了基于pickle持久化的PicklePersistListener,girlfriend.workflow.persist.pickle.PicklePersisListener
,该监听器会监听on_unit_start和on_finish事件,它会在每个新单元执行之前把上下文对象写入指定的文件,然后在整个工作流执行结束之后在文件中标记完成状态。
我们直接在构建工作流对象的时候添加该监听器即可:
workflow = Workflow(units);
workflow.add_listener(PicklePersistListener("dump.dat"))
PicklePersistListener("dump.dat")构造函数的参数是可选的,指定持久化文件的路径,默认为dump.dat
注意,对于使用Fork单元产生的子工作流,PicklePersistListener目前不会对其进行专门的持久化,而是将Fork和Join之间的所有内容看做是一个单元。当Fork出的某个子工作流崩溃,无法通过PicklePersistListener来对其进行恢复。
指定RecoverPolicy来恢复中断的工作流
RecoverPolicy是一个抽象的接口,该接口描述从持久化恢复工作流的抽象:
class RecoverInfo(object):
"""
从持久化中加载的用于恢复工作流执行的信息
"""
def __init__(self, begin_unit, context_factory):
"""
:param begin_unit 起始单元
:param context_factory 上下文工厂
"""
self._begin_unit = begin_unit
self._context_factory = context_factory
@property
def begin_unit(self):
return self._begin_unit
@property
def context_factory(self):
return self._context_factory
class RecoverPolicy(object):
"""
RecoverPolicy用于为工作流的调用者提供恢复策略
恢复策略包含两方面的内容,一个是当前工作流要执行的起始点,
另外就是关于Context的恢复策略,需要提供一个可以恢复旧有数据的上下文工场
"""
__metaclass__ = ABCMeta
@abstractmethod
def load(self):
"""用于恢复持久化的上下文信息,由工作流的驱动程序回调。
当工作流不需要恢复时,会抛出NoNeedRecoverException
"""
pass
load方法会返回一个RecoverInfo对象,该对象承载了工作流恢复的关键信息:1. 将要运行的起始点 2. 上下文工厂
recover_info = recover_policy.load()
workflow = Workflow(units, context_factory=recover_info.context_factory)
workflow.execute(args=Args, start_point=recover_info.begin_unit)
就类似于各种encoder和decoder,持久化Listener和RecoverPolicy也都是成对出现的。使用PicklePersistListener持久化的工作流,需要使用girlfriend.workflow.persist.pickle.PickleRecoverPolicy
来进行恢复。
在gf_workflow中
只要在目标模块中加入相应的Listener和RecoverPolicy即可完成自动持久化和恢复工作:
# coding: utf-8
"""
Docs goes here
"""
from argparse import ArgumentParser
from girlfriend.workflow.gfworkflow import Job
from girlfriend.workflow.protocol import Env
from girlfriend.plugin.orm import SQL
from girlfriend.data.table import TableWrapper
from girlfriend.workflow.persist.pickle import (
PicklePersistListener,
PickleRecoverPolicy
)
# 命令行解析器,在这里可以定义工作流自己需要的参数
cmd_parser = ArgumentParser(description=__doc__.decode("utf-8"))
cmd_parser.add_argument("--table", "-t", dest="option",
default="user", action="store", help="")
# 日志设置项
logger = None # 可以指定一个字符串表示的日志文件路径,也可以是具体的Logger对象,当直接使用字符串路径时,默认将使用日期rotate策略。
logger_level = "info" # 默认输出的日志级别
# 设置监听器
listeners = [
PicklePersistListener("filepath"),
]
# 设置恢复策略
recover_policy = PickleRecoverPolicy("filepath")
def _test_env_args(options):
return {}
def _test_env_config(options):
return {
"db_hehe": {
"connect_url": "sqlite:////Users/chihongze/gftest/gftest.db"
}
}
def _pro_env_args(options):
return {}
_pro_env_config = {
"db_hehe": {
"connect_url": "sqlite:////Users/chihongze/hehe/hehe.db"
}
}
# 支持的运行环境列表,不同的运行环境支持不同的配置和参数,比如测试环境和正式环境访问的数据源有所区别。
env = (
Env("test", _test_env_args, _test_env_config),
Env("pro", _pro_env_args, _pro_env_config),
)
# 工作流定义,可以是一个返回工作流单元序列的函数也可以直接是工作流序列,取决于工作流是否要基于命令行参数而变化。
def workflow(options):
work_units = (
# orm_query
Job(
name="orm_query",
plugin="orm_query",
args=[
SQL(
engine_name="hehe",
variable_name="table",
sql="select * from :table_name",
params={"table_name": options.table}, # 这里直接使用了命令行参数
row_handler=None,
result_wrapper=TableWrapper(
"table_name",
titles=["id", "编号", "name", "姓名", "clazz", "班级"]
)
),
]
),
# print_table
Job(
name="print_table",
plugin="print_table",
args=[
"$table"
]
),
)
return work_units
gf_workflow在执行该模块时,会先检查恢复策略,如果持久化文件中的工作流未完成,则会继续,如果已完成,则会提示,需要用户来决定是否删掉持久化文件,以供再次重新运行。当然,如果持久化文件不存在,则会从头开始执行。