解决方法是在 WriteToBigQuery 之前重新应用(re-apply)窗口转换,以确保其输入位于全新的窗口之内。以下是使用重新分窗(re-window)操作解决此问题的代码示例:
class MyPipelineOptions(PipelineOptions):
    """Pipeline options extended with a ``--window_size`` flag."""

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register the custom command-line flags on ``parser``.

        ``--window_size`` is parsed as an integer and defaults to 30.
        """
        parser.add_argument('--window_size', default=30, type=int)
def run():
    """Build and run the Pub/Sub -> BigQuery activity pipeline.

    Reads messages from ``input_topic``, stamps each parsed element with its
    own ``'timestamp'`` field, buckets the elements into fixed windows, keys
    them by ``'user_id'``, runs the grouped values through ``UserState`` and
    appends the formatted results to ``output_table`` (the table is created
    if needed). Blocks until the pipeline finishes.
    """
    options = PipelineOptions()
    # --window_size is registered by MyPipelineOptions, so it has to be read
    # through that view; it is not exposed on the base PipelineOptions object.
    window_size = options.view_as(MyPipelineOptions).window_size
    p = beam.Pipeline(options=options)

    # Reads from the Pub/Sub topic.
    # NOTE(review): confirm this transform exists in the installed Beam
    # release; current releases expose beam.io.ReadFromPubSub(topic=...).
    lines = p | beam.io.ReadStringsFromPubSubTopic(input_topic)

    # Parse each message, attach the element's 'timestamp' field as its
    # timestamp, then assign fixed windows of `window_size`.
    records = (
        lines
        | beam.ParDo(ParsePubSubMessage())
        | beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
        | beam.WindowInto(window.FixedWindows(window_size))
    )

    # Develop session information per window, then key by user id.
    sessioned_items = (
        records
        | beam.ParDo(SessionGenerator())
        | beam.Map(lambda elem: (elem['user_id'], elem))
    )

    # Group per key and window, then run the stateful ParDo that keeps track
    # of session-level information.
    user_activity = (
        sessioned_items
        | 'KeyByKeying' >> beam.GroupByKey()
        | 'Stateful_Process' >> beam.ParDo(UserState())
    )

    # Adds start and end times in xx:xx:xx format and removes activity
    # durations which are lower than the threshold.
    final_activity = (
        user_activity
        | beam.ParDo(FormatSessionAndDiscardShortActivities())
    )

    # Re-apply a window immediately before the sink so that WriteToBigQuery
    # receives its input in a fresh window rather than the upstream fixed
    # windows.
    # NOTE(review): GlobalWindows is used here; substitute another WindowFn
    # if the sink should see bounded windows instead.
    rewindowed = (
        final_activity
        | 'RewindowBeforeWrite' >> beam.WindowInto(window.GlobalWindows())
    )

    # Writing results into BigQuery.
    rewindowed | beam.io.WriteToBigQuery(
        output_table,
        schema=OUTPUT_SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    result = p.run()
    result.wait_until_finish()
上一篇:Apache Beam: What trigger do I need for my use case
下一篇:Apache Beam:因 PipelineOptions 无法序列化,导致无法序列化 DoFnWithExecutionInformation。