这个错误是因为Airflow 2.5.3版本中的DataframeToParquetOperator
任务中的s3
参数的默认值设置为了parquet
,而在该版本中,parquet
不再是有效的源格式。
为了解决这个问题,你可以手动将DataframeToParquetOperator
的s3
参数设置为有效的源格式。
以下是一个示例代码:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dataframe_plugin import DataframeToParquetOperator
import pandas as pd
def process_data():
# 处理数据的逻辑
df = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']})
return df
default_args = {
'start_date': airflow.utils.dates.days_ago(1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('example_dag', default_args=default_args, schedule_interval=None) as dag:
task1 = PythonOperator(
task_id='process_data',
python_callable=process_data
)
task2 = DataframeToParquetOperator(
task_id='save_to_parquet',
dataframe_task_id='process_data',
s3='csv', # 设置为有效的源格式,如csv
s3_bucket='your-s3-bucket',
s3_key='your-s3-key',
s3_filename='output.parquet'
)
task1 >> task2
在上面的代码中,将s3
参数的值设置为有效的源格式,如csv
。你还需要提供S3存储桶的名称、S3键和输出文件名。
确保使用正确的参数配置,并将其集成到你的Airflow DAG中,以便解决这个问题。