问题可能是由于Airflow 2的默认Executor更改为“SequentialExecutor”,导致在Executor调度器中出现延迟。因此,需要更改为“LocalExecutor”或“CeleryExecutor”来解决该问题。
要更改为“LocalExecutor”:
在Airflow配置文件中,找到“executor =”一行,并将其更改为“executor = LocalExecutor”。
要更改为“CeleryExecutor”:
1.安装Celery库:
pip install celery
2.在Airflow配置文件中,找到“executor =”一行,并将其更改为“executor = CeleryExecutor”。
3.在Airflow目录下创建celery.py文件:
from airflow.executors.celery_executor import app as celery_app
celery = celery_app.conf.update( task_routes={ 'my_dag_id': { 'my_task_id': 'my_worker_name' } } )
4.将Celery worker绑定到队列:
celery worker -Queues my_worker_name
以上方法可以解决升级后Airflow 2无法执行任务的问题。