在Airflow 2.6中,可以使用运行时配置来确定手动触发的DAG中的任务超时时间。以下是一个包含代码示例的解决方法:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2022, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
def task1_function(**kwargs):
# 从运行时配置中获取任务超时时间
timeout = kwargs['dag_run'].conf.get('timeout')
# 在这里执行任务逻辑
# ...
with DAG('runtime_config_example', default_args=default_args, schedule_interval=None) as dag:
task1 = PythonOperator(
task_id='task1',
python_callable=task1_function,
provide_context=True
)
from airflow.api.client.local_client import Client
def trigger_dag_with_config(dag_id, run_id, conf):
client = Client(None, None)
client.trigger_dag(dag_id=dag_id, run_id=run_id, conf=conf)
dag_id = 'runtime_config_example'
run_id = 'manual_trigger'
conf = {'timeout': 3600} # 设置任务超时时间为3600秒
trigger_dag_with_config(dag_id, run_id, conf)
通过上述步骤,你可以在手动触发的DAG中使用运行时配置来确定任务超时时间。在任务的Python函数中,可以通过kwargs['dag_run'].conf.get('timeout')
来获取运行时配置中的超时时间。