Airflow提供了丰富的事件记录机制,可以让我们方便地追溯任务的执行历史以及监控任务运行状态。通过使用Airflow的API或Web UI可以获取到Airflow生成的事件列表。下面是获取事件列表的Python代码示例:
from airflow import settings
from airflow import models
from airflow.utils.state import State
def get_event_logs():
session = settings.Session()
event_logs = (
session.query(models.TaskInstance)
.filter(models.TaskInstance.state == State.SUCCESS)
.order_by(models.TaskInstance.execution_date.desc())
.limit(10)
.all()
)
return event_logs
event_logs = get_event_logs()
for log in event_logs:
print(log)
上面的代码会获取Airflow中最近10个执行成功的任务实例对象,并打印出这些对象的信息。我们可以根据自己的需求定制自己的查询条件,例如查找状态为failed的任务实例,或者按照task_id进行过滤等等。