以下是使用 Apache Beam 的 WriteToText 和 WriteToFiles 转换（PTransform）的 Python 代码示例：
import apache_beam as beam
def write_to_text(data, path='output.txt'):
    """Write every element of *data* into one text file named *path*.

    By default ``WriteToText`` shards its output and appends a shard suffix,
    producing names such as ``output.txt-00000-of-00001`` instead of
    ``output.txt``. Forcing a single shard and an empty shard-name template
    makes the output exactly one file called *path*.

    Args:
        data: A PCollection whose elements are written one per line. Beam
            does not guarantee the order of the lines.
        path: Name of the single output file. Defaults to ``'output.txt'``.

    Returns:
        The output of the ``WriteToText`` transform applied to *data*.
    """
    return data | beam.io.WriteToText(
        path,
        # One shard plus an empty template -> the file name is exactly `path`.
        num_shards=1,
        shard_name_template='',
    )


with beam.Pipeline() as p:
    # Create a PCollection from an in-memory list of strings.
    data = p | beam.Create(['Hello', 'World', 'Apache', 'Beam'])
    # Write the PCollection to a single text file.
    write_to_text(data)
import apache_beam as beam
from apache_beam.io import fileio


def write_to_files(data, path='.'):
    """Write each element of *data* to a text file chosen by its value.

    PTransforms cannot be applied to individual elements inside ``beam.Map``.
    Per-element file routing must instead go through the ``destination``
    callback of ``fileio.WriteToFiles``. Each element is routed to the
    destination ``'output_' + element``. ``destination_prefix_naming`` then
    uses that destination as the file-name prefix and ``.txt`` as the suffix,
    with Beam inserting shard information in between.

    Args:
        data: A PCollection of ``str``. Each element is both the line that is
            written and the key that selects its output file.
        path: Directory the output files are written to. Defaults to the
            current directory.

    Returns:
        The output of the ``WriteToFiles`` transform applied to *data*.
    """
    return data | fileio.WriteToFiles(
        path=path,
        destination=lambda element: 'output_' + element,
        sink=lambda destination: fileio.TextSink(),
        file_naming=fileio.destination_prefix_naming(suffix='.txt'),
    )


with beam.Pipeline() as p:
    # Create a PCollection from an in-memory list of strings.
    data = p | beam.Create(['file1', 'file2', 'file3'])
    # Write the PCollection to several files, one per distinct element value.
    write_to_files(data)
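这些示例演示了如何把 PCollection 的元素写入文本文件。beam.io.WriteToText() 把整个 PCollection 写到以给定路径为前缀的文本文件中。它默认按分片命名，例如 output.txt-00000-of-00001；只有设置 num_shards=1 且 shard_name_template='' 时，才会只生成一个名为 output.txt 的文件。apache_beam.io.fileio.WriteToFiles() 则可以通过 destination 参数，根据元素的值把元素路由到不同的文件，从而创建多个文件。注意：PTransform 只能应用在 PCollection 上，不能在 beam.Map 等逐元素函数内部作用于单个元素。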