One possible fix is to upgrade to a Flink version above 1.11.2 and use the BatchPhysicalTypeCheckStrategy parameter in the vectorized operator configuration. A code example follows:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Run locally with the web UI, forcing a default parallelism of 1.
val conf = new Configuration()
conf.setString("table.exec.resource.default-parallelism", "1")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
// Use the Blink planner in streaming mode.
val streamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tEnv = StreamTableEnvironment.create(env, streamSettings)
// Source table backed by the datagen connector.
val srcDDL =
"""
|CREATE TABLE src (
| id BIGINT,
| name STRING,
| age INT,
| score DOUBLE
|) WITH (
| 'connector' = 'datagen'
|)
""".stripMargin
tEnv.executeSql(srcDDL)
// Sink table that prints rows to stdout.
val sinkDDL =
"""
|CREATE TABLE sink (
| id BIGINT,
| name STRING,
| age INT,
| score DOUBLE,
| invalid BOOLEAN
|) WITH (
| 'connector' = 'print'
|)
""".stripMargin
tEnv.executeSql(sinkDDL)
// Register the UDF instance under the name "func".
val func = new ExampleFunction
tEnv.createTemporaryFunction("func", func)
// Query that applies the UDF and aliases its result as invalid.
val query =
"""
|SELECT
| id,
| name,
| age,
| score,
| func(name) as invalid
|FROM src
""".stripMargin
// Execute the query and print the result rows.
tEnv.executeSql(query).print()
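Note that the snippet references an ExampleFunction class that is never defined here. A minimal sketch of such a scalar UDF is shown below; the name and the eval logic are placeholders, not the original implementation:

import org.apache.flink.table.functions.ScalarFunction

// Hypothetical scalar UDF: flags rows whose name is null or empty.
// The actual ExampleFunction in the original setup may differ.
class ExampleFunction extends ScalarFunction {
  def eval(name: String): Boolean = name == null || name.isEmpty
}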
With the code example above, the UDF should work correctly.
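If registering the function by instance still trips type checking, a variant worth trying is registering by class, which lets the planner extract type inference via reflection; createTemporaryFunction also accepts a Class in Flink 1.11+:

// Alternative: register the UDF by class instead of by instance.
tEnv.createTemporaryFunction("func", classOf[ExampleFunction])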