Airflow 2提供了TaskFlow API,可以用于构建灵活的任务流程。在使用TaskFlow时,需要添加适当的日志记录来跟踪任务的执行。下面是一个示例,展示如何在TaskFlow中添加日志记录:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from taskflow import Task, TaskFlow, log
@task
def task1():
log.info("Running task 1")
@task
def task2():
log.info("Running task 2")
@task
def task3():
log.info("Running task 3")
@task
def task4():
log.info("Running task 4")
@dag(default_args={"owner": "airflow", "start_date": days_ago(1)})
def my_dag():
with TaskFlow("my_task_flow") as flow:
t1 = Task(task1())
t2 = Task(task2())
t3 = Task(task3())
t4 = Task(task4())
t1 >> t2 >> t3 >> t4
submit_spark_job = SparkSubmitOperator(
task_id="submit_spark_job",
application="my_spark_job.py",
conn_id="spark_default",
verbose=False,
)
flow >> submit_spark_job
dag = my_dag()
在这个示例中,我们使用TaskFlow构建了一个包含四个任务的任务流程。每个任务都使用log.info()添加了一个简单的日志记录。当TaskFlow执行时,这些日志记录将被写入Airflow的日志文件中。最后,我们添加了一个SparkSubmitOperator,该操作符将启动一个Spark作业。我们将前面构建的任务流程(flow)作为SparkSubmitOperator的前置任务,这意味着我们的任务流程将运行完毕后,才会执行Spark作业。
使用上面的方法,在Airflow 2中可以很容易地添加日志记录,并