在Airflow中,嵌套的DAG(定义在其他DAG中的DAG)可能会出现默认参数无效的问题。这是由于默认参数只会在DAG实例化时进行计算而不会传递到嵌套的DAG中。
为了解决这个问题,我们可以使用 DefaultArgs 类来设置默认参数,并允许传递给嵌套 DAG。
例如,我们可以在定义主DAG时设置子DAG的默认参数:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from subdag import subdag
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'email': ['airflow@example.com'],
'retries': 1,
}
with DAG(dag_id='my_dag', default_args=default_args, schedule_interval='*/10 * * * *') as dag:
def print_hello():
print('Hello from the main DAG!')
task = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)
subdag_task = SubDagOperator(
task_id='subdag_task',
subdag=subdag('my_dag', 'subdag_task', default_args=default_args),
default_args=default_args,
dag=dag,
)
task >> subdag_task
然后,在定义子DAG时,我们可以使用父DAG的默认参数:
def subdag(parent_dag_id, child_dag_id, args):
with DAG(
dag_id=f'{parent_dag_id}.{child_dag_id}',
default_args=args,
schedule_interval=default_dag_schedule,
) as dag:
...
这样,我们就可以使用父DAG的默认参数在子DAG中。