出现"Airflow DAG不断重试却没有显示任何错误"的情况可能是由于以下原因导致的:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('example_dag', default_args=default_args, schedule_interval='@daily')
task1 = BashOperator(
task_id='task1',
bash_command='command1',
retries=3,
retry_delay=timedelta(minutes=5),
dag=dag,
)
task2 = BashOperator(
task_id='task2',
bash_command='command2',
retries=3,
retry_delay=timedelta(minutes=5),
dag=dag,
)
task1 >> task2
在上面的例子中,任务task1和task2都设置了3次重试机会,每次重试间隔为5分钟。
on_failure_callback参数设置一个错误处理函数,该函数将在任务失败时被调用,并可以记录错误信息或采取其他操作。例如:def handle_failure(context):
# 记录错误信息
error_message = context.get('exception', '')
logging.error(f'Task failed: {error_message}')
dag = DAG('example_dag', default_args=default_args, schedule_interval='@daily', on_failure_callback=handle_failure)
在上面的例子中,handle_failure函数将在任务失败时被调用,并将错误信息记录到日志中。
INFO,则可能无法显示错误信息。可以在default_args中设置log_level参数来调整日志级别,例如:default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
'log_level': 'ERROR',
}
在上面的例子中,将日志级别设置为ERROR,以便只显示错误日志。
通过设置正确的重试策略、错误处理和日志级别,可以解决Airflow DAG不断重试却没有显示任何错误的问题。