Add the following settings to the Airflow configuration file (airflow.cfg):
[scheduler]
catchup_by_default = False
dag_dir_list_interval = 300
[celery]
task_always_eager = False
worker_prefetch_multiplier = 1
Then restart the scheduler and worker. In addition, the following snippet can be added to each task's PythonOperator callable to keep zombie child processes from accumulating:
import os

import psutil


def kill_on_zombie(pid):
    """Kill any zombie children of the process with the given pid."""
    current_process = psutil.Process(pid)
    children = current_process.children(recursive=True)
    for child in children:
        if child.status() == psutil.STATUS_ZOMBIE:
            child.kill()


def my_task():
    # Clean up leftover zombie children before doing any work.
    kill_on_zombie(os.getpid())
    # Your task code here
This snippet inspects the child processes of each Python task and kills any that are in the zombie state, which helps keep zombie tasks from piling up and destabilizing Airflow.
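For background, a zombie is a child process that has already exited but whose exit status has not yet been collected by its parent; what actually removes the process-table entry is reaping it (e.g. via os.waitpid), which is why an unreaping parent accumulates zombies. A minimal standalone sketch of this mechanism, using only the standard library (Linux-only because it reads /proc, and deliberately not Airflow code):

```python
# Standalone demo (Linux-only, not Airflow code): a child that exits before
# its parent collects its exit status becomes a zombie. Reaping it with
# os.waitpid() is what removes the entry from the process table.
import os
import subprocess
import sys
import time

# Spawn a child that exits immediately; we deliberately do not wait() on it.
child = subprocess.Popen([sys.executable, "-c", "pass"])
time.sleep(0.5)  # give the child time to exit; it now lingers as a zombie

# On Linux, the state field of /proc/<pid>/stat is "Z" for a zombie.
with open(f"/proc/{child.pid}/stat") as f:
    state = f.read().rsplit(")", 1)[1].split()[0]

# Reap the child: waitpid collects the exit status and clears the zombie.
pid, _status = os.waitpid(child.pid, 0)
```

After the waitpid call the zombie entry is gone; the psutil-based helper above performs the equivalent cleanup from inside a task by scanning its children for the zombie state.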