Chapter 18 上下文持久化与中断恢复

假设我们正在执行一个数据可视化的任务,我们需要从数据库中读取数据,然后再进行各种聚合计算,最后将这些计算结果呈现到可视化图表上。读取数据和对数据进行计算需要花费很长的时间,然而,就在计算的最后一步,出现了ZeroDivisionError,整个工作流无法继续前进。于是我们只能停下来,定位到错误代码进行修正,修正完毕之后,再重新开始计算……因为这个小小的错误,之前的时间都浪费了。

如果我们在每个工作流单元执行完毕时,能够把当前上下文和执行进度保存起来,那么我们是不是就可以利用这些来恢复工作流,而不需要做这种重复的等待呢?

嗯,girlfriend做了这种支持。

使用Listener来对上下文进行持久化

前面我们提到,girlfriend可以通过监听器对工作流的执行事件进行监听,因此,通过编写Listener对上下文进行持久化最好不过了,有两大好处:

  1. 完全不侵入工作流引擎的具体执行。
  2. 扩展性好,只需要监听感兴趣的事件来捕获上下文数据,具体的持久化方式(比如文件方式、数据库方式等等)随你而定。

girlfreidn目前内置了基于pickle持久化的PicklePersistListener,girlfriend.workflow.persist.pickle.PicklePersisListener,该监听器会监听on_unit_start和on_finish事件,它会在每个新单元执行之前把上下文对象写入指定的文件,然后在整个工作流执行结束之后在文件中标记完成状态。

我们直接在构建工作流对象的时候添加该监听器即可:

workflow = Workflow(units);
workflow.add_listener(PicklePersistListener("dump.dat"))

PicklePersistListener("dump.dat")构造函数的参数是可选的,指定持久化文件的路径,默认为dump.dat

注意,对于使用Fork单元产生的子工作流,PicklePersistListener目前不会对其进行专门的持久化,而是将Fork和Join之间的所有内容看做是一个单元。当Fork出的某个子工作流崩溃,无法通过PicklePersistListener来对其进行恢复。

指定RecoverPolicy来恢复中断的工作流

RecoverPolicy是一个抽象的接口,该接口描述从持久化恢复工作流的抽象:

class RecoverInfo(object):

    """
    从持久化中加载的用于恢复工作流执行的信息
    """

    def __init__(self, begin_unit, context_factory):
        """
        :param begin_unit 起始单元
        :param context_factory 上下文工厂
        """
        self._begin_unit = begin_unit
        self._context_factory = context_factory

    @property
    def begin_unit(self):
        return self._begin_unit

    @property
    def context_factory(self):
        return self._context_factory

class RecoverPolicy(object):

    """
    RecoverPolicy用于为工作流的调用者提供恢复策略
    恢复策略包含两方面的内容,一个是当前工作流要执行的起始点,
    另外就是关于Context的恢复策略,需要提供一个可以恢复旧有数据的上下文工场
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def load(self):
        """用于恢复持久化的上下文信息,由工作流的驱动程序回调。
           当工作流不需要恢复时,会抛出NoNeedRecoverException
        """
        pass

load方法会返回一个RecoverInfo对象,该对象承载了工作流恢复的关键信息:1. 将要运行的起始点 2. 上下文工厂

recover_info = recover_policy.load()
workflow = Workflow(units, context_factory=recover_info.context_factory)
workflow.execute(args=Args, start_point=recover_info.begin_unit)

就类似于各种encoder和decoder,持久化Listener和RecoverPolicy也都是成对出现的。使用PicklePersistListener持久化的工作流,需要使用girlfriend.workflow.persist.pickle.PickleRecoverPolicy来进行恢复。

在gf_workflow中

只要在目标模块中加入相应的Listener和RecoverPolicy即可完成自动持久化和恢复工作:

# coding: utf-8

"""
Docs goes here
"""

from argparse import ArgumentParser
from girlfriend.workflow.gfworkflow import Job
from girlfriend.workflow.protocol import Env
from girlfriend.plugin.orm import SQL
from girlfriend.data.table import TableWrapper
from girlfriend.workflow.persist.pickle import (
    PicklePersistListener,
    PickleRecoverPolicy
)

# 命令行解析器,在这里可以定义工作流自己需要的参数
cmd_parser = ArgumentParser(description=__doc__.decode("utf-8"))
cmd_parser.add_argument("--table", "-t", dest="option",
                        default="user", action="store", help="")


# 日志设置项
logger = None # 可以指定一个字符串表示的日志文件路径,也可以是具体的Logger对象,当直接使用字符串路径时,默认将使用日期rotate策略。
logger_level = "info" # 默认输出的日志级别

# 设置监听器
listeners = [
    PicklePersistListener("filepath"),
]

# 设置恢复策略
recover_policy = PickleRecoverPolicy("filepath")

def _test_env_args(options):
    return {}


def _test_env_config(options):
    return {
        "db_hehe": {
            "connect_url": "sqlite:////Users/chihongze/gftest/gftest.db"
        }
    }


def _pro_env_args(options):
    return {}


_pro_env_config = {
    "db_hehe": {
        "connect_url": "sqlite:////Users/chihongze/hehe/hehe.db"
    }
}

# 支持的运行环境列表,不同的运行环境支持不同的配置和参数,比如测试环境和正式环境访问的数据源有所区别。
env = (
    Env("test", _test_env_args, _test_env_config),
    Env("pro", _pro_env_args, _pro_env_config),
)

# 工作流定义,可以是一个返回工作流单元序列的函数也可以直接是工作流序列,取决于工作流是否要基于命令行参数而变化。
def workflow(options):
    work_units = (
        # orm_query
        Job(
            name="orm_query",
            plugin="orm_query",
            args=[
                SQL(
                    engine_name="hehe",
                    variable_name="table",
                    sql="select * from :table_name",
                    params={"table_name": options.table}, # 这里直接使用了命令行参数
                    row_handler=None,
                    result_wrapper=TableWrapper(
                        "table_name",
                        titles=["id", "编号", "name", "姓名", "clazz", "班级"]
                    )
                ),
            ]
        ),
        # print_table
        Job(
            name="print_table",
            plugin="print_table",
            args=[
                "$table"
            ]
        ),

    )

    return work_units

gf_workflow在执行该模块时,会先检查恢复策略,如果持久化文件中的工作流未完成,则会继续,如果已完成,则会提示,需要用户来决定是否删掉持久化文件,以供再次重新运行。当然,如果持久化文件不存在,则会从头开始执行。

results matching ""

    No results matching ""