Chapter 16 Girlfriend下的并发编程 —— 并发Job(ConcurrentJob、ConcurrentForeachJob、BufferingJob)

在Python开发中,我们一般会通过三种方式来实现并发:多线程、多进程、协程。Girlfriend内置的并发组件目前仅仅提供对多线程的支持。

众所周知,Python的多线程由于GIL问题不能利用多核进行计算,但这也并不意味着线程机制在Python中就一无是处,因为多线程的目的并不仅仅是充分利用CPU,它还可以隔离任务以及避免阻塞带来的响应时间缓慢(嗯,是的,这事儿协程可以做的更好)。

Girlfriend通过并行Job和Fork/Join这些组件来对并发提供支持,虽然目前仅支持线程,但是在此框架之下今后对协程和多进程提供支持也是很容易的。

ConcurrentJob

假设有多个Job需要运行,并且这些Job互补依赖,那么我们就会想,是否可以将这些Job并发执行呢?ConcurrentJob组件就提供了这种支持,它可以将多个Job打包在一起,通过线程池的方式来并发执行。

>>> import time
>>> from girlfriend.workflow.gfworkflow import Job
>>> from girlfriend.workflow.gfworkflow import Context
>>> from girlfriend.workflow.concurrent import ConcurrentJob
>>> def task(context, number):
...     print "begin task: {}".format(number)
...     time.sleep(5)
...     print "end task: {}".format(number)
...     return number
... 
>>> job = ConcurrentJob(
...     name="test",
...     sub_jobs=[
...         Job(
...             name="job1",
...             caller=task,
...             args=[1]
...         ),
...         Job(
...             name="job2",
...             caller=task,
...             args=[2]
...         ),
...         Job(
...             name="job3",
...             caller=task,
...             args=[3]
...         )
...     ],
... )
>>> 
>>> ctx = Context()
>>> job.execute(ctx)
begin task: 1
begin task: 2
begin task: 3
end task: 1
end task: 2
end task: 3
[1, 2, 3]

如果单独执行,这三个任务会总共等待15秒,但通过ConcurrentJob进行包装,只需要5秒就会执行完毕。

ConcurrentJob支持的参数:

  • name Job名称
  • sub_jobs 并行任务列表
  • pool_type 线程池类型,目前只支持ThreadPoolExecutor,不必指定。
  • pool 使用的线程池,如果你的工作流中有多个ConcurrentJob,那么就可以构建一个线程池共享,如果不指定,那么将为sub_jobs中的每个Job都构建一个线程,如果你的任务特别多,那么需要注意这里,避免构建太多线程造成溢出。
  • join 可以指定一个回调函数对最终结果进行加工。
  • error_action 该选项用于指定错误处理动作,如果指定为stop,那么只要其中一个子任务出现错误就会立即终止工作流,并返回错误结果,如果指定为continue,那么会忽略错误并继续执行。默认为stop,无论如何,错误都会被记录到日志中去。
  • error_default_value 当子任务出现异常,并且error_action为continue时,出错子任务会以此参数的值作为返回值加到结果列表中,默认为None。

ConcurrentForeachJob

有时我们是希望一种任务以不同的参数并行执行,比如我要处理一个目录的文件,而插件只能顺序的一次处理一个,我希望并行处理这些文件,这时ConcurrentForeachJob就派上了用场。

ConcurrentForeachJob的使用方式跟Job基本相似,只不过我们需要提供的是一组参数:

ConcurrentForeachJob(
            name="test",
            caller=task,
            args=[[1], [2], [3], [4], [5]]
        )

args支持生成器或者可迭代序列,每次迭代的元素将作为每个并发子任务的参数。

ConcurrentForeachJob的参数:

  • name:组件名称
  • plugin:使用的插件名称
  • caller:执行的函数
  • args:参数
  • thread_num:构建多少个线程用来执行该任务,默认是10
  • task_num_per_thread:每个线程要执行多少个子任务,默认为每个线程1个,如果任务不是太重,每个任务一个线程有些浪费资源,那么可以通过该参数来分配,使一个线程执行尽可能多的任务,同时又保证效率。
  • pool_type:线程池类型,同样目前只支持ThreadPoolExecutor。
  • sub_join:针对每个线程的执行结果聚合。比如当一个线程执行了两个子任务,那么该函数会对这两个子任务进行聚合,默认忽略,将结果收集到列表。
  • result_join: 对最终结果的join操作,每个元素都是sub_join的结果,如果没有指定sub_join,那么将会是二维列表,每个子列表为每个线程的执行结果。

BufferingJob

假设我们正在为一个UGC应用编写搜索引擎,用户通过web发布自己的状态信息,该信息通过消息队列异步传达到搜索引擎的索引创建服务,如果每收到一条消息都去磁盘写入索引文件,那势必在用户活跃的高峰期会造成频繁的IO操作,为了避免磁盘的写压力过大,我们通常会使用在内存中积累一定的消息量再批量写入磁盘的方式去创建索引。但是索引创建又需要一定的时效性,用户发布的新内容,必须在尽量短的时间内被搜到,假如我们要收集齐1000条消息才去创建索引,但是如果迟迟收集不来1000条,那怎么办?此时就需要一个时间限制,一但到达时间限制,不管收集了多少条,都要将索引写入磁盘。

BufferingJob就是为这种场景专门打造的,它通过数目和时间两个限定条件,来决定是否采取下一步的行动。

def task(context):
    msg = redis_cli.bpop(queue_name)
    return msg


def give_back_msg(context, msg):
    redis_cli.push(queue_name, msg)
...

BufferingJob(
    name="consumer",
    caller=task,
    max_items=1000,
    timeout=30,
    immediately=true,
    give_back_handler=give_back_msg
)

BufferingJob启动一个任务线程,然后将caller指定的函数或是plugin指定的插件至于任务线程的一个循环中运行,直到达到指定的条件。

这个名为consumer的Job会从Redis的阻塞队列中不断获取消息,直到消息积累到1000条,如果超过了30秒,就会立即停止,执行接下来的步骤。

BufferingJob的参数

  • name 任务名称
  • plugin 插件名称
  • caller 运行函数
  • max_items 最大积累消息数
  • timeout 超时时间,单位是秒,如果不指定,则没有超时时间。
  • immediately 如果超时时间到了是否将任务立即停止。假设指定的超时时间到了,却有一个任务正在运行,是不管它直接停止,还是等它顺利执行完毕再停止?这是一个需要严肃考虑的问题。如果指定为true,工作流会继续往下执行,如果指定为false,工作流会等待执行中的任务完成才会继续。
  • give_back_handler 如果选择了immediately为true,会造成中途收到的消息没有处理的情况,我们该怎么办?give_back_handler提供了解决之道,允许我们对这种遗漏的消息进行处理,比如将消息归还到队列重新发送。

关于IO阻塞造成的超时问题

如果当前任务是因为外界IO而阻塞(例如等待网络消息),我们不能知道任务什么时候才会结束,即使immediately为true,也不能终止任务线程处理,immediately为true时,只是主线程放弃对任务线程的等待,任务线程其实仍然还在执行,只不过,当它获取消息的时候会将消息转交给give_back_handler处理,而不是写入context结果中。

大多数网络IO操作接口都会有阻塞超时时间的设置,建议您通过这个对超时时间进行控制,这样可以简化处理,不必担心消息丢失之类的问题。

另外BufferingJob的工作应该仅仅是读取消息,其它的逻辑处理应该放在工作流接下来的部分去做。

results matching ""

    No results matching ""