确认 AWS S3 的连接设置正确,包括 region 和 bucket 名称等。
检查 Airflow 是否安装了 boto3 库,如果没有安装,需使用以下命令安装:
pip install boto3
使用以下代码示例测试连接 AWS S3:
import boto3
s3 = boto3.resource('s3', region_name='your_region')
bucket = s3.Bucket('your_bucket_name')
for obj in bucket.objects.all():
print(obj.key)
如果没有报错则说明连接成功。
确认 Airflow 的 credentials 配置正确,在 DAG 文件中引用的 credentials_id 必须和 Airflow 的 credentials 名称一致。示例代码如下:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG(
dag_id="example_dag",
schedule_interval="0 0 * * *",
start_date=datetime(2021, 1, 1),
catchup=False
)
def test_aws_s3_connection():
import boto3
s3 = boto3.client('s3')
response = s3.list_buckets()
print(response)
task = PythonOperator(
task_id="test_aws_s3_connection",
python_callable=test_aws_s3_connection,
dag=dag
)
在 Airflow 的 Web UI 中配置 AWS S3 的 credentials,然后让 credentials_id 与脚本中调用的 s3 对象的 region_name 参数相同即可成功连接。
上一篇:ApacheAirflow-HttpHook与直接使用Python的requests库,以及它们与DAG刷新的关系。
下一篇:ApacheAirflow-ModuleNotFoundError:Nomodulenamed'selenium'