This usually happens when a DAG has too many tasks or when the data files being processed are too large. One way to fix it is to process the data in batches (chunking): instead of loading a large file all at once, split it into smaller chunks and handle them one at a time, which keeps memory usage bounded. Below is example code for a custom operator that processes a CSV file in chunks:
import pandas as pd
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CsvToDatabaseOperator(BaseOperator):
    """
    Read a CSV file in chunks and insert each chunk into a database table.
    """

    @apply_defaults
    def __init__(self, csv_file_path, database_conn_id, table_name,
                 batch_size=1000, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.csv_file_path = csv_file_path
        self.database_conn_id = database_conn_id
        self.table_name = table_name
        self.batch_size = batch_size

    def execute(self, context):
        # to_sql() needs a SQLAlchemy engine or connection, not an Airflow
        # connection ID string, so resolve the connection through a hook
        # first (PostgresHook here; swap in the hook for your database).
        engine = PostgresHook(postgres_conn_id=self.database_conn_id).get_sqlalchemy_engine()
        # chunksize makes read_csv return an iterator of DataFrames, so only
        # batch_size rows are held in memory at a time.
        reader = pd.read_csv(self.csv_file_path, chunksize=self.batch_size)
        for chunk in reader:
            # Append the current chunk to the target table.
            chunk.to_sql(name=self.table_name, con=engine, if_exists='append', index=False)
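The memory saving comes from pandas rather than from Airflow itself: passing chunksize to pd.read_csv returns an iterator of DataFrames instead of loading the whole file. Here is a minimal standalone sketch of that behavior (the file name and chunk size are just placeholders):

import pandas as pd

# Chunked read: each iteration yields a DataFrame of at most 1000 rows,
# so the full CSV never has to fit in memory at once.
total_rows = 0
for chunk in pd.read_csv('file.csv', chunksize=1000):
    total_rows += len(chunk)  # stand-in for real per-chunk processing
print(f'processed {total_rows} rows')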
In your DAG, you can then use this custom operator to load the CSV file into the database:
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from my_operators import CsvToDatabaseOperator

# Use a fixed start_date; a dynamic value such as datetime.now()
# confuses Airflow's scheduler.
dag = DAG('example_dag', start_date=datetime.datetime(2023, 1, 1))

# Define the tasks
start_task = DummyOperator(task_id='start_task', dag=dag)

csv_to_db_task = CsvToDatabaseOperator(
    task_id='csv_to_db_task',
    csv_file_path='file.csv',
    database_conn_id='my_database',
    table_name='my_table',
    batch_size=1000,
    dag=dag,
)

end_task = DummyOperator(task_id='end_task', dag=dag)

# Define the task ordering
start_task >> csv_to_db_task >> end_task
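Once the DAG is in place, you can run the load in isolation before scheduling it, for example with airflow tasks test example_dag csv_to_db_task 2023-01-01 (or airflow test on Airflow 1.x), and then tune batch_size against the memory available to your workers: smaller chunks use less memory but issue more inserts.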