要从S3存储桶下载PDF文件到Airflow,您可以使用S3Hook
来连接到S3,并使用S3Hook.download_file()
方法下载文件。以下是一个示例解决方案:
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
def download_pdf_from_s3():
s3_hook = S3Hook(aws_conn_id='aws_default') # 用您自己的AWS连接ID初始化S3Hook
# 下载PDF文件
s3_bucket = 'your-s3-bucket' # 替换为您的S3存储桶名称
s3_key = 'path/to/pdf/file.pdf' # 替换为PDF文件在S3中的路径
local_path = '/path/to/save/file.pdf' # 替换为本地保存文件的路径
s3_hook.download_file(s3_bucket, s3_key, local_path)
print(f"Successfully downloaded PDF file from S3 to {local_path}")
with DAG('download_pdf_from_s3', default_args=default_args, schedule_interval=None) as dag:
task = PythonOperator(
task_id='download_pdf',
python_callable=download_pdf_from_s3,
)
请注意,您需要将your-s3-bucket
替换为您的S3存储桶名称,path/to/pdf/file.pdf
替换为PDF文件在S3中的路径,并将/path/to/save/file.pdf
替换为本地保存文件的路径。
此示例DAG将创建一个名为download_pdf_from_s3
的DAG,并包含一个名为download_pdf
的任务。当DAG运行时,它将调用download_pdf_from_s3
函数来下载PDF文件。您可以根据需要将此DAG与其他任务和操作符组合在一起。