在CLI中运行Airflow Subdag回溯时,应该考虑使用airflow clear
命令来清除失败的任务,然后使用airflow backfill
命令来重新运行这些任务。同时,确保在Subdag定义中设置好正确的日期范围参数。以下是代码示例:
# Subdag定义
def subdag(parent_dag_name, child_dag_name, args):
dag = DAG(
dag_id=f'{parent_dag_name}.{child_dag_name}',
default_args=args,
schedule_interval='@daily'
)
with dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='echo "Subdag task 1"',
)
return dag
# 父DAG定义
with DAG(
dag_id='parent_dag',
default_args=default_args,
schedule_interval='@daily'
) as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='echo "Parent task 1"',
)
# Subdag实例化
subdag_task = SubDagOperator(
task_id='subdag',
subdag=subdag('parent_dag', 'subdag', default_args),
)
task_1 >> subdag_task
# 清除任务
airflow clear parent_dag.subdag -s 2022-01-01 -e 2022-01-10 -t task_1
# 回溯任务
airflow backfill parent_dag.subdag -s 2022-01-01 -e 2022-01-10 -t task_1