是的,Airflow支持使用cloud-sql-python-connector连接到CloudSQL Postgres。要使用此连接器,请执行以下步骤:
pip install cloud-sql-python-connector
from airflow import DAG
from datetime import datetime
from airflow.operators.postgres_operator import PostgresOperator
from airflow.contrib.hooks.gcp_sql_hook import GcSQLHook
dag = DAG('cloudsql_dag', description='Cloud SQL DAG', schedule_interval='@hourly', start_date=datetime(2022, 1, 1))
# Define CloudSQL PostgreSQL connection
gcsql_hook = GcSQLHook(instance='YOUR_CLOUDSQL_INSTANCE_NAME',
schema='YOUR_DATABASE_NAME',
conn_id='cloudsql_conn')
# Define PostgreSQL Operator to run SQL queries
run_query = PostgresOperator(
task_id='postgres_task',
sql="""SELECT * FROM YOUR_TABLE""",
postgres_conn_id='cloudsql_conn',
dag=dag)
run_query
在上面的代码中,我们首先导入了dag库和postgres_operator运算符,然后定义了一个CloudSQL连接器对象,该对象使用GcSQLHook从CloudSQL获取连接标识符。然后我们定义了一个PostgreSQL运算符任务(run_query),它使用postgres_conn_id属性引用在gcsql_hook中定义的连接,并在PostgreSQL中运行SELECT查询。最后,在DAG定义中包含我们的PostgreSQL任务(run_query)。