跳转至

使用 Google Pub/Sub

安装

要支持 Google Pub/Sub,您需要安装额外的依赖项。您可以使用 celery[gcpubsub] 一次性安装 Celery 和这些依赖项:

uv add "celery[gcpubsub]"
pip install "celery[gcpubsub]"

配置

您必须在代理 URL 中指定 gcpubsub 和 Google 项目:

broker_url = 'gcpubsub://projects/project-id'

URL 格式为:

gcpubsub://projects/project-id

请注意,您必须在 URL 中为 project-id 添加 projects/ 前缀。

登录凭据将是您在环境中设置的常规 GCP 凭据。

选项

资源过期

默认设置旨在尽可能简单、经济高效且直观,以便"开箱即用"。Pub/Sub 消息和订阅设置为 24 小时后过期,可以通过配置 expiration_seconds 设置来调整:

expiration_seconds = 86400

确认截止时间

ack_deadline_seconds 定义了 Pub/Sub 基础设施在将消息重新传递给另一个工作器之前,应等待工作器确认任务的秒数。

此选项通过 broker_transport_options 设置进行配置:

broker_transport_options = {'ack_deadline_seconds': 60}  # 1 分钟。

默认的可见性超时为 240 秒,工作器会自动延长其所有待处理消息的确认时间。

轮询间隔

轮询间隔决定了在轮询失败之间休眠的秒数。此值可以是整数或浮点数。默认值为 0.1 秒。但这并不意味着当没有更多消息可读时,工作器会每 0.1 秒轰炸 Pub/Sub API,因为它会被对 Pub/Sub API 的阻塞调用所阻塞,该调用只会在有新消息可读或 10 秒后返回。

轮询间隔可以通过 broker_transport_options 设置进行配置:

broker_transport_options = {'polling_interval': 0.3}

非常频繁的轮询间隔可能导致忙循环,导致工作器使用大量 CPU 时间。如果您需要亚毫秒级的精度,应考虑使用其他传输方式,如 RabbitMQRedis

队列前缀

默认情况下,Celery 会为队列名称分配 kombu- 前缀。如果您有其他使用 Pub/Sub 的服务,可以通过 broker_transport_options 设置进行配置:

broker_transport_options = {'queue_name_prefix': 'kombu-'}

结果

Google Cloud Storage (GCS) 可能是存储结果的一个不错的选择。。

注意事项

  • 使用 celery flower 时,需要 --inspect-timeout=10 选项才能正确检测工作器状态。

  • GCP 订阅的空闲订阅(没有排队的消息)配置为 24 小时后删除。 这旨在降低成本。

  • 排队和未确认的消息设置为 24 小时后自动清理。 原因同上。

  • 通道队列大小是近似值,可能不准确。 原因是 Pub/Sub API 不提供获取订阅中确切消息数量的方法。

  • 孤儿(无订阅)Pub/Sub 主题不会被自动删除!! 由于 GCP 对每个项目引入了 10k 个主题的硬限制, 建议定期手动删除孤儿主题。

  • 最大消息大小限制为 10MB,作为解决方法,您可以使用 GCS 后端 将消息存储在 GCS 中,并将 GCS URL 传递给任务。