要在Airflow中使用EmailOperator发送邮件之前下载文件,可以使用Python的requests库进行文件下载。下面是一个示例代码:
import os
import requests
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1)
}
dag = DAG(
'download_and_email',
default_args=default_args,
schedule_interval='@once',
catchup=False
)
def download_file(url, save_path):
response = requests.get(url)
with open(save_path, 'wb') as file:
file.write(response.content)
download_task = PythonOperator(
task_id='download_file',
python_callable=download_file,
op_kwargs={'url': 'http://example.com/file.txt', 'save_path': '/path/to/save/file.txt'},
dag=dag
)
email_task = EmailOperator(
task_id='send_email',
to='user@example.com',
subject='File Downloaded',
html_content='The file has been downloaded.',
files=['/path/to/save/file.txt'],
dag=dag
)
download_task >> email_task
上述代码中,首先定义了一个download_file
函数,该函数使用requests库从指定的URL下载文件,并保存到指定的本地路径。
然后,创建了一个download_task
,使用PythonOperator将download_file
函数作为可调用的Python函数,并通过op_kwargs
参数传递URL和保存路径。
接下来,创建了一个email_task
,使用EmailOperator发送邮件。在files
参数中指定了要附加的文件路径。
最后,通过将download_task
和email_task
连接起来,形成任务依赖关系。
你可以根据实际需求修改代码中的URL、保存路径、收件人地址、邮件主题和正文等内容。