跳转至

优化

引言

默认配置做了很多妥协。它对于任何单一情况都不是最优的,但在大多数情况下都能很好地工作。

可以根据特定用例应用优化措施。

优化可以应用于运行环境的不同属性,无论是任务执行所需的时间、使用的内存量,还是在高峰负载时的响应能力。

确保运行

在《编程珠玑》一书中,Jon Bentley通过提出以下问题介绍了信封背面计算的概念:

❝ 密西西比河一天流出多少水? ❞

这个练习的重点1是表明系统在及时处理数据方面存在限制。信封背面计算可以作为一种提前规划的手段。

在Celery中;如果一个任务需要10分钟才能完成,而每分钟有10个新任务进入,那么队列永远不会为空。这就是为什么监控队列长度非常重要!

一种方法是使用 Munin。您应该设置警报,在任何队列达到不可接受的大小时立即通知您。这样您就可以采取适当的措施,比如添加新的工作节点,或撤销不必要的任务。

通用设置

代理连接池

代理连接池自版本 2.5 起默认启用。

您可以调整 broker_pool_limit 设置以最小化争用,该值应基于使用代理连接的活跃线程/绿色线程数量。

使用临时队列

Celery 创建的队列默认是持久化的。这意味着代理会将消息写入磁盘,以确保即使代理重启,任务也能被执行。

但在某些情况下,消息丢失是可以接受的,因此并非所有任务都需要持久性。您可以为这些任务创建一个临时队列来提高性能:

from kombu import Exchange, Queue

task_queues = (
    Queue('celery', routing_key='celery'),
    Queue('transient', Exchange('transient', delivery_mode=1),
          routing_key='transient', durable=False),
)

或使用 task_routes

task_routes = {
    'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}
}

delivery_mode 更改了发送到此队列的消息的传递方式。值为 1 表示消息不会被写入磁盘,值为 2(默认值)表示消息可以被写入磁盘。

要将任务定向到您的新临时队列,您可以指定队列参数(或使用 task_routes 设置):

task.apply_async(args, queue='transient')

更多信息请参阅 路由指南

Worker 设置

预取限制

Prefetch(预取)是一个从 AMQP 继承而来的术语,经常被用户误解。

预取限制是工作器可以为自己保留的任务(消息)数量的限制。如果为零,工作器将继续消耗消息,不考虑可能还有其他可用的工作器节点能够更快地处理它们 2,或者消息可能甚至无法放入内存。

工作器的默认预取计数是 worker_prefetch_multiplier 设置乘以并发槽的数量 3(进程/线程/绿色线程)。

如果您有许多长时间运行的任务,您希望乘数值为:这意味着每个工作器进程一次只保留一个任务。

然而——如果您有许多短时间运行的任务,并且吞吐量/往返延迟对您很重要,这个数字应该较大。如果消息已经被预取并可用在内存中,工作器能够每秒处理更多任务。您可能需要实验找到最适合您的最佳值。在这种情况下,像 50 或 150 这样的值可能是有意义的。比如 64 或 128。

如果您有长时间运行和短时间运行任务的组合,最佳选择是使用两个分别配置的工作器节点,并根据运行时间路由任务(参见 路由指南)。

一次保留一个任务

任务消息只有在任务被 acknowledged(确认)后才会从队列中删除,因此如果工作器在确认任务之前崩溃,它可以被重新传递给另一个工作器(或在恢复后传递给同一个工作器)。

请注意,异常在 Celery 中被视为正常操作,并且会被确认。 确认实际上用于防范无法通过 Python 异常系统正常处理的故障(即电源故障、内存损坏、硬件故障、致命信号等)。 对于正常异常,您应该使用 task.retry() 来重试任务。

当使用默认的早期确认时,设置预取乘数为,意味着工作器最多为每个工作器进程保留一个额外任务:换句话说,如果工作器以 celery worker -c 10 启动,工作器在任何时候最多可以保留 20 个任务(10 个正在执行的已确认任务,和 10 个未确认的保留任务)。

用户经常询问是否可能禁用"任务预取",这是可能的,但有一个条件。您可以让工作器只保留与工作器进程数量相同的任务,条件是它们被延迟确认(对于 celery worker -c 10,有 10 个正在执行的未确认任务)。

为此,您需要启用 late acknowledgment(延迟确认)。使用此选项而不是默认行为意味着在电源故障或工作器实例突然被杀死的情况下,已经开始执行的任务将被重试,因此这也意味着任务必须是 idempotent(幂等)的。

您可以通过以下配置选项启用此行为:

task_acks_late = True
worker_prefetch_multiplier = 1

如果您的任务不能被延迟确认,您可以通过启用 worker_disable_prefetch 来禁用代理预取。使用此设置,工作器只有在执行槽空闲时才获取新任务,防止任务在繁忙的工作器上等待长时间运行的任务。这也可以通过命令行使用 celery worker --disable-prefetch 来设置。此功能目前仅在使用 Redis 作为代理时受支持。

内存使用

如果您在 prefork 工作器上遇到高内存使用情况,首先需要确定问题是否也发生在 Celery 主进程上。Celery 主进程的内存使用量在启动后不应继续急剧增加。如果您看到这种情况发生,可能表明存在内存泄漏错误,应报告给 Celery 问题跟踪器。

如果只有您的子进程有高内存使用情况,这表明您的任务存在问题。

请记住,Python 进程内存使用具有"高水位线",并且直到子进程停止才会将内存返回给操作系统。这意味着单个高内存使用任务可能会永久增加子进程的内存使用量,直到它被重启。修复此问题可能需要向您的任务添加分块逻辑以减少峰值内存使用量。

Celery 工作器有两种主要方法来帮助减少由于"高水位线"和/或子进程中的内存泄漏导致的内存使用:worker_max_tasks_per_childworker_max_memory_per_child 设置。

您必须小心不要将这些设置设置得太低,否则您的工作器将花费大部分时间重启子进程而不是处理任务。例如,如果您使用 worker_max_tasks_per_child 为 1,并且您的子进程需要 1 秒启动,那么该子进程每分钟最多只能处理 60 个任务(假设任务立即运行)。当您的任务总是超过 worker_max_memory_per_child 时,可能会出现类似的问题。


  1. 该章节可在此处免费阅读:信封背面。这本书是一本经典文本。强烈推荐。 

  2. RabbitMQ 和其他代理以轮询方式传递消息,因此这不适用于活跃系统。如果没有预取限制并且您重启集群,节点启动之间会有时间延迟。如果有 3 个离线节点和一个活跃节点,所有消息都将被传递到活跃节点。 

  3. 这是并发设置 worker_concurrencycelery worker -c 选项。