尝试将Airflow版本升级到最新版(目前是2.2.3)。该问题可能已经在新版本中得到修复。
如果您无法升级到最新版本,则可以尝试更改DAG配置以减少刷新TaskInstance的频率。例如,您可以增加DAG的默认开始日期或增加检查运行状态的间隔。
下面是一个示例代码:
from airflow import DAG from datetime import datetime, timedelta
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2021, 1, 1, 0, 0), # 增加默认开始日期 'retries': 1, 'retry_delay': timedelta(minutes=5) }
dag = DAG( 'my_dag', default_args=default_args, description='Description', schedule_interval='@hourly', catchup=False )
最后,您可以检查Airflow的配置文件,确保所有配置正确。例如,您可以检查执行器并发设置,以确保最大并发值不超过资源限制。
这是一个示例在Airflow配置中修改执行器并发设置的代码:
executor_config = { 'KubernetesExecutor': { 'request_memory': '2G', 'limit_memory': '2G', 'request_cpu': '1', 'limit_cpu': '1', 'gpus': '0' } }
请注意,上述解决方法中的代码示例仅供参考,并且可能需要根据您的需求进行修改。