在Airflow中,可以通过以下步骤将任务持续推送到执行队列:
from airflow import DAG
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'example_dag',
default_args=default_args,
schedule_interval='@daily'
)
from airflow.operators.python_operator import PythonOperator
def task_func(**kwargs):
# 执行任务逻辑
print('Executing task...')
task = PythonOperator(
task_id='example_task',
python_callable=task_func,
provide_context=True,
dag=dag
)
task1 = PythonOperator(task_id='task1', python_callable=task_func, dag=dag)
task2 = PythonOperator(task_id='task2', python_callable=task_func, dag=dag)
task1 >> task2
airflow scheduler
airflow logs -t -d
这样,任务就会根据设定的调度时间被持续推送到执行队列中执行。