这个问题通常是由于Airflow的默认超时时间导致的。可以通过在DAG文件中设置任务的超时时间来避免这个问题。例如,在一个DAG文件中,将task_timeout参数设置为3600秒(1小时):
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 0,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(seconds=3600)
}
dag = DAG('long_running_job', default_args=default_args)
def my_long_running_task():
# some long-running task code
long_running_task = PythonOperator(
task_id='long_running_task',
python_callable=my_long_running_task,
dag=dag
)
long_running_task
在上面的示例中,我们将execution_timeout参数设置为3600秒,以防止Airflow长时间运行的任务在1小时后被杀死,但任务仍处于进行中的状态。