检查以下几个问题:
确保要检索的XCom键已在任务中设置。例如,如果您想要从先前的任务中检索时间戳,请确保该任务已将时间戳作为XCom存储。
def write_to_xcom(ds, **kwargs): # some data processing... timestamp = datetime.utcnow().isoformat() kwargs['ti'].xcom_push(key='timestamp', value=timestamp) return 'OK'
def read_from_xcom(ds, **kwargs): timestamp = kwargs['ti'].xcom_pull(key='timestamp') print('Timestamp:', timestamp)
检查XCom键的名称是否正确。XCom键区分大小写,因此键的大小写必须与之前设置的完全匹配。
def write_to_xcom(ds, **kwargs): # some data processing... timestamp = datetime.utcnow().isoformat() kwargs['ti'].xcom_push(key='Timestamp', value=timestamp) # 注意'T'是大写 return 'OK'
def read_from_xcom(ds, **kwargs): timestamp = kwargs['ti'].xcom_pull(key='timestamp') # 键的名称大小写错误 print('Timestamp:', timestamp)
检查要检索的XCom键是否在先前的任务中成功存储。如果在先前的任务中未成功存储XCom,请检查该任务的日志,以查看是否存在任何错误或异常。
检查要检索的XCom是否从正确的任务中提取。如果要检索的XCom来自不同的任务,则需要指定该任务的任务ID。
def write_to_xcom(ds, **kwargs): # some data processing... timestamp = datetime.utcnow().isoformat() kwargs['ti'].xcom_push(key='timestamp', value