Watermark是Flink中流的重要属性,是推进事件时间处理的关键。TwoInputStreamOperator代表具有两个输入流的运算符,它们的Watermark行为可能会出现一些问题,需要注意一些事项。
需要手动合并两个输入流的Watermark,否则可能会出现乱序。
需要处理两个输入流的所有Watermark,即使其中一个流的数据已经结束。
下面是示例代码,展示了如何实现正确的Watermark行为:
public class TwoInputStreamOperatorExample
extends TwoInputStreamOperator {
private long stringWatermark = Long.MIN_VALUE; // 保存String的最新Watermark
private long integerWatermark = Long.MIN_VALUE; // 保存Integer的最新Watermark
@Override
public void processElement1(StreamRecord element) throws Exception {
// 处理String流的元素
String value = element.getValue();
long timestamp = element.getTimestamp();
stringWatermark = Math.max(stringWatermark, timestamp); // 更新String的Watermark
output.collect(new StreamRecord<>(value, timestamp));
}
@Override
public void processElement2(StreamRecord element) throws Exception {
// 处理Integer流的元素
Integer value = element.getValue();
long timestamp = element.getTimestamp();
integerWatermark = Math.max(integerWatermark, timestamp); // 更新Integer的Watermark
output.collect(new StreamRecord<>(value.toString(), timestamp));
}
@Override
public void processWatermark1(Watermark mark) throws Exception {
// 处理String流的Watermark
stringWatermark = Math.max(stringWatermark, mark.getTimestamp()); // 更新String的Watermark
processWatermark();
}
@Override
public void processWatermark2(Watermark mark) throws Exception {
// 处理Integer流的Watermark
integerWatermark = Math.max(integerWatermark, mark.getTimestamp()); // 更新Integer的Watermark