在Airflow中,使用Operator来执行任务。每个Operator都有一个输出(output)参数和一个输入(input)参数,可以将一个Operator的输出与另一个Operator的输入连接起来。使用单个“>>”符号可以将一个Operator的输出连接到另一个Operator的输入。例如:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {'owner': 'airflow'}
dag = DAG('my_dag', default_args=default_args, schedule_interval=timedelta(days=1))
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello World from Task 1"',
dag=dag,
)
task2 = BashOperator(
task_id='task2',
bash_command='cat /tmp/hello.txt >> /tmp/goodbye.txt',
dag=dag,
)
task1 >> task2
上述代码定义了一个DAG,其中定义了两个BashOperator任务(task1和task2),它们分别输出文本消息和将一个文本文件的内容附加到另一个文本文件中。运行时,task2将在task1成功完成后执行。
在一些情况下(例如循环或分支),可能需要使用多个“>>”符号将一个Operator的输出连接到多个Operators的输入。在这种情况下,可以使用列表和循环来实现。例如:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {'owner': 'airflow'}
dag = DAG('my_dag', default_args=default_args, schedule_interval=timedelta(days=1))
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello World from Task 1"',
dag=dag,
)
task2 = BashOperator(
task_id='task2',
bash_command='cat /tmp/hello.txt >> /tmp/goodbye1.txt',
dag=dag
)
task3 = BashOperator(
task_id='task3',
bash_command='cat /tmp/hello.txt >> /tmp/goodbye2.txt',
dag=dag,
)
tasks = [task2, task3]
for task in tasks:
task1 >> task
上述代码中,定义了三个任务(task1、task2和task3),其中task2和task3都的输入通过循环来定义,并且都使用了“>>”符号将输出连接到task1的输出