如果你的 Apache Beam Python Dataflow 与 GCP Pub/Sub 一起使用时发现计数器超计数,可能是因为你没有处理互斥更新。为了解决这个问题,你可以使用 Beam 固有的 apache_beam.transforms.core.CombineFn
类来处理计数器更新,保证以原子方式进行。你可以在 ParDo
中使用该类来创建计数器。以下是一个示例代码:
from apache_beam.transforms.core import CombineFn
class MyCounter(CombineFn):
def create_accumulator(self):
return 0
def add_input(self, accumulator, input):
return accumulator + 1
def merge_accumulators(self, accumulators):
return sum(accumulators)
def extract_output(self, accumulator):
return accumulator
with beam.Pipeline() as p:
...
count_pipeline = (
p
| beam.ReadFromPubSub(subscription=subscription)
| beam.ParDo(MyCounter()).with_outputs('count')
)
# Access the count data using count_pipeline['count'].