在Airflow中,可以通过Python代码将参数传递给PostgresOperator。以下是一个示例解决方法:
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime
default_args = {
'start_date': datetime(2021, 1, 1),
}
with DAG('example_dag', default_args=default_args, schedule_interval='@once') as dag:
hotel_ids = ['hotel1', 'hotel2', 'hotel3'] # 传递给PostgresOperator的参数
# 定义一个任务,将hotel_ids作为参数传递给PostgresOperator
task = PostgresOperator(
task_id='example_task',
sql='SELECT * FROM hotels WHERE hotel_id IN %(hotel_ids)s;', # 查询语句,使用%(hotel_ids)s作为参数占位符
params={'hotel_ids': hotel_ids}, # 将hotel_ids作为参数传递给PostgresOperator
postgres_conn_id='postgres_default' # 设置PostgreSQL连接
)
在上面的示例中,我们定义了一个DAG,并在hotel_ids
变量中设置了要传递给PostgresOperator的参数。然后,在PostgresOperator中的params
参数中将hotel_ids
作为字典传递。在SQL查询语句中,我们使用%(hotel_ids)s
作为参数占位符。
这样,当任务运行时,hotel_ids
参数将被传递给PostgresOperator,并在查询语句中使用。