Apache Airflow在版本1.10.14中引入了对pickle5的支持来提高序列化效率。如果你使用的是该版本或更高版本,则需要安装pickle5。
安装pickle5的方法可以使用pip命令:
pip install 'apache-airflow[pickle5]'
在使用Airflow配置celery时,也需要添加'pickle5'。例如:
from airflow import CeleryApp
app = CeleryApp('airflow.worker', broker='pyamqp://guest@localhost//')
app.conf.task_serializer = 'pickle'
app.conf.worker_prefetch_multiplier = 1
app.conf.broker_connection_retry = True
app.conf.broker_connection_max_retries = 0
app.conf.result_backend = 'db+postgresql://airflow@localhost:5432/airflow'
# 添加pickle5
app.conf.worker_enable_remote_control = False
app.conf.worker_send_task_events = True
app.conf.worker_connection_retry = True
app.conf.task_protocol = 5
app.start()
需要注意的是,在使用pickle5时,可能会遇到一些兼容性问题。因此,建议在测试环境中测试后再用于生产环境。