在Airflow中,可以使用XCom
来传递任务之间的数据。XCom
是Airflow中的一个功能,用于在任务之间传递和共享数据。
下面是一个示例,展示如何在一个任务中获取前一个任务的任务ID:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import XCom
def get_previous_task_id(**context):
# 获取前一个任务的任务ID
previous_task_id = context['task_instance'].previous_ti.task_id
print('Previous Task ID:', previous_task_id)
def my_task(**context):
# 获取前一个任务的任务ID
previous_task_id = context['task_instance'].previous_ti.task_id
print('Previous Task ID:', previous_task_id)
dag = DAG('my_dag', start_date=datetime(2021, 1, 1), schedule_interval=None)
task1 = PythonOperator(
task_id='task1',
python_callable=get_previous_task_id,
provide_context=True,
dag=dag
)
task2 = PythonOperator(
task_id='task2',
python_callable=my_task,
provide_context=True,
dag=dag
)
task1 >> task2
在上面的示例中,get_previous_task_id
函数和my_task
函数都接受一个context
参数,该参数包含了当前任务的上下文信息。通过context['task_instance'].previous_ti.task_id
可以获取前一个任务的任务ID。
在任务定义时,需要设置provide_context=True
,以便将上下文信息传递给任务函数。
通过将任务1和任务2连接起来,任务2可以通过context
参数获取任务1的任务ID。