在创建Airflow RedshiftSQLOperator对象时,在传入SQL语句时使用jinja2的模板渲染函数render_template(),避免Airflow渲染SQL时产生不同的代码。示例代码如下:
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import DAG
from jinja2 import Template
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('example_redshift_operator',
default_args=default_args,
description='Example Redshift SQL operator',
schedule_interval=timedelta(days=1))
sql_template = Template("""
SELECT * FROM my_table WHERE date > '{{ ds }}'
""")
sql = sql_template.render()
redshift_op = RedshiftSQLOperator(
task_id='redshift_example',
sql=sql,
dag=dag
)
dummy_op = DummyOperator(task_id='dummy_example', dag=dag)
redshift_op >> dummy_op