首先,在DAG定义中添加一个可选参数,用于接收dag_run.conf中的输入
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# 默认参数
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 12, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG定义
dag = DAG(
'my_dag',
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False
)
# DAG任务定义
task1 = BashOperator(
task_id='task1',
bash_command='echo {{ dag_run.conf["param1"] }}',
dag=dag
)
在运行DAG时,添加输入参数
dagrun_conf = {'param1': 'my_parameter_value'}
dag_run = dag.create_dagrun(
run_id='my_dag_run',
state="running",
conf=dagrun_conf,
execution_date=datetime.now(),
external_trigger=True
)
运行DAG并触发任务
airflow trigger_dag my_dag -e 20210202 -c '{"param1": "my_parameter_value"}'
注:上述代码示例仅供参考,需根据实际需要进行调整。