可能是由于以下原因:
解决方法如下:
在环境变量中添加:
export AWS_ACCESS_KEY_ID=
在DAG定义中添加:
my_operator=S3ListOperator( task_id='s3_list_operator', bucket='myS3bucket', aws_conn_id='my_aws_connection', prefix='my_folder/', delimiter='/' )
确保存储桶/路径存在。可以使用AWS S3控制台或AWS CLI进行验证。
如果以上两个步骤都没有问题,可能是由于Airflow的版本不兼容导致的。在新版Airflow中可能存在有关S3的bug,可以尝试降级Airflow版本或使用其他方式实现S3文件列出操作。例如:
from airflow.contrib.hooks.aws_hook import AwsHook from airflow.operators.python_operator import PythonOperator
def list_s3_files(): aws_hook = AwsHook(aws_conn_id='my_aws_connection') s3 = aws_hook.get_client_type('s3') response = s3.list_objects_v2( Bucket='myS3bucket', Prefix='my_folder/', Delimiter='/' ) for content in response.get('Contents', []): print(content.get('Key'))
my_operator = PythonOperator( task_id='s3_list_operator', python_callable=list_s3_files, dag=my_dag )
上述方法可以通过PythonOperator和Boto3 SDK来实现S3文件列表。