在Airflow中使用PostgresToGoogleCloudStorageOperator时可能遇到身份验证错误,这是因为Google Cloud Storage API需要使用Google Cloud凭证进行身份验证。
解决方法如下所示:
from airflow.operators.postgres_to_gcs import PostgresToGoogleCloudStorageOperator
from datetime import datetime
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/your/cred.json'
dag = DAG(
dag_id='my_dag',
start_date=datetime(2021, 1, 1),
schedule_interval='@once'
)
pg_to_gcs = PostgresToGoogleCloudStorageOperator(
task_id='postgres_to_gcs',
sql='SELECT * FROM my_table',
filename='my_table_{{ ds }}.json',
bucket='my_bucket',
postgres_conn_id='my_postgres_conn',
google_cloud_storage_conn_id='my_gcp_conn',
dag=dag
)
在上面的示例中,'my_gcp_conn'是新的连接ID。'my_postgres_conn'是您在Airflow中定义的PostgreSQL连接ID。'/path/to/your/cred.json'是您的凭证JSON文件路径。
请确保JSON文件位于您的Airflow服务器上,并将正确的文件路径传递给环境变量'GOOGLE_APPLICATION_CREDENTIALS'。
这样您就可以使用PostgresToGoogleCloudStorageOperator上传PostgreSQL查询结果到Google Cloud Storage了。