通常情况下,Airflow XCOM pull 不会显示结果的原因是获取的 XCOM 键值不存在。为确保 XCOM 键值存在,请检查相应的 DAG 是否已成功运行。如果 DAG 运行成功且设置了相应的 XCOM 键值,则可能是从该 task 发送数据的格式不够正确导致的。请查看发送数据的 task 设定,确保数据格式正确并且已经成功提交。
以下是一个简单的代码示例,展示如何在 DAG 中使用 XCOM:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def push_xcom(**kwargs):
# Push data to XCOM
kwargs['ti'].xcom_push(key='my_key', value='some_value')
def pull_xcom(**kwargs):
# Pull data from XCOM
my_value = kwargs['ti'].xcom_pull(key='my_key')
print(my_value)
dag = DAG('my_dag', description='A simple test DAG', schedule_interval='@once')
push_task = PythonOperator(
task_id='push_xcom',
python_callable=push_xcom,
provide_context=True,
dag=dag
)
pull_task = PythonOperator(
task_id='pull_xcom',
python_callable=pull_xcom,
provide_context=True,
dag=dag
)
push_task >> pull_task
在这个 DAG 中,push_xcom
task 将数据 "some_value" 存储在 XCOM 中并使用 xcom_push()
函数来推送数据。然后,pull_xcom
task 从 XCOM 中拉取数据,并使用 xcom_pull()
函数来获取存储的值。这两个 task 通过 >>
操作符来连接,以确保 push_xcom
task 在 pull_xcom
task 之前运行。