我们可以尝试降低Airflow版本或使用以下代码作为解决方案:
在dag中使用 dagbag.import_errors
增加日志级别以获取更多有用的信息,然后解决dag中的问题。代码示例如下:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
def my_func():
return "Hello world!"
with DAG("my_dag"):
task = PythonOperator(
task_id="my_task",
python_callable=my_func,
)
dag = task.dag
for task_id in ["task_2", "task_3"]:
dag.add_task(PythonOperator(task_id=task_id, python_callable=my_func))
if dag.import_errors:
logger.warning(f"Failed to import DAG: {dag.import_failed_exception}")
for filename, stacktrace in dag.import_errors.items():
logger.error(f"{filename}: {stacktrace}")
此代码将在dag导入期间记录日志级别,可以显示导致动态生成任务失败的问题以及更多有用的信息。在排除这些问题后,甘特图应该能够正确地显示任务。