在执行任务的DAG中,设置 catchup=False 和 schedule_interval='0 0 * * *',并在需要回放的日期范围内手动运行DAG。例如,要回放从2021年11月1日到2021年11月30日的任务,可以在DAG目录中运行以下命令:
airflow dags backfill my_dag_name -s 2021-11-01 -e 2021-11-30
其中,my_dag_name 是你的DAG名称。
代码示例:
from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2021, 11, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), }
dag = DAG( 'my_dag_name', default_args=default_args, description='A simple DAG', schedule_interval='0 0 * * *', catchup=False )
def my_task(ds, **kwargs): print(f"Executing task for {ds}")
run_this = PythonOperator( task_id='my_task', provide_context=True, python_callable=my_task, dag=dag, )
run_this