使用 Apache Beam 生成随机整数序列可以通过以下的示例代码来实现:
import apache_beam as beam
import random
class GenerateRandomNumbers(beam.DoFn):
def process(self, element):
for i in range(100):
yield random.randint(1, 100)
with beam.Pipeline() as pipeline:
numbers = pipeline | 'Create seed' >> beam.Create([None])
random_numbers = numbers | 'Generate random numbers' >> beam.ParDo(GenerateRandomNumbers())
random_numbers | 'Print random numbers' >> beam.Map(print)
在上面的代码中,首先定义了一个 GenerateRandomNumbers
类,它继承自 beam.DoFn
,其中 process
方法对每个输入的元素生成 100 个随机整数。之后,使用 beam.Pipeline()
创建管道,并使用 beam.Create()
创建一个带有一个 None
元素的 PCollection。最后,使用 beam.ParDo()
和 beam.Map()
分别应用 GenerateRandomNumbers
类和 print
函数处理 PCollections,以最终的形式输出随机数序列。
上一篇:ApacheBeam+DatabricksNotebook-mapfunctionerror
下一篇:ApacheBeam-在Bigquery流式插入时出现RuntimeException:ManagedChannel分配位置