可以通过使用Airflow的Jinja模板语言在函数中引用触发器参数。
具体步骤如下:
1.在dag文件中定义触发器参数
default_args = {
'start_date': datetime(2021, 10, 1),
'email_on_failure': False,
'email_on_retry': False
}
dag = DAG(
'my_dag',
default_args=default_args,
schedule_interval=None
)
t1 = BashOperator(
task_id='bash_task',
bash_command='echo {{ ds }}',
dag=dag
)
2.在函数中使用Jinja模板语言引用触发器参数
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
def my_function(**context):
ds = context['ds']
print(ds)
default_args = {
'start_date': datetime(2021, 10, 1),
'email_on_failure': False,
'email_on_retry': False
}
dag = DAG(
'my_dag',
default_args=default_args,
schedule_interval=None
)
t1 = BashOperator(
task_id='bash_task',
bash_command='python my_function.py --ds={{ ds }}',
dag=dag
)
这样,在执行任务时,Airflow会将触发器参数传递给函数,并将之解析为具体的日期变量。