Airflow DAG可以通过使用Variable
对象来获取配置文件的路径,并使用Python的json
模块来加载配置文件的内容。以下是一个示例代码,演示如何从本地目录中获取配置JSON文件:
import os
import json
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
def load_config_file():
# 获取配置文件路径
config_file_path = Variable.get("config_file_path")
# 检查配置文件是否存在
if not os.path.exists(config_file_path):
raise FileNotFoundError(f"Config file '{config_file_path}' not found.")
# 加载配置文件内容
with open(config_file_path, 'r') as file:
config = json.load(file)
# 打印配置文件内容
print(config)
# 创建DAG
with DAG(
'example_dag',
schedule_interval=None,
start_date=days_ago(1),
catchup=False
) as dag:
# 创建PythonOperator任务,用于加载配置文件
load_config_task = PythonOperator(
task_id='load_config_file',
python_callable=load_config_file
)
在上面的示例中,load_config_file
函数使用Variable.get
方法来获取配置文件路径,该路径在Airflow的变量中进行了配置。然后,使用os.path.exists
方法检查配置文件是否存在,如果不存在,则抛出FileNotFoundError
异常。
然后,使用open
函数打开配置文件,并使用json.load
方法加载配置文件的内容。最后,你可以根据需要处理配置文件的内容。
在DAG的定义中,创建了一个PythonOperator
任务,用于执行load_config_file
函数。你可以根据需要将此任务添加到DAG中的其他任务之前或之后。