这个问题发生在使用 Apache Beam 的 SQSIO 库时,可能同时使用了 StreamTransform,由于有一个已知的 bug 使得 SQSIO 库中的消息处理器无法正确处理消息,从而引发 IllegalMutationException 异常。
目前没有解决方案,但有一种替代方法,可以通过使用 Apache Beam 的原生 SQSIO 库并避免使用 StreamTransform 来规避此问题。
以下是使用原生 SQSIO 库的代码示例:
import apache_beam as beam
from apache_beam.io.aws.sqs import SqsIO
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
records = p | 'ReadFromSQS' >> SqsIO.Read(queue_url='sqs://queue_url')
records | beam.Map(print)
通过使用该方法,您应该能够避免使用 StreamTransform 并成功读取和处理 Amazon SQS 中的消息。