在使用Airflow的CeleryExecutor时,无法直接从airflow.cfg中选择设置给Celery的配置。但是可以通过使用环境变量或修改Celery配置文件来解决这个问题。下面提供了两种解决方法的示例代码:
# 在airflow.cfg中设置Celery的配置选项
# celery_config = /path/to/celery_config.py
# 在celery_config.py文件中设置Celery的配置
# CELERY_CONFIG = {
# 'broker_url': 'redis://localhost:6379/0',
# 'result_backend': 'redis://localhost:6379/0',
# 'task_serializer': 'json',
# ...
# }
# 在启动Airflow时,通过环境变量设置Celery配置
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=redis://localhost:6379/0
export AIRFLOW__CELERY__TASK_SERIALIZER=json
...
# 启动Airflow
airflow scheduler
# 在airflow.cfg中设置Celery的配置选项
# celery_config = /path/to/celery_config.py
# 在celery_config.py文件中设置Celery的配置
# CELERY_CONFIG = {
# 'broker_url': 'redis://localhost:6379/0',
# 'result_backend': 'redis://localhost:6379/0',
# 'task_serializer': 'json',
# ...
# }
# 修改Celery的配置文件,将Celery的配置选项写入配置文件
# /path/to/celery_config.py
# [celery]
# broker_url = redis://localhost:6379/0
# result_backend = redis://localhost:6379/0
# task_serializer = json
# ...
# 启动Airflow
airflow scheduler
通过以上两种方法,可以将Celery的配置选项传递给Airflow的CeleryExecutor,并使其生效。
上一篇:Airflow使用BashOperator出现了ModuleNotFoundError错误。
下一篇:Airflow使用KubernetesExecutor在KubernetesJobWatcher中出现未知错误。