Airflow支持使用Operator执行自定义的SQL语句。可以使用BashOperator或PythonOperator来执行这些SQL脚本。
以下是一个使用PythonOperator执行SQL脚本的示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import psycopg2
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('example_sql_dag',
default_args=default_args,
schedule_interval=timedelta(days=1))
def run_sql():
conn = psycopg2.connect(
host='localhost',
port='5432',
user='postgres',
password='postgres',
database='mydb'
)
cur = conn.cursor()
cur.execute(open('/path/to/sql_script.sql', 'r').read())
conn.commit()
cur.close()
conn.close()
run_sql_task = PythonOperator(
task_id='run_sql_task',
python_callable=run_sql,
dag=dag
)
run_sql_task
在此示例中,PythonOperator使用psycopg2模块从PostgreSQL数据库连接并执行SQL脚本。 还需要将要执行的SQL脚本放在脚本的路径。
其他的Operator,例如BashOperator或MySQLOperator也可以用于执行SQL脚本。 只需要将脚本路径传递给Operator即可。