在Airflow中,我们经常需要在运行任务时传递一些参数来定制化任务的行为。有时,这些参数可能需要在任务运行时根据情况进行更新。Airflow提供了多种传递参数的方法,本文将介绍其中的两种:在DAG和Operator级别传递参数。
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('example_dag', schedule_interval='@daily', default_args=default_args) as dag:
t1 = BashOperator(
task_id='example_task',
bash_command='echo "{{ dag_run.conf["parameter_name"] }}"'
)
其中,{{ dag_run.conf["parameter_name"] }}可以获取到在任务运行时传递的参数,如:
airflow backfill example_dag -s 2021-01-01 -e 2021-01-03 --conf '{"parameter_name": "test_param"}'
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
with DAG('example_dag', schedule_interval='@daily', default_args=default_args) as dag:
t1 = BashOperator(
task_id='example_task',
bash_command='echo "{{ dag_run.conf["parameter_name"] }}"',
provide_context=True
)
def my_func(ds, **kwargs):
param_value = kwargs['dag_run'].conf['my_parameter']
# do something using param_value
return None
t2 = PythonOperator(
task_id='example_task',
python_callable=my_func,
provide_context=True
)
其中,provide_context=True表示该Operator可以接收上下文环境中的参数。在PythonOperator中,可以通过kwargs获取到上下文环境中的参数,并使用它进行操作。我们可以在