这可能是由于Airflow无法检测到终结进程的状态而导致的。可以使用Airflow提供的kill_externally_launched_process
函数来解决这个问题。该函数可以在任务超时或通过其他方式终止进程。
示例代码:
from airflow.operators.bash_operator import BashOperator
def my_func(ds, **kwargs):
# 启动外部进程
p = subprocess.Popen(['/path/to/my/script'], stdout=subprocess.PIPE)
# 设置回调函数
airflow.utils.dag_processing.kill_externally_launched_process(p.pid, timeout=60, task_id=kwargs['task'].task_id)
# 等待进程完成
p.wait()
task = BashOperator(
task_id='my_task',
bash_command=my_func,
dag=dag)
在以上示例代码中,通过调用kill_externally_launched_process
方法,可以指定任务的超时时间和任务的ID,以便在任务完成后终止进程。在执行任务时,也可使用回调函数确保任务完成后终止进程。