使用 Kafka¶
配置¶
celeryconfig.py
import os
task_serializer = 'json'
broker_transport_options = {
# "allow_create_topics": True,
}
broker_connection_retry_on_startup = True
# 对于使用 SQLAlchemy 作为后端
# result_backend = 'db+postgresql://postgres:example@localhost/postgres'
broker_transport_options.update({
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
})
sasl_username = os.environ["SASL_USERNAME"]
sasl_password = os.environ["SASL_PASSWORD"]
broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094"
broker_transport_options.update({
"kafka_admin_config": {
"sasl.username": sasl_username,
"sasl.password": sasl_password,
},
"kafka_common_config": {
"sasl.username": sasl_username,
"sasl.password": sasl_password,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"bootstrap_servers": "broker:9094",
}
})
请注意,如果主题尚不存在,则需要 "allow_create_topics",否则不需要。
tasks.py
from celery import Celery
app = Celery('tasks')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
认证¶
参见上文。SASL 用户名和密码通过环境变量传递。
更多信息¶
Celery 队列会被路由到 Kafka 主题。例如,如果一个队列名为 "add_queue",那么将在 Kafka 中创建/使用一个名为 "add_queue" 的主题。
对于 canvas,当使用支持它的后端时,典型的机制如 chain、group 和 chord 似乎可以工作。
限制¶
目前,使用 Kafka 作为代理意味着只能使用一个工作进程。参见 Multiple celery fork pool workers don't work #1785 。