from google.cloud import storage
# Build a Cloud Storage client that authenticates with the given
# service-account key file.
storage_client = storage.Client.from_service_account_json('gcp_credentials.json')
# Look up the bucket that holds the configuration object.
bucket = storage_client.get_bucket('your-bucket-name')
# Handle for the configuration object inside that bucket.
blob = bucket.blob('your-config-file.json')
# download_as_bytes() replaces the deprecated download_as_string() alias;
# the result is still the raw object contents as bytes.
config = blob.download_as_bytes()
其中,“gcp_credentials.json”是您的 GCP 服务账号密钥(凭据)文件的路径,“your-bucket-name”是您的存储桶名称,“your-config-file.json”是您要下载的配置文件在存储桶中的对象名称。
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
def my_task():
    """Download the configuration object from Cloud Storage.

    Used as the ``python_callable`` of the PythonOperator defined in this
    file. The downloaded bytes are bound to a local ``config`` variable for
    task-specific processing; nothing is returned.
    """
    # NOTE(review): the key-file path is relative, so it is resolved against
    # the process's current working directory -- confirm where the file
    # lives on the worker, or switch to an absolute path.
    storage_client = storage.Client.from_service_account_json('gcp_credentials.json')
    bucket = storage_client.get_bucket('your-bucket-name')
    blob = bucket.blob('your-config-file.json')
    # download_as_bytes() replaces the deprecated download_as_string() alias;
    # the result is still the raw object contents as bytes.
    config = blob.download_as_bytes()
    # do something with your config file
# Register my_task as a task of my_dag.
# PythonOperator has no `working_directory` parameter, and Airflow 2 rejects
# unknown operator arguments with an AirflowException, so it is not passed
# here. If files must be resolved relative to the base directory of your DAGs
# ('/home/airflow/gcs/dags'), build the absolute path inside the callable.
task = PythonOperator(
    task_id='my_task',
    python_callable=my_task,
    dag=my_dag,
)
其中,“/home/airflow/gcs/dags”是您的 DAG 文件所在的基本目录(Cloud Composer 环境中 DAGs 文件夹的默认挂载路径)。
通过这些步骤,您应该能够成功访问存储在 Google Cloud Storage(GCS)存储桶中的配置文件,并在您的 PythonOperator 任务中使用它。