在Airflow中,可以使用Google Cloud Operators(例如GoogleCloudStorageListOperator
)来列出Google Cloud存储桶中的文件列表。然后,您可以使用xcom_pull
方法来检索列表并进行迭代。
然而,由于xcom_pull
方法返回的是序列化的结果(默认为字符串),当您尝试迭代该结果时会遇到问题。为了解决这个问题,您可以使用json.loads
方法将结果反序列化为Python对象。
下面是一个示例代码,展示了如何解决这个问题:
from airflow import DAG
from airflow.operators import PythonOperator
from airflow.contrib.operators.gcs_operator import GoogleCloudStorageListOperator
from airflow.operators.python_operator import PythonOperator
import json
def print_file_list(**kwargs):
ti = kwargs['ti']
file_list = ti.xcom_pull(task_ids='list_files_task')
file_list = json.loads(file_list) # 反序列化为Python对象
for file in file_list:
print(file)
with DAG('gcs_file_list', default_args=default_args, schedule_interval=None) as dag:
list_files_task = GoogleCloudStorageListOperator(
task_id='list_files_task',
bucket='your_bucket_name',
prefix='your_prefix',
google_cloud_storage_conn_id='your_conn_id'
)
print_files_task = PythonOperator(
task_id='print_files_task',
python_callable=print_file_list,
provide_context=True
)
list_files_task >> print_files_task
在上面的例子中,首先使用GoogleCloudStorageListOperator
列出了Google Cloud存储桶中的文件列表,并将结果存储在XCom中。然后,使用PythonOperator
调用print_file_list
函数来迭代xcom_pull
返回的列表。在函数内部,我们使用json.loads
方法将序列化的结果反序列化为Python对象,并进行迭代处理。
请注意,您需要将your_bucket_name
替换为您的实际存储桶名称,将your_prefix
替换为您的实际前缀,并将your_conn_id
替换为您的实际Google Cloud连接ID。