在Airflow中,可以使用EmailOperator来发送任务执行后的结果信息。有时候,我们需要将pandas生成的文件作为附件添加到邮件中发送。以下是一个示例,展示如何将pandas DataFrame生成的csv文件作为附件添加到Airflow的EmailOperator中。
首先,我们需要定义一个生成csv文件的pandas DataFrame:
import pandas as pd
data = {'name': ['Alice', 'Bob', 'Charlie'],
'age': [25, 30, 35]}
df = pd.DataFrame(data)
df.to_csv('/path/to/csv/file.csv', index=False)
然后,在Airflow的DAG文件中定义一个EmailOperator,指定要发送的邮件文本和附件列表:
from airflow.operators.email_operator import EmailOperator
from airflow.utils.email import send_email
csv_file_path = '/path/to/csv/file.csv'
with open(csv_file_path, 'rb') as f:
csv_content = f.read()
email_operator = EmailOperator(
task_id='send_email',
to='receiver@example.com',
subject='Airflow - Attach file to EmailOperator from pandas',
html_content='Hi,
Please find attached csv file generated from pandas.
',
files=[{
'filename': 'pandas_dataframe.csv',
'content': csv_content,
'mimetype': 'text/csv'
}]
)
email_operator.execute(context={})
在上面的示例中,我们打开csv文件并读取其内容,然后将其添加到EmailOperator的files参数中作为附件。附件的mimetype为"text/csv",表示它是一个csv文件。
最后,我们使用execute方法来执行EmailOperator任务,并指定上下文。这样,当任务执行时,pandas DataFrame生成的csv文件将被作为附件添加到邮件中并发送给指定接收者。