使用XCom存储和检索任务间数据
Airflow的XCom是跨任务间共享数据的推荐方法。可以通过XCom将数据从一个任务传递到另一个任务。
在一个任务中使用XCom进行存储,例如:
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
def my_task(**kwargs):
# set the value to store
my_value = 'hello world'
# save the value to xcom
kwargs['ti'].xcom_push(key='my_value_key', value=my_value)
my_operator = PythonOperator(
task_id='my_task',
python_callable=my_task,
provide_context=True,
dag=my_dag,
)
在另一个任务中检索值,例如:
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
def my_other_task(**kwargs):
# retrieve my_value from xcom
my_value = kwargs['ti'].xcom_pull(key='my_value_key', task_ids='my_task')
# do something with my_value
print(my_value)
my_other_operator = PythonOperator(
task_id='my_other_task',
python_callable=my_other_task,
provide_context=True,
dag=my_dag,
)
这将从前一个任务(ID为my_task)的XCom存储中检索键为"my_value_key"的值,并将值存储在my_value中,以便于在当前任务(ID为my_other_task)中使用。