要为Operator创建动态参数,可以使用Airflow的宏功能。下面是一个示例解决方案:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime
def my_task(**kwargs):
# 使用宏获取动态参数
dynamic_param = kwargs['dag_run'].conf.get('dynamic_param')
print(f"Dynamic param: {dynamic_param}")
dag = DAG(
'dynamic_param_example',
description='Dynamically pass parameters to an operator',
schedule_interval=None,
start_date=days_ago(1),
catchup=False
)
task = PythonOperator(
task_id='my_task',
provide_context=True,
python_callable=my_task,
dag=dag
)
# 设置动态参数
dynamic_param = {'param1': 'value1', 'param2': 'value2'}
# 使用DAG的extra_params属性传递动态参数
dag.extra_params = dynamic_param
# 或者使用DAG run的conf属性传递动态参数
# dag_run = dag.create_dagrun(run_id='manual_run', execution_date=datetime.now(), conf=dynamic_param)
task
在这个示例中,我们创建了一个名为dynamic_param_example
的DAG,其中包含一个名为my_task
的PythonOperator。在my_task
函数中,我们使用kwargs
参数并通过dag_run.conf.get('dynamic_param')
来获取动态参数。
动态参数可以通过两种方式传递给DAG。一种方式是将动态参数设置为DAG的extra_params
属性,如示例中的dag.extra_params = dynamic_param
。另一种方式是在执行DAG时,使用dag_run.conf
参数传递动态参数,如示例中的dag_run = dag.create_dagrun(run_id='manual_run', execution_date=datetime.now(), conf=dynamic_param)
。
无论使用哪种方式,my_task
函数都可以通过kwargs['dag_run'].conf.get('dynamic_param')
来获取动态参数的值。在示例中,我们只是简单地打印动态参数的值,你可以根据实际需求在my_task
函数中进行相应的处理。