Airflow中,同一DAG中的TaskInstance会在同一进程中执行。可以通过设置DAG中的参数,让TaskInstance之间共享变量。
在DAG定义中添加default_args
参数,并在其中定义要共享的变量。在Task中可以通过context
对象访问这些变量。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'start_date': datetime(2021, 1, 1),
'var_shared': 'a shared variable'
}
dag = DAG(
'my_dag',
default_args=default_args
)
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ dag.default_args.var_shared }}"',
dag=dag
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "{{ dag.default_args.var_shared }}"',
dag=dag
)
task1 >> task2
在上述代码中,default_args
中定义的var_shared
变量可以在两个Task中共享使用。在Task中,可以通过{{ dag.default_args.var_shared }}
的方式访问这个变量。