When the Airflow webserver fetches a DAG task's logs from a worker, the request can fail due to network latency or other issues. The following steps address this problem:
First, make sure task logging is verbose enough to diagnose the failure. Note that logging_level is an airflow.cfg option, not a DAG or operator argument, so placing it in default_args has no effect. Below is a minimal DAG whose middle task deliberately fails, producing logs for the webserver to fetch:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('example_dag', default_args=default_args)

t1 = BashOperator(task_id='task_1', bash_command='echo "Hello World"', dag=dag)
# task_2 deliberately exits non-zero, so it fails and its logs must be fetched
t2 = BashOperator(task_id='task_2', bash_command='exit 1', dag=dag)
t3 = BashOperator(task_id='task_3', bash_command='echo "Done"', dag=dag)

t1 >> t2 >> t3
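If more verbose task logs would help with diagnosis, set logging_level in airflow.cfg instead: under [logging] in Airflow 2 (under [core] in Airflow 1.10). A sketch:

[logging]
# More detail while diagnosing the fetch failure; revert to INFO afterwards
logging_level = DEBUG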
Next, increase the log fetch timeout in airflow.cfg. The option is named log_fetch_timeout_sec and exists only in the [webserver] section; it controls how long the webserver waits when fetching logs from a worker, and the default is just 5 seconds. For example:
[webserver]
log_fetch_timeout_sec = 60
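To confirm the new value is actually being picked up, Airflow 2.1+ can print it back via the CLI (assuming the airflow command is on your PATH):

airflow config get-value webserver log_fetch_timeout_sec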
Finally, restart the Airflow webserver and scheduler services so the new configuration takes effect.
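How you restart depends on your deployment; as one example, for a systemd-managed install with hypothetical unit names airflow-webserver and airflow-scheduler:

# Unit names are deployment-specific; adjust to match yours
sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler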
This resolves most cases of Airflow failing to fetch log files. If the problem persists, verify network connectivity between the webserver and the worker's log server (port 8793 by default).