在Airflow中,可以通过SubDagOperator来实现任务的重用。SubDagOperator允许将一组任务封装在一个子DAG中,并在主DAG中多次重用该子DAG。
以下是一个示例代码,演示如何在Airflow中重用一个任务:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id=f'{parent_dag_name}.{child_dag_name}',
default_args=args,
schedule_interval="@daily",
)
with dag_subdag:
task1 = DummyOperator(task_id='task1', dag=dag_subdag)
task2 = DummyOperator(task_id='task2', dag=dag_subdag)
task3 = DummyOperator(task_id='task3', dag=dag_subdag)
task1 >> task2 >> task3
return dag_subdag
dag = DAG(
dag_id='reusable_task_example',
default_args={
'owner': 'airflow',
'start_date': datetime(2022, 1, 1),
},
schedule_interval="@daily",
)
taskA = DummyOperator(task_id='taskA', dag=dag)
subdag_task = SubDagOperator(
task_id='subdag_task',
subdag=subdag('reusable_task_example', 'subdag_task', dag.default_args),
dag=dag,
)
taskB = DummyOperator(task_id='taskB', dag=dag)
taskA >> subdag_task >> taskB
在上述示例中,我们定义了一个子DAG,其中包含了三个DummyOperator任务(task1、task2和task3)。然后,我们使用SubDagOperator将该子DAG作为一个任务(subdag_task)插入到主DAG中。通过多次使用SubDagOperator,我们可以在主DAG中重用该子DAG。
请注意,子DAG的定义和主DAG的定义是分离的,这样可以更好地组织和重用任务。
上一篇:Airflow - 重试延迟