确保您的Airflow版本高于1.10.10。此版本修复了已知的XCom bug。
确保您已经正确地配置了xcom_backed = airflow.contrib.operators.aws_lambda_operator.AWSLambdaHook
。使用AWS Lambda作为XCom backend后端的示例如下所示:
from airflow.contrib.operators.aws_lambda_operator import AWSLambdaOperator
config = {
'region_name': 'us-east-1'
}
lambda_op = AWSLambdaOperator(
task_id='lambda_test',
aws_conn_id='aws_default',
region_name=config['region_name'],
payload={"foo": "bar"},
function_name='test_lambda',
xcom_push=True,
dag=dag,
)
验证您在XCom表中保存的值是否为字符串。在Airflow MWAA中,pickle
模块可能无法使用,因此保存为pickle类型的XCom值可能不起作用。
通过执行以下命令清除Airflow数据库:
airflow resetdb
确保您正在使用相同的AWS账户并对Airflow MWAA所在的VPC和子网具有正确的访问权限。
如果以上解决方法都不起作用,您可以尝试使用其他的XCom backend注册,并重试。例如,您可以使用Redis作为backend。
如果问题仍然存在,请参考Airflow MWAA文档或联系AWS支持获取帮助。