这个问题通常是因为XCom传递的参数没有正确指定。在Airflow DAG中,可以使用如下的方式进行XCom参数传递:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def my_func(**context):
context['ti'].xcom_push(key='my_key', value='my_value')
def my_other_func(**context):
my_value = context['ti'].xcom_pull(key='my_key')
print(my_value)
with DAG('my_dag') as dag:
task1 = PythonOperator(task_id='push_to_xcom', python_callable=my_func, provide_context=True)
task2 = PythonOperator(task_id='pull_from_xcom', python_callable=my_other_func, provide_context=True)
task1 >> task2
在这个示例中,task1使用.xcom_push()
方法将参数推送到XCom,而task2使用.xcom_pull()
方法从XCom中拉取参数。请确保XCom键和值的正确性,并且Pull任务在Push任务之后运行。