通过使用XCom来在任务级别上传递信息。XCom允许在任务之间共享小块数据,例如成功或失败的状态信息。以下是一个示例,其中使用XCom传递任务级别的信息:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
# 定义DAG
dag = DAG('my_dag', description='My simple DAG',
schedule_interval='0 1 * * *',
start_date=datetime(2021, 1, 1),
catchup=False)
# 定义任务1
task1 = BashOperator(task_id='task1',
bash_command='echo "Hello from task1"',
xcom_push=True,
dag=dag)
# 定义任务2,使用任务1的结果
task2 = BashOperator(task_id='task2',
bash_command='echo "Hello from task2. Task1 result: {{ ti.xcom_pull(task_ids=\'task1\') }}"',
dag=dag)
# 定义任务3,使用任务2的结果
task3 = BashOperator(task_id='task3',
bash_command='echo "Hello from task3. Task2 result: {{ ti.xcom_pull(task_ids=\'task2\') }}"',
dag=dag)
# 设置任务的依赖关系
task1 >> task2 >> task3
在任务1中,我们使用xcom_push=True
来将任务级输出到XCom中。在任务2和任务3中,我们使用ti.xcom_pull()
从XCom中检索任务级输出,并将其包含在Bash命令中。这样,我们可以在任务级别上传递信息,并在任务之间共享。