在Flink Table API中无法直接使用Scala的Option类型,需要将其转换为Java的Optional类型。以下是示例代码:
Scala示例:
case class Person(name: String, age: Option[Int])
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val ds: DataStream[Person] = env.fromElements(Person("Alice", Some(25)), Person("Bob", None))
val table: Table = tEnv.fromDataStream(ds)
val result: Table = table.filter($"age".isPresent)
val resultSet: DataStream[Row] = tEnv.toAppendStream(result, classOf[Row])
转换后的Java示例:
import java.util.Optional;
public class Person {
private String name;
private Optional age;
public Person(String name, Optional age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public Optional getAge() {
return age;
}
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream ds = env.fromElements(new Person("Alice", Optional.of(25)), new Person("Bob", Optional.empty()));
Table table = tEnv.fromDataStream(ds);
Table result = table.filter($("age").isPresent());
DataStream resultSet = tEnv.toAppendStream(result, Row.class);