问题主要出现在数据流程和代码环境之间的不匹配。解决方法是通过设置并行度参数来解决这个问题。示例代码如下:
在Pipeline Ptransform中,使用GroupByKey方法并附加CombineFn类以聚合输入。发出的结果是一组键值对。
pcollection = ... # some input to the pipeline
grouped = pcollection | beam.GroupByKey()
result = grouped | "Aggregate" >> beam.CombineValues(CombineFn)
在Dataflow作业中,为此步骤指定合适的并行度。例如,使用以下命令运行Dataflow作业:
python -m most_frequent_words
--runner DataflowRunner
--project $PROJECT
--temp_location gs://$BUCKET/temp
--output gs://$BUCKET/output
--worker_machine_type n1-standard-4
--num_workers 5
--autoscaling_algorithm THROUGHPUT_BASED
--max_num_workers 10
--experiments=use_beam_bq_sink,shuffle_mode=service
--grouped | "Aggregate" >> beam.CombineValues(CombineFn)
--num_workers 21
然后,重新运行作业,结果应该能够在Dataflow中正确输出。
上一篇:AggregationOperations中的Collations
下一篇:AggregationStrategy与org.apache.camel.CamelExchangeException: Invalid correlation key的错误。