要通过Airflow API获取DAG运行的输出,可以使用以下步骤:
首先,确保你已经安装了Airflow和相关的依赖。
在你的代码中导入所需的模块和类:
from airflow.api.client.local_client import Client
from airflow.models import TaskInstance
client = Client(api_base_url='http://localhost:8080/api/v1')
请确保将api_base_url
替换为你的Airflow Web服务器的API地址。
dag_id = 'your_dag_id'
execution_date = '2022-01-01T00:00:00+00:00'
dag_runs = client.get_dag_runs(dag_id=dag_id, execution_date=execution_date)
for dag_run in dag_runs:
task_instances = client.get_task_instances(dag_id=dag_id, dag_run_id=dag_run['id'])
for task_instance in task_instances:
task_instance_obj = TaskInstance(task_instance['task_id'], task_instance['dag_id'], task_instance['execution_date'])
logs = task_instance_obj.get_raw_log_url()
print(logs)
请将your_dag_id
替换为你要获取输出的DAG的ID,将execution_date
替换为你要获取输出的DAG运行的执行日期。
这样,你就可以通过Airflow API获取DAG运行的输出。根据需要,你可以进一步处理输出日志URL,如下载日志文件或提取有用的信息。