要在Airflow中从Python函数传递参数给MySQL Operator,可以使用Airflow的XCom功能。以下是一个解决方法的示例代码:
from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'start_date': datetime(2021, 1, 1)
}
def my_python_function(**kwargs):
# 从kwargs中获取参数
my_parameter = kwargs['params']['my_parameter']
# 执行你的Python代码
print(f"My parameter is: {my_parameter}")
with DAG('my_dag', schedule_interval='@once', default_args=default_args) as dag:
# 定义Python Operator
python_task = PythonOperator(
task_id='my_python_task',
python_callable=my_python_function,
provide_context=True,
op_kwargs={'my_parameter': 'hello'}
)
# 定义MySQL Operator,并将参数从Python函数传递给SQL模板
mysql_task = MySqlOperator(
task_id='my_mysql_task',
sql='SELECT * FROM my_table WHERE my_column = {{ params.my_parameter }}',
dag=dag
)
# 设置任务之间的依赖关系
python_task >> mysql_task
在上面的示例代码中,首先定义了一个Python函数my_python_function
,它接受一个参数my_parameter
。然后,在DAG中定义了一个Python Operator python_task
,通过op_kwargs
参数将参数传递给函数。接下来,定义了一个MySQL Operator mysql_task
,其中的SQL模板中使用了{{ params.my_parameter }}
来引用传递的参数。
最后,通过将python_task
设置为mysql_task
的依赖任务,确保在执行MySQL任务之前先执行Python任务。
这样,当DAG运行时,Python函数将接收到传递的参数,并可以在函数中使用。