以下是一个使用Airflow将数据写入Google Cloud Storage(GCS)的任务的示例代码:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.gcs_operator import GoogleCloudStorageUploadOperator
# 定义DAG
dag = DAG(
'write_to_gcs',
description='Write data to Google Cloud Storage',
schedule_interval='0 0 * * *',
start_date=datetime(2022, 1, 1),
catchup=False
)
# 定义任务
write_to_local_file = BashOperator(
task_id='write_to_local_file',
bash_command='echo "Data to be written" > /tmp/data.txt',
dag=dag
)
write_to_gcs = GoogleCloudStorageUploadOperator(
task_id='write_to_gcs',
bucket='my-gcs-bucket',
object='data.txt',
filename='/tmp/data.txt',
google_cloud_storage_conn_id='google_cloud_default',
dag=dag
)
# 定义任务之间的依赖关系
write_to_gcs.set_upstream(write_to_local_file)
在此示例中,我们定义了一个DAG(有关DAG的更多信息,请参阅Airflow文档)。我们有两个任务:
write_to_local_file
任务使用BashOperator运行一个命令,将数据写入本地文件/tmp/data.txt
。write_to_gcs
任务使用GoogleCloudStorageUploadOperator
将本地文件/tmp/data.txt
上传到指定的GCS存储桶中的data.txt
对象。你需要将示例代码中的my-gcs-bucket
替换为你的GCS存储桶名称,并确保你的Airflow安装已正确配置Google Cloud Storage连接(google_cloud_default
)。
你可以将此示例代码保存为Python文件,并在Airflow中运行该文件以创建和运行此任务。