可以尝试使用以下代码更新Celery配置,为Worker提供权限创建队列:
from airflow import configuration as conf
conf.add_section('celery')
conf.set('celery', 'worker_pool_restarts', 'True')
conf.set('celery', 'worker_concurrency', '1')
conf.set('celery', 'task_acks_late', 'True')
conf.set('celery', 'task_default_queue', 'default')
conf.set('celery', 'task_default_exchange', 'default')
conf.set('celery', 'task_default_routing_key', 'default')
此外,还可以在调度器中使用以下代码来设置队列:
from airflow.models import Connection
conn = Connection(
conn_id='my_queue',
conn_type='amqp',
host='[HOST]',
virtual_host='[VIRTUAL_HOST]',
login='[USERNAME]',
password='[PASSWORD]',
port=[PORT]
)
conn.extra = """
{
"celery_queue_prefix": "testQueue.",
"queues": {
"testQueue": {
"exchange": "testExchange",
"routing_key": "testRoutingKey"
}
}
}
"""
conn.insert()
确保将testQueue
替换为需要创建的队列名称,并按需更改extra
字段中的其他设置。
最后,重新启动Airflow Worker以使更改生效。