通过设置 Dag 的超时时间来解决此问题,具体实现可参考以下代码示例:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def my_task():
# Do something...
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 6, 1),
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=1),
# 设置超时时间为10小时
'execution_timeout': timedelta(hours=10)
}
dag = DAG(
'my_dag',
default_args=default_args,
description='My DAG',
schedule_interval='0 0 * * *'
)
my_task = PythonOperator(
task_id='my_task',
python_callable=my_task,
dag=dag
)
在上述的示例代码中,我们通过在 dag 的 default_args 中设置了 execution_timeout 来指定了该 dag 的超时时间为10小时。如果任务运行时间超过了10小时,airflow 会自动将该任务标记为失败。
上一篇:AirflowDAG序列化缓存
下一篇:Airflowdag运行永不结束