跳转至

使用 RabbitMQ

安装与配置

RabbitMQ 是默认的代理,因此除了您想要使用的代理实例的 URL 位置外,不需要任何额外的依赖项或初始配置:

broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'

有关代理 URL 的描述以及 Celery 可用的各种代理配置选项的完整列表,请参阅 conf-broker-settings,并参阅下文设置用户名、密码和虚拟主机。

安装 RabbitMQ Server

请参阅 RabbitMQ 网站上的 Installing RabbitMQ

设置 RabbitMQ

要使用 Celery,我们需要创建一个 RabbitMQ 用户、一个虚拟主机,并允许该用户访问该虚拟主机:

sudo rabbitmqctl add_user myuser mypassword

sudo rabbitmqctl add_vhost myvhost

sudo rabbitmqctl set_user_tags myuser mytag

sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

将上述 myusermypasswordmyvhost 替换为适当的值。

有关 access control 的更多信息,请参阅 RabbitMQ Authentication, Authorisation, Access Control

在 macOS 上安装 RabbitMQ

在 macOS 上安装 RabbitMQ 最简单的方法是使用 Homebrew

brew install rabbitmq

使用 brew 安装 RabbitMQ 后,您需要将以下内容添加到您的路径中,以便能够启动和停止代理:将其添加到您的 shell 启动文件中(例如 .bash_profile.profile)。

PATH=$PATH:/usr/local/sbin

配置系统主机名

如果您使用的 DHCP 服务器给您一个随机的主机名,您需要永久配置主机名。这是因为 RabbitMQ 使用主机名与节点通信。

使用 scutil 命令永久设置您的主机名:

sudo scutil --set HostName myhost.local

然后将该主机名添加到 /etc/hosts 中,以便可以将其解析回 IP 地址:

127.0.0.1       localhost myhost myhost.local

如果您启动 rabbitmq-server,您的 rabbit 节点现在应该是 rabbit@myhost,可以通过 rabbitmqctl 验证:

sudo rabbitmqctl status
Status of node rabbit@myhost ...
[{running_applications,[{rabbit,"RabbitMQ","1.7.1"},
                    {mnesia,"MNESIA  CXC 138 12","4.4.12"},
                    {os_mon,"CPO  CXC 138 46","2.2.4"},
                    {sasl,"SASL  CXC 138 11","2.1.8"},
                    {stdlib,"ERTS  CXC 138 10","1.16.4"},
                    {kernel,"ERTS  CXC 138 10","2.13.4"}]},
{nodes,[rabbit@myhost]},
{running_nodes,[rabbit@myhost]}]
...done.

如果您的 DHCP 服务器给您一个以 IP 地址开头的主机名(例如 23.10.112.31.comcast.net),这一点尤其重要。在这种情况下,RabbitMQ 将尝试使用 rabbit@23:一个非法的主机名。

启动/停止 RabbitMQ 服务器

要启动服务器:

sudo rabbitmq-server

您也可以通过添加 -detached 选项在后台运行它(注意:只有一个破折号):

sudo rabbitmq-server -detached

切勿使用 kill (kill(1)) 停止 RabbitMQ 服务器,而是使用 rabbitmqctl 命令:

sudo rabbitmqctl stop

使用 Quorum 队列

Warning

Quorum 队列需要禁用全局 QoS,这意味着某些功能将无法按预期工作。 有关详细信息,请参阅 limitations

Celery 支持 Quorum Queues,通过将 x-queue-type 标头设置为 quorum,如下所示:

from kombu import Queue

task_queues = [Queue('my-queue', queue_arguments={'x-queue-type': 'quorum'})]
broker_transport_options = {"confirm_publish": True}

如果您想更改默认队列的类型,请将 task_default_queue_type 设置为 "quorum"

配置 Quorum Queues 的另一种方法是依赖默认设置并使用 task_routes

task_default_queue_type = "quorum"
task_default_exchange_type = "topic"
task_default_queue = "my-queue"
broker_transport_options = {"confirm_publish": True}

task_routes = {
    "*": {
        "routing_key": "my-queue",
    },
}

Celery 使用 worker_detect_quorum_queues 设置自动检测是否使用了 quorum 队列。我们建议保持默认行为开启。

要从经典镜像队列迁移到 quorum 队列,请参阅 Migrating from Mirrored Classic Queues to Quorum Queues

限制

禁用全局 QoS 意味着每通道 QoS 现在是静态的。这意味着在使用 Quorum 队列时,某些 Celery 功能将无法工作。

自动缩放依赖于在实例化或终止新进程时增加和减少预取计数,因此在检测到 Quorum 队列时它将无法工作。

类似地,worker_enable_prefetch_count_reduction 设置在检测到 Quorum 队列时即使设置为 True 也将无效。

此外,ETA 和倒计时 在接收到时会阻塞工作进程,直到 ETA 到达,因为我们无法再增加预取计数并从队列中获取另一个任务。

为了正确调度 ETA 和倒计时 任务,我们自动检测是否使用了 quorum 队列,如果使用了,Celery 会自动启用 原生延迟交付

原生延迟交付

由于带有 ETA 和倒计时 的任务会阻塞工作进程,直到它们被调度执行, 我们需要使用 RabbitMQ 的原生功能来调度任务的执行。

该设计借鉴自 NServiceBus。如果您对实现细节感兴趣,请参阅 RabbitMQ Delayed Delivery

原生延迟交付在检测到 quorum 队列时自动启用。

默认情况下,原生延迟交付队列是 quorum 队列。如果您想将它们更改为经典队列,可以将 broker_native_delayed_delivery_queue_type 设置为 classic。