Chapter 17 Girlfriend下的并发编程 —— Fork/Join组件
前面我们介绍了一系列并发类型的Job,这些Job只能在一个任务单元中执行并发,但有时我们希望并发执行子工作流,比如我们通过BufferingJob从消息队列中获取了1000条消息,我们希望接下来开启10个子工作流去平均分配处理这1000条消息,这时就需要用到Fork和Join这两种类型的工作单元了。
如下所示,Girlfriend会开启10个线程来执行ConcurrentFork和ConcurrentJoin之间的子工作流。
units = [
Job(
name="task_init",
caller=sleep_task,
args=("init", 1)
),
ConcurrentFork(
name="fork",
thread_num=10
),
Job(
name="task_first",
caller=sleep_task,
args=("first", 2)
),
Job(
name="task_second",
caller=sleep_task,
args=("second", 3)
),
ConcurrentJoin(
name="join"
),
]
ConcurrentFork的参数:
- name 单元名称
- thread_num 执行线程数目
- pool 指定一个外部的线程池,否则会新创建一个线程数目为thread_num的线程池。
- pool_type 线程池类型,目前只能使用ThreadPoolExecutor,无需指定。
- context_factory 上下文工厂,Fork出的子工作流会使用新的上下文对象,上下文工厂是一个回调函数,通过它来产生新的上下文。
- extends_listeners 是否继承父工作流的监听器
- listeners 子工作流的监听器列表,如果extends_listener为True,那么实际的监听器列表为父监听器列表 + 子监听器列表,如果为False,那么只有子监听器列表。
以上参数,只有name和thread_num是必须的。
ConcurrentJoin的参数:
- name 单元名称
- join 对各个子工作流返回的结果进行归并处理。可选。它会接受一个上下文参数和一个列表参数,列表参数的每一个元素都是子线程的最终结果。
Fork和Join是必须成对出现的,不能只使用其中的一个,Join会阻塞主线程,直到Fork出的所有子线程都执行完毕。这就跟在操作系统中,父进程需要wait子进程一个样子。Fork和Join目前不支持嵌套。
关于任务的分配
我们前面说分配1000条消息给10个子工作流,如何分配呢?前面我们在介绍上下文对象的时候,说它有一个thread_id的属性。这个属性就是用来做这件事情的,Girlfriend会为Fork单元产生的每一个子工作流分配一个thread_id,从0到thread_num - 1,可以通过该数值将任务列表分割,每个线程分配[thread_id (msg_num / 10), (thread_id + 1) (msg_num / 10))之间的消息。可以在子工作流的开始去添加一个任务,专门从父上下文中的任务列表去获得具体的子任务。
关于上下文的继承
context_factory的默认值是Context类的构造方法,尽管我们一再拿操作系统的Fork机制做类比,但是这里并没有进行COW的保护机制,通常子工作流读写的属性都是自己的,不会污染父上下文,但并不阻止去访问父上下文的属性,通过ctx.parrent属性就可以获得父上下文对象。但对于父上下文的内容,应该尽量做到只读不写。
线程池的关闭
如果通过pool属性引用了外部的线程池,那么开发者需要自己在恰当的时间去关闭线程池,如果没有引用,那么girlfriend会在join节点关闭所有Fork产生的线程。