要使用Airflow将最新的文件从GCS存储桶复制到本地,可以使用GoogleCloudStorageToLocalFilesystemOperator操作符。以下是一个包含代码示例的解决方法:
from airflow import DAG
from airflow.contrib.operators.gcs_to_local import GoogleCloudStorageToLocalFilesystemOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('copy_file_from_gcs', default_args=default_args, schedule_interval=None)
copy_task = GoogleCloudStorageToLocalFilesystemOperator(
task_id='copy_file',
bucket='your-gcs-bucket',
object='path/to/your/file',
filename='/path/to/save/file',
google_cloud_storage_conn_id='google_cloud_default',
dag=dag
)
copy_task
请确保替换代码示例中的以下值:
'your-gcs-bucket'
:您的GCS存储桶的名称。'path/to/your/file'
:要复制的文件在GCS存储桶中的路径。'/path/to/save/file'
:要保存文件的本地路径。'google_cloud_default'
:GCS连接的名称。您需要在Airflow中配置此连接。此代码将创建一个DAG,其中使用GoogleCloudStorageToLocalFilesystemOperator操作符定义了一个任务。该任务将从指定的GCS存储桶复制最新的文件到指定的本地路径。