使用DAG运行上下文中的变量将参数传递给DAG,并将其存储在Airflow元数据库中。下面是一个示例:
from airflow import DAG
from datetime import datetime
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 8, 23),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
}
dag = DAG(dag_id='my_dag', default_args=default_args, schedule_interval=None)
def my_function(**context):
my_var = context['dag_run'].conf.get('my_var')
# Use my_var parameter here
task1 = PythonOperator(
task_id='task1',
dag=dag,
python_callable=my_function,
provide_context=True)
task2 = PythonOperator(
task_id='task2',
dag=dag,
python_callable=my_function,
provide_context=True)
# Pass parameter to DAG using 'conf' argument
passing_parameter = {"my_var": "Hello World!"}
dag_run = dag.create_dagrun(
run_id=f"manual__{datetime.utcnow().isoformat()}",
state='manual',
conf=passing_parameter,
execution_date=datetime.utcnow()
)
在此示例中,我们通过DAG上下文中的“conf”参数传递了一个参数“my_var”来存储“Hello World!”。在我们的任务函数“my_function”中,我们可以使用以下代码从上下文中检索我们传递的参数:
my_var = context['dag_run'].conf.get('my_var')
通过这种方式,我们可以使用Airflow元数据库中存储的参数值在任务外部使用DAG参数。