在Apache Beam中使用PubSubIO输出的GroupByKey元素重复的问题通常是由于数据窗口的处理不正确引起的。下面是一个代码示例,展示了如何解决这个问题:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
# 定义一个自定义的DoFn,将GroupByKey的输出元素转换为字符串
class ToString(beam.DoFn):
def process(self, element):
(key, values) = element
return ['{}: {}'.format(key, ', '.join(values))]
# 创建PipelineOptions
options = PipelineOptions()
# 创建Pipeline
with beam.Pipeline(options=options) as p:
# 从PubSub读取数据
messages = (p
| beam.io.ReadFromPubSub(topic='')
.with_output_types(bytes))
# 将数据按照固定窗口进行分组
windowed_messages = (messages
| beam.WindowInto(FixedWindows(size=5)))
# 将数据进行解码
decoded_messages = (windowed_messages
| 'Decode' >> beam.Map(lambda x: x.decode('utf-8')))
# 将数据进行分组
grouped_messages = (decoded_messages
| 'Group' >> beam.Map(lambda x: (x.split(',')[0], x.split(',')[1]))
| beam.GroupByKey())
# 将分组后的数据转换为字符串
formatted_messages = (grouped_messages
| 'Format' >> beam.ParDo(ToString()))
# 将数据写入PubSub
formatted_messages | beam.io.WriteToPubSub(topic='')
在这个示例中,我们首先通过ReadFromPubSub
从PubSub中读取数据,然后使用FixedWindows
将数据按照固定窗口进行分组。接下来,我们将数据进行解码和分组,然后使用自定义的DoFn ToString
将分组后的数据转换为字符串。最后,我们使用WriteToPubSub
将数据写入PubSub。
通过正确设置数据窗口和处理逻辑,可以确保GroupByKey的输出元素不会重复。