[源码解析] 并行分布式任务队列 Celery 之 负载均衡
目录
0x00 摘要
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文介绍 Celery 的负载均衡机制。
Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。
0x01 负载均衡
Celery 的负载均衡其实可以分为三个层次,而且是与 Kombu 高度耦合(本文 broker 以 Redis 为例)。
- 在 worker 决定 与 哪几个 queue 交互,有一个负载均衡(对于 queues );
- 在 worker 决定与 broker 交互,使用 brpop 获取消息时候有一个负载均衡(决定哪一个 worker 来处理任务);
- 在 worker 获得 broker 消息之后,内部 具体 调用 task 时候,worker 内部进行多进程分配时候,有一个负载均衡(决定 worker 内部哪几个进程)。
注意,这个顺序是从 worker 读取任务处理任务的角度 出发,而不是从系统架构角度出发。
因为从系统架构角度说,应该是 which worker ----> which queue in the worker ----> which subprocess in the worker 这个角度。
我们下面按照 "worker 读取任务处理任务角度" 的顺序进行分析。
1.1 哪几个 queue
Kombu 事实上是使用 redis 的 BRPOP 功能来完成对具体 queue 中消息的读取。
- Kombu 是循环调用,每次调用会制定读取哪些内部queues的消息;
- queue 这个逻辑概念,其实就是对应了 redis 中的一个 物理key,从 queue 读取,就代表 BRPOP 需要指定 监听的 key。
- Kombu 是在每一次监听时候,根据这些 queues 得到 其在 redis 之中对应的物理keys,即都指定监听哪些 redis keys;
- brpop是个多key命令,当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。这样就得到了这些 逻辑queue 对应的消息。
因为 task 可能会 用到多个 queue,所以具体从哪几个queue 读取?这时候就用到了策略。
1.1.1 _brpop_start 选择下次读取的queue
Kombu 在每次监听时候,调用 _brpop_start 完成监听。其作用就是 选择下一次读取的queues。
_brpop_start 如下:
def _brpop_start(self, timeout=1): # 得到一些内部queues queues = self._queue_cycle.consume(len(self.active_queues)) if not queues: return # 得到queue对应的keys keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps for queue in queues] + [timeout or 0] self._in_poll = self.client.connection self.client.connection.send_command('BRPOP', *keys) # 利用这些keys,从redis内部获取key的消息
此时变量如下:
self.active_queues = {set: 1} {'celery'}len(self.active_queues) = {int} 1 self._queue_cycle = {round_robin_cycle} <kombu.utils.scheduling.round_robin_cycle object at 0x0000015A7EE9DE88> self = {Channel} <kombu.transport.redis.Channel object at 0x0000015A7EE31048>
所以_brpop_start 就是从 self._queue_cycle 获得几个需要读取的queue。
具体如下图:
+ Kombu | Redis | |+--------------------------------------------+ || Worker | || | | queue 1 key| +-----------+ | || | queue 1 | | BRPOP(keys) || | queue 2 | keys | || | ...... | +--------+----------------------------------------> queue 2 key| | queue n | ^ | || +-----------+ | keys | || | | || | | | queue 3 key| +-------------+------------+ | || | Keys list | | || | | | || +--------------------------+ | |+--------------------------------------------+ | | | | | | +
1.1.2 round_robin_cycle 设置下次读取的 queue
从上面代码中,我们可以知道 consume 就是返回 round_robin_cycle 中前几个 queue,即 return self.items[:n]。
而 self.items 的维护,是通过 rotate 完成的,就是把 最近用的 那个 queue 放到队列最后,这样给其他 queue 机会,就是 round robin 的概念了。
class round_robin_cycle: """Iterator that cycles between items in round-robin.""" def __init__(self, it=None): self.items = it if it is not None else [] def update(self, it): """Update items from iterable.""" self.items[:] = it def consume(self, n): """Consume n items.""" return self.items[:n] def rotate(self, last_used): """Move most recently used item to end of list.""" items = self.items try: items.append(items.pop(items.index(last_used))) except ValueError: pass return last_used
比如在如下代码中,当读取到消息之后,就会调用 self._queue_cycle.rotate(dest) 进行调整。
def _brpop_read(self, **options): try: try: dest__item = self.client.parse_response(self.client.connection, 'BRPOP', **options) except self.connection_errors: # if there's a ConnectionError, disconnect so the next # iteration will reconnect automatically. self.client.connection.disconnect() raise if dest__item: dest, item = dest__item dest = bytes_to_str(dest).rsplit(self.sep, 1)[0] self._queue_cycle.rotate(dest) # 这里进行调整 self.connection._deliver(loads(bytes_to_str(item)), dest) return True else: raise Empty() finally: self._in_poll = None
具体如下图:
+ Kombu | Redis | |+--------------------------------------------+ || Worker | || | | queue 1 key| +-----------+ | || | queue 1 | | BRPOP(keys) || | queue 2 | keys | || | ...... | +--------+----------------------------------------> queue 2 key| | queue n | ^ | || +-----------+ | keys | || | | || + | | queue 3 key| round_robin_cycle | || + | || | | || | | || +-------------+------------+ | || | Keys list | | || +--------------------------+ | |+--------------------------------------------+ | | +
1.2 哪一个worker
如果多个 worker 同时去使用 brpop 获取 broker 消息,那么具体哪一个能够读取到消息,其实这就是有一个 竞争机制,因为redis 的单进程处理,所以只能有一个 worker 才能读到。
这本身就是一个负载均衡。这个和 spring quartz 的负载均衡实现非常类似。
- spring quartz 是 多个节点读取 同一个数据库记录决定谁能开始下一次处理,哪一个得到了数据库锁 就是哪个。
- Kombu 是通过 多个 worker 读取 redis "同一个或者一组key" 的 实际结果 来决定 "哪一个 worker 能开始下一次处理"。
具体如下图:
+ Kombu | Redis | |+--------------------------------------+ || Worker 1 | || | || +-----------+ | || | queue 1 | | BRPOP(keys) || | queue 2 | keys | || | ...... | +--------+-----------------------------+ || | queue n | ^ | | || +-----------+ | keys | | || | | | || + | | || round_robin_cycle | | | +--> queue 1 key| ^ | | | || | | | | || | | | | Single Thread || +------------+---------+ | +---------------------> queue 2 key| | keys list | | | | || +----------------------+ | | | |+--------------------------------------+ | | | | | +--> queue 3 key | |+--------------------------------------+ | || Worker 2 | BRPOP(keys) | || | +---------------+ || | | |+--------------------------------------+ | | | |+--------------------------------------+ BRPOP(keys) | || Worker 3 | | || | +--------------+ || | +| |+--------------------------------------+
1.3 哪一个进程
进程池中,使用了策略来决定具体使用哪一个进程来处理任务。
1.3.1 策略
先讲解 strategy。在 AsynPool 启动有如下,配置了策略:
class AsynPool(_pool.Pool): """AsyncIO Pool (no threads).""" def __init__(self, processes=None, synack=False, sched_strategy=None, proc_alive_timeout=None, *args, **kwargs): self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy, sched_strategy)
于是我们看看 strategy 定义如下,基本由名字可以知道其策略意义:
SCHED_STRATEGY_FCFS = 1 # 先来先服务SCHED_STRATEGY_FAIR = 4 # 公平SCHED_STRATEGIES = { None: SCHED_STRATEGY_FAIR, 'default': SCHED_STRATEGY_FAIR, 'fast': SCHED_STRATEGY_FCFS, 'fcfs': SCHED_STRATEGY_FCFS, 'fair': SCHED_STRATEGY_FAIR,}
1.3.2 公平调度
我们讲讲公平调度的概念。
不同系统对于公平调度的理解大同小异,我们举几个例子看看。
- Linux 中,调度器必须在各个进程之间尽可能公平地共享CPU时间,而同时又要考虑不同的任务优先级。一般原理是:按所需分配的计算能力,向系统中每个进程提供最大的公正性,或者从另外一个角度上说, 试图确保没有进程被亏待。
- Hadoop 中,公平调度是一种赋予作业(job)资源的方法,它的目的是让所有的作业随着时间的推移,都能平均的获取等同的共享资源。当单独一个作业在运行时,它将使用整个集群。当有其它作业被提交上来时,系统会将任务(task)空闲时间片(slot)赋给这些新的作业,以使得每一个作业都大概获取到等量的CPU时间。
- Yarn 之中,Fair Share指的都是Yarn根据每个队列的权重、最大,最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。
1.3.3 公平调度 in Celery
在 asynpool之中,有设置,看看"是否为 fair 调度":
is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
基于 is_fair_strategy 这个变量,Celery 的公平调度有几处体现。
在开始 poll 时候,如果是 fair,则需要 存在 idle worker 才调度,这样就给了 idler worker 一个调度机会。
def on_poll_start(): # Determine which io descriptors are not busy inactive = diff(active_writes) # Determine hub_add vs hub_remove strategy conditional if is_fair_strategy: # outbound buffer present and idle workers exist add_cond = outbound and len(busy_workers) < len(all_inqueues) else: # default is add when data exists in outbound buffer add_cond = outbound if add_cond: # calling hub_add vs hub_remove iterate_file_descriptors_safely( inactive, all_inqueues, hub_add, None, WRITE | ERR, consolidate=True) else: iterate_file_descriptors_safely( inactive, all_inqueues, hub_remove)
在具体发布 写操作 时候,也会看看是否 worker 已经正在忙于执行某一个 task,如果正在执行,就不调度,这样就给了其他 不忙worker 一个调度的机会。
def schedule_writes(ready_fds, total_write_count=None): if not total_write_count: total_write_count = [0] # Schedule write operation to ready file descriptor. # The file descriptor is writable, but that does not # mean the process is currently reading from the socket. # The socket is buffered so writable simply means that # the buffer can accept at least 1 byte of data. # This means we have to cycle between the ready fds. # the first version used shuffle, but this version # using `total_writes % ready_fds` is about 30% faster # with many processes, and also leans more towards fairness # in write stats when used with many processes # [XXX On macOS, this may vary depending # on event loop implementation (i.e, select/poll vs epoll), so # have to test further] num_ready = len(ready_fds) for _ in range(num_ready): ready_fd = ready_fds[total_write_count[0] % num_ready] total_write_count[0] += 1 if ready_fd in active_writes: # already writing to this fd continue if is_fair_strategy and ready_fd in busy_workers: # 是否调度 # worker is already busy with another task continue if ready_fd not in all_inqueues: hub_remove(ready_fd) continue
具体逻辑如下:
+ Kombu | Redis | BRPOP(keys) |+------------------------------------+ || Worker 1 | +---------------+ || | | |+------------------------------------+ | | queue 1 key | | +-> | | |+------------------------------------+ BRPOP(keys) | | Single thread || Worker 2 | +--------------------------------------> queue 2 key| | | | (which worker) |+------------------------------------+ | | | | | |+------------------------------------+ | | +-> queue 3 key| Worker 3 | | || | | || +-----------+ | | || | queue 1 | | BRPOP(keys) | || | queue 2 | keys | | || | ...... | +--------+-------------------------+ || | queue n | ^ | || +-----------+ | keys | || | | || + | || round_robin_cycle (which queues) || ^ | || | | || | | || +----+----+ | || + |keys list| | || | +---------+ | |+------------------------------------+ | | | | fair_strategy(which subprocess) | | | +-------+----------+----------------+ | | | | | v v v |+-----+--------+ +------+-------+ +-----+--------+ || subprocess 1 | | subprocess 2 | | subprocess 3 | ++--------------+ +--------------+ +--------------+
0x02 Autoscaler
Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。
2.1 调用时机
在 WorkerComponent 中可以看到,为 AutoScaler 注册了两个调用途径:
- 注册在 consumer 消息响应方法中,这样消费时候如果有需要,就会调整;
- 利用 Hub 的 call_repeatedly 方法注册了周期任务,即周期看看是否需要调整。
这样就会最大程度的加大调用频率。
class WorkerComponent(bootsteps.StartStopStep): """Bootstep that starts the autoscaler thread/timer in the worker.""" def create(self, w): scaler = w.autoscaler = self.instantiate( w.autoscaler_cls, w.pool, w.max_concurrency, w.min_concurrency, worker=w, mutex=DummyLock() if w.use_eventloop else None, ) return scaler if not w.use_eventloop else None def register_with_event_loop(self, w, hub): w.consumer.on_task_message.add(w.autoscaler.maybe_scale) # 消费时候如果有需要,就会调整 hub.call_repeatedly( # 周期看看是否需要调整 w.autoscaler.keepalive, w.autoscaler.maybe_scale, )
2.2 具体实现
2.2.1 bgThread
Autoscaler 是Background thread,这样 AutoScaler就可以在后台运行:
class bgThread(threading.Thread): """Background service thread.""" def run(self): body = self.body shutdown_set = self._is_shutdown.is_set try: while not shutdown_set(): body() finally: self._set_stopped()
2.2.2 定义
Autoscaler 的定义如下,可以看到其逻辑就是定期判断是否需要调整:
- 如果当前并发已经到了最大,则下调;
- 如果到了最小并发,则上调;
- 则具体上调下调的,都是通过具体线程池函数做到的,这就是要根据具体操作系统来进行分析,此处略过。
class Autoscaler(bgThread): """Background thread to autoscale pool workers.""" def __init__(self, pool, max_concurrency, min_concurrency=0, worker=None, keepalive=AUTOSCALE_KEEPALIVE, mutex=None): super().__init__() self.pool = pool self.mutex = mutex or threading.Lock() self.max_concurrency = max_concurrency self.min_concurrency = min_concurrency self.keepalive = keepalive self._last_scale_up = None self.worker = worker def body(self): with self.mutex: self.maybe_scale() sleep(1.0) def _maybe_scale(self, req=None): procs = self.processes cur = min(self.qty, self.max_concurrency) if cur > procs: self.scale_up(cur - procs) return True cur = max(self.qty, self.min_concurrency) if cur < procs: self.scale_down(procs - cur) return True def maybe_scale(self, req=None): if self._maybe_scale(req): self.pool.maintain_pool() def update(self, max=None, min=None): with self.mutex: if max is not None: if max < self.processes: self._shrink(self.processes - max) self._update_consumer_prefetch_count(max) self.max_concurrency = max if min is not None: if min > self.processes: self._grow(min - self.processes) self.min_concurrency = min return self.max_concurrency, self.min_concurrency def scale_up(self, n): self._last_scale_up = monotonic() return self._grow(n) def scale_down(self, n): if self._last_scale_up and ( monotonic() - self._last_scale_up > self.keepalive): return self._shrink(n) def _grow(self, n): self.pool.grow(n) def _shrink(self, n): self.pool.shrink(n) def _update_consumer_prefetch_count(self, new_max): diff = new_max - self.max_concurrency if diff: self.worker.consumer._update_prefetch_count( diff ) @property def qty(self): return len(state.reserved_requests) @property def processes(self): return self.pool.num_processes
0xEE 个人信息
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。