在 Airflow 出现之前,常用的数据流程管理工具包括 Oozie、Luigi、Azkaban 等。这些工具通常使用 XML 或 JSON 格式定义工作流程,所有的任务都在同一个文件中定义。以 Oozie 为例,以下是一个 XML 文件的示例:
${jobTracker}
${nameNode}
com.examples.Task1
input
output1
${jobTracker}
${nameNode}
com.examples.Task2
output1
output2
Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
这个文件定义了由两个任务(task1 和 task2)组成的工作流。数据的输入和输出都在任务的参数中定义。在 Oozie 中,可以通过 Web UI 或命令行启动工作流程,并且可以查看工作流程运行的日志。
Python 的 Luigi 也提供了类似的功能。以下是一个示例:
import luigi
class Task1(luigi.Task):
def requires(self):
return None
def output(self):
return luigi.LocalTarget('output1')
def run(self):
# do something here
class Task2(luigi.Task):
def requires(self):
return Task1()
def output(self):
return luigi.LocalTarget('output2')
def run(self):
# do something here
if __name__ == '__main__':
luigi.run()
这个代码文件定义了两个任务(Task1 和 Task2),其中 Task2 需要依赖于 Task1 的输出。