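When a Beam pipeline uses session windows together with an early trigger, the "on-time" pane sometimes fails to fire when the watermark arrives. One way to work around this is to use a delayed processing-time trigger.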
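For context, a session window groups the elements of a key whose timestamps lie within the gap duration of one another: each element gets a proto-window [t, t + gap), and overlapping proto-windows are merged. The following is a minimal plain-Java sketch of that merging rule, independent of Beam. The class name `SessionSketch` and the method `sessionize` are hypothetical, and the timestamps and the gap are assumed to be expressed in the same unit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionSketch {

    /**
     * Groups timestamps into sessions. Each timestamp t gets a proto-window
     * [t, t + gap), and overlapping proto-windows are merged.
     * Returns {start, end} pairs where end is exclusive.
     */
    public static List<long[]> sessionize(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> sessions = new ArrayList<>();
        for (long t : sorted) {
            long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && t < last[1]) {
                // t falls inside the current session, so extend its end.
                last[1] = Math.max(last[1], t + gap);
            } else {
                // t is at or after the current end: open a new session.
                sessions.add(new long[] {t, t + gap});
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 2, 3, 5, 6, 7, 9};
        for (long[] s : sessionize(timestamps, 2)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints:
        // [1, 5)
        // [5, 9)
        // [9, 11)
    }
}
```

With a gap of 2, the timestamps 1, 2, 3, 5, 6, 7, 9 form three sessions, because an element that lands exactly on the end of a session does not overlap it and therefore starts a new one.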
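The following example pipeline uses session windows with an early trigger, and applies a delayed processing-time trigger to work around the issue described above: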
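import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class SessionWindowTest {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        // new Instant(n) is n milliseconds since the epoch; the session labels
        // below assume the gap is measured in the same unit as these timestamps.
        List<TimestampedValue<KV<String, Integer>>> input = Arrays.asList(
            TimestampedValue.of(KV.of("key", 1), new Instant(1)), // First Session
            TimestampedValue.of(KV.of("key", 2), new Instant(2)),
            TimestampedValue.of(KV.of("key", 3), new Instant(3)),
            TimestampedValue.of(KV.of("key", 4), new Instant(5)), // Second Session
            TimestampedValue.of(KV.of("key", 5), new Instant(6)),
            TimestampedValue.of(KV.of("key", 6), new Instant(7)),
            TimestampedValue.of(KV.of("key", 7), new Instant(9)) // Third Session
        );

        // Create.timestamped already unwraps each TimestampedValue and assigns its
        // timestamp, so the result is a PCollection<KV<String, Integer>> directly
        // and no extra MapElements step is needed.
        PCollection<KV<String, Integer>> inputCollection = pipeline
            .apply(Create.timestamped(input));

        inputCollection
            .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.standardSeconds(2)))
                // Fire repeatedly, 5 seconds of processing time after the first element in each pane.
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(5))))
                .discardingFiredPanes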