可以使用Airflow的macros和Snowflake Operator来实现。在SnowflakeOperator参数中设置SQL查询语句,使用Airflow的macros将执行日期传递到SQL查询语句中。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.snowflake_operator import SnowflakeOperator
from airflow.macros import ds_format
dag = DAG(
dag_id='example_snowflake_operator',
start_date=datetime(2022, 1, 1),
schedule_interval='@daily',
catchup=False
)
snowflake_op = SnowflakeOperator(
task_id='snowflake_query',
snowflake_conn_id='snowflake_conn',
sql='SELECT * FROM my_table WHERE DATE_COLUMN = {{ ds }}',
dag=dag
)
上述代码中,SnowflakeOperator的参数sql使用了Airflow的macro函数ds_format(),将数据源(ds)格式化成 Snowflake 接受的日期格式,然后传递到SQL查询语句中。这样,在每个调度周期期间,Airflow将在运行SQL查询之前将执行日期传递给SnowflakeOperator。通过这种方式,即可将Airflow中的执行日期传递到SQL参数中,以便在运行查询时使用。
上一篇:Airflow中的延迟通知系统
下一篇:Airflow中的自定义日志记录