An Airflow Celery worker is the process that actually executes tasks when Airflow is configured to use the CeleryExecutor for distributed task execution. Celery itself is a distributed task queue framework for Python, used to run asynchronous and scheduled work. celery-hostname refers to the hostname under which a Celery worker registers itself.
Example configuration (the [celery] section of airflow.cfg):
[celery]
# Celery broker and result backend connections
broker_url = your_broker_url
result_backend = your_result_backend_url

# Worker tuning
worker_concurrency = 16
worker_prefetch_multiplier = 4

# Port used to serve worker task logs to the webserver
# (this key lives under [logging] in Airflow 2.x)
worker_log_server_port = 8793

# Celery-native options without a dedicated airflow.cfg key are passed through
# a Python dict referenced here (see the sketch after this block)
celery_config_options = your_module.YOUR_CELERY_CONFIG

# The worker hostname is not set in airflow.cfg; pass it when starting the worker:
#   airflow celery worker --celery-hostname your_hostname
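The Celery-native settings from the original list (module imports, maximum tasks per child, root-logger hijacking) are configured on Celery itself rather than in airflow.cfg. A minimal sketch of the dict that celery_config_options could point to, assuming Airflow 2.x; your_module and YOUR_CELERY_CONFIG are placeholder names:

# your_module.py (hypothetical module referenced by celery_config_options above)
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

# Start from Airflow's default Celery configuration and override the
# Celery-native options that have no dedicated airflow.cfg key.
YOUR_CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    "imports": ("your_module_to_import",),
    "worker_max_tasks_per_child": 1000,
    "worker_hijack_root_logger": False,
}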
from celery import Celery

app = Celery('your_app_name', broker='your_broker_url')

@app.task(bind=True)
def your_task(self, *args, **kwargs):
    # your task implementation
    pass

if __name__ == '__main__':
    # Tasks are routed to workers by queue, not by hostname; the hostname is set
    # when the worker starts, e.g.: celery -A your_app_name worker -n your_hostname -Q your_queue_name
    result = app.send_task('your_task_name', args=[...], kwargs={...},
                           queue='your_queue_name', routing_key='your_routing_key_name',
                           task_id='your_task_id', countdown=10, expires=3600,
                           priority=2, serializer='json', compression='gzip')
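To check which workers are online and which hostname each one registered under, the same app object can ping them; a short sketch using Celery's standard inspection API (not part of the original example):

# The reply dict is keyed by worker hostname, so this is a quick way to
# confirm that every worker registered under a distinct name.
replies = app.control.inspect().ping() or {}
for worker_name in sorted(replies):
    print(worker_name)   # e.g. your_hostname@machine-1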
Note that if you run multiple Celery workers, each worker must be started with a distinct hostname so the workers can be told apart, for example airflow celery worker --celery-hostname worker-1 on one machine and --celery-hostname worker-2 on another. These settings can also be supplied through command-line arguments or environment variables such as AIRFLOW__CELERY__BROKER_URL and AIRFLOW__CELERY__WORKER_CONCURRENCY.