如果使用 Airflow 中的 DockerOperator 执行任务时出现超时问题,可以通过以下代码示例解决:
from datetime import timedelta
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 8, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'docker_timeout_example',
default_args=default_args,
schedule_interval=None,
)
docker_task = DockerOperator(
task_id='docker_task',
image='my_image:latest',
api_version='auto',
auto_remove=True,
docker_url='unix://var/run/docker.sock',
command='/bin/sleep 600',
dag=dag,
)
docker_task.execution_timeout = timedelta(minutes=10)
在上面的代码示例中,我们给 DockerOperator 的 execution_timeout
属性设置了 10 分钟,这意味着如果任何 Docker 操作(在此示例中是 /bin/sleep 600
)超过 10 分钟,则该任务将被认为已超时并将失败。
您可以将这个超时值设置为您任务的预期超时时间。