Chapter 17 Girlfriend下的并发编程 —— Fork/Join组件

前面我们介绍了一系列并发类型的Job,这些Job只能在一个任务单元中执行并发,但有时我们希望并发执行子工作流,比如我们通过BufferingJob从消息队列中获取了1000条消息,我们希望接下来开启10个子工作流去平均分配处理这1000条消息,这时就需要用到Fork和Join这两种类型的工作单元了。

如下所示,Girlfriend会开启10个线程来执行ConcurrentFork和ConcurrentJoin之间的子工作流。

units = [
    Job(
        name="task_init",
        caller=sleep_task,
        args=("init", 1)
    ),
    ConcurrentFork(
        name="fork",
        thread_num=10
    ),
    Job(
        name="task_first",
        caller=sleep_task,
        args=("first", 2)
    ),
    Job(
        name="task_second",
        caller=sleep_task,
        args=("second", 3)
    ),
    ConcurrentJoin(
        name="join"
    ),
]

ConcurrentFork的参数:

  • name 单元名称
  • thread_num 执行线程数目
  • pool 指定一个外部的线程池,否则会新创建一个线程数目为thread_num的线程池。
  • pool_type 线程池类型,目前只能使用ThreadPoolExecutor,无需指定。
  • context_factory 上下文工厂,Fork出的子工作流会使用新的上下文对象,上下文工厂是一个回调函数,通过它来产生新的上下文。
  • extends_listeners 是否继承父工作流的监听器
  • listeners 子工作流的监听器列表,如果extends_listener为True,那么实际的监听器列表为父监听器列表 + 子监听器列表,如果为False,那么只有子监听器列表。

以上参数,只有name和thread_num是必须的。

ConcurrentJoin的参数:

  • name 单元名称
  • join 对各个子工作流返回的结果进行归并处理。可选。它会接受一个上下文参数和一个列表参数,列表参数的每一个元素都是子线程的最终结果。

Fork和Join是必须成对出现的,不能只使用其中的一个,Join会阻塞主线程,直到Fork出的所有子线程都执行完毕。这就跟在操作系统中,父进程需要wait子进程一个样子。Fork和Join目前不支持嵌套。

关于任务的分配

我们前面说分配1000条消息给10个子工作流,如何分配呢?前面我们在介绍上下文对象的时候,说它有一个thread_id的属性。这个属性就是用来做这件事情的,Girlfriend会为Fork单元产生的每一个子工作流分配一个thread_id,从0到thread_num - 1,可以通过该数值将任务列表分割,每个线程分配[thread_id (msg_num / 10), (thread_id + 1) (msg_num / 10))之间的消息。可以在子工作流的开始去添加一个任务,专门从父上下文中的任务列表去获得具体的子任务。

关于上下文的继承

context_factory的默认值是Context类的构造方法,尽管我们一再拿操作系统的Fork机制做类比,但是这里并没有进行COW的保护机制,通常子工作流读写的属性都是自己的,不会污染父上下文,但并不阻止去访问父上下文的属性,通过ctx.parrent属性就可以获得父上下文对象。但对于父上下文的内容,应该尽量做到只读不写。

线程池的关闭

如果通过pool属性引用了外部的线程池,那么开发者需要自己在恰当的时间去关闭线程池,如果没有引用,那么girlfriend会在join节点关闭所有Fork产生的线程。

results matching ""

    No results matching ""