- 定义一个字符串列表类型的Tuple类:
public class StringListTuple {
public String key;
public List value;
public StringListTuple() {}
public StringListTuple(String key, List value) {
this.key = key;
this.value = value;
}
public static StringListTuple of(String key, List value) {
return new StringListTuple(key, value);
}
}
- 使用group by和reduce进行聚合:
DataStream> dataStream = ...;
DataStream resultStream = dataStream
.keyBy(value -> value.f0)
.reduce((value1, value2) -> {
List list = new ArrayList<>();
list.addAll(value1.f1);
list.addAll(value2.f1);
return Tuple2.of(value1.f0, list);
})
.map(value -> StringListTuple.of(value.f0, value.f1));
- 将结果转换为字符串:
DataStream stringResult = resultStream
.map(value -> value.key + ": " + String.join(", ", value.value));