问题的来源是,Apache Beam 的 Deduplicate 函数在去重时可能会出现将相同记录视为不同记录的错误情况。要解决这个问题,可以自定义一个去重函数来替代 Deduplicate 函数。以下是一个示例的代码:
import apache_beam as beam
from apache_beam.transforms import CombinePerKey
class CustomDeduplicate(beam.DoFn):
def process(self, element):
record, unique_key = element
yield (unique_key, record)
def remove_duplicates(self, _, records):
yield next(records)
def __call__(self, records):
return (
records
| beam.Map(lambda e: (e, None))
| CombinePerKey(self.remove_duplicates)
| beam.Map(lambda e: e[0])
)
records = [
{'id': 1, 'name': 'Alice'},
{'id': 1, 'name': 'Alice'},
{'id': 2, 'name': 'Bob'},
{'id': 3, 'name': 'Charlie'},
{'id': 3, 'name': 'Charlie'},
]
with beam.Pipeline() as p:
deduplicated_records = (
p
| beam.Create(records)
| beam.ParDo(CustomDeduplicate())
)
output = deduplicated_records | beam.Map(print)
上述代码中的 CustomDeduplicate 类实现了一个自定义的 Deduplicate 函数。该函数使用了 CombinePerKey 这个 Beam 自带的函数,将具有相同键的记录聚合到一起,然后保留其中的一个记录,去除重复项。最后将去重后的记录输出给下一个节点。