这通常是由于Airflow在交互式会话(例如Jupyter Notebook)中运行时未能正确加载Airflow配置文件所致。解决此问题的一种方法是在交互式会话中手动加载Airflow配置文件,然后创建一个 DAG 对象并将其传递给 Scheduler。以下是一个示例:
import airflow
from airflow.models import DAG
# 加载Airflow默认配置
airflow_home = "/path/to/airflow/home"
airflow.cfg.load_config(airflow_home)
# 创建DAG
dag = DAG(
"my_dag",
description="My first DAG",
schedule_interval="*/5 * * * *",
start_date=datetime(2022, 1, 1),
catchup=False,
)
# 将DAG对象传递给Scheduler
scheduler = airflow.jobs.SchedulerJob(dag_id="my_dag", num_runs=1)
scheduler.run()
通过手动加载Airflow配置文件并创建 DAG 对象,这将确保Scheduler能够在交互式会话中正确解析和加载 DAG。