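常见的解决方案是在 SubDAG 的 dag_id 中加入主 DAG 的 dag_id 作为前缀，即采用 `<Main_DAG_ID>.<SubDagOperator 的 task_id>` 的形式，以避免 DAG_ID 重复的问题。同时，确保在子 DAG 中使用正确的 Operator，并且子 DAG 内的任务都通过 `dag=` 参数绑定到子 DAG 本身。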
示例代码:
# Example: attaching a SubDAG to a parent DAG with SubDagOperator.
#
# The child DAG's dag_id is built as "<parent_dag_id>.<task_id>" so that it
# carries the parent's id as a prefix and cannot collide with another DAG.
from datetime import datetime

from airflow import DAG
# NOTE(review): these are the legacy operator module paths; newer Airflow
# releases expose the same classes under different modules -- confirm against
# the Airflow version actually deployed.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

MAIN_DAG_ID = "Main_DAG_ID"
SUBDAG_TASK_ID = "subdag"

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 6, 1),
    "retries": 0,
}


def build_subdag(
    main_dag_id: str,
    start_date: datetime,
    child_task_id: str = SUBDAG_TASK_ID,
    args: dict = None,
    schedule_interval: str = "@daily",
) -> DAG:
    """Create the child DAG that the SubDagOperator will run.

    Args:
        main_dag_id: dag_id of the parent DAG; used as the prefix of the
            child dag_id.
        start_date: start date applied to the child DAG.
        child_task_id: task_id of the SubDagOperator that wraps this DAG;
            used as the suffix of the child dag_id.
        args: default_args passed to the child DAG (falls back to the
            module-level ``default_args`` when omitted).
        schedule_interval: schedule of the child DAG.

    Returns:
        A DAG whose dag_id is ``"<main_dag_id>.<child_task_id>"`` and which
        contains a single BashOperator task.
    """
    child_dag = DAG(
        dag_id=f"{main_dag_id}.{child_task_id}",
        default_args=default_args if args is None else args,
        start_date=start_date,
        schedule_interval=schedule_interval,
    )
    # Tasks of the child DAG are bound to the child DAG itself, not the parent.
    BashOperator(
        task_id="subdag_task",
        bash_command="echo 'This is a subdag task'",
        dag=child_dag,
    )
    return child_dag


dag = DAG(
    dag_id=MAIN_DAG_ID,
    default_args=default_args,
    schedule_interval="@daily",
)

# Keyword arguments forwarded to build_subdag().
subdag_params = {
    "main_dag_id": MAIN_DAG_ID,
    "start_date": datetime(2022, 6, 1),
}

# The child DAG is created inside the call (via the factory) rather than kept
# in a module-level variable, so the only DAG object at module level is `dag`.
subdag = SubDagOperator(
    task_id=SUBDAG_TASK_ID,
    subdag=build_subdag(child_task_id=SUBDAG_TASK_ID, **subdag_params),
    dag=dag,
)

# No dependency is declared between the task inside the child DAG and the
# SubDagOperator: they live in different DAGs, and the operator already runs
# the child DAG's tasks as part of its own execution.