- 在Airflow的配置文件中设置Celeryexecutor工作器的一些参数(例如,broker、backend等)与现有的celery工作器相同。
- 需要安装celery库并在Airflow中导入。
- 可以通过在DAG中指定
celery_config_section
参数来使用现有的celery工作器。示例代码如下:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
# 导入celery库
from airflow.executors.celery_executor import CeleryExecutor
# 设置Celeryexecutor工作器的一些参数
celery_config = {
'broker_url': 'redis://localhost:6379/0',
'result_backend': 'redis://localhost:6379/0'
}
# 生成DAG对象,其中指定了Celeryexecutor工作器相关参数
dag = DAG(
'example_dag',
default_args={
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
},
schedule_interval=None,
catchup=False,
executor=CeleryExecutor(celery_config=celery_config),
dagrun_timeout=600,
max_active_runs=1
)
# 定义相关任务,其中会调用celery工作器
task_1 = BashOperator(
task_id='task_1',
bash_command='sleep 5',
dag=dag
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=my_python_function,
dag=dag
)
task_1 >> task_2