在 DAG 中使用 xcom_push() 和 xcom_pull() 方法将任务间所需的信息进行交互共享,以确保每个任务依赖的前置任务已经成功运行。
代码示例:
from airflow.operators.python_operator import PythonOperator
def task1():
# do something
return 'result1'
def task2(**context):
result1 = context['task_instance'].xcom_pull(task_ids='task1')
# do something with result1
return 'result2'
def task3(**context):
result1 = context['task_instance'].xcom_pull(task_ids='task1')
result2 = context['task_instance'].xcom_pull(task_ids='task2')
# do something with result1 and result2
return 'result3'
dag = DAG('dag_name', default_args=default_args)
task_1 = PythonOperator(task_id='task1', python_callable=task1, dag=dag)
task_2 = PythonOperator(task_id='task2', python_callable=task2, dag=dag)
task_3 = PythonOperator(task_id='task3', python_callable=task3, dag=dag)
task_1 >> [task_2, task_3]
在这个例子中,task2 和 task3 都需要 task1 返回的结果,因此通过 xcom_push() 方法将结果推送到 XCom 中,再由 task2 和 task3 通过 xcom_pull() 方法获取结果。这样,即使 task1 需要重新运行,也不会对后续任务造成影响,因为后续任务已经获取到了所需的结果。