Airflow 是一个用于创建、调度和监控工作流的平台,而Celery是一个分布式任务队列。Airflow使用Celery作为任务调度器,但并不使用Celery Beat来调度任务。
Airflow使用自己的调度器来调度任务,称为"Scheduler"。这个调度器会根据任务的依赖关系和时间表来决定何时执行任务。
以下是一个示例的Airflow DAG(有向无环图)的代码,展示了如何设置Airflow的任务调度:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def my_task():
print("My task is running!")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'my_dag',
default_args=default_args,
schedule_interval='*/5 * * * *', # 每5分钟调度一次任务
)
task = PythonOperator(
task_id='my_task',
python_callable=my_task,
dag=dag,
)
task
在上面的示例中,我们创建了一个名为my_dag
的DAG,它包含一个名为my_task
的任务。这个任务将会每5分钟调度一次。
要运行Airflow,您可以使用以下命令:
airflow scheduler
这将启动Airflow的调度器,并根据定义的时间表来调度任务。
总结起来,Airflow不使用Celery Beat来调度任务,而是使用自己的调度器。您可以通过设置DAG的schedule_interval
属性来定义任务的调度时间表。