Java 要解决此问题,请使用可序列化类型作为 Beam PTransform 的输出,并配置正确的 Beam 编码器。一些 JDBC 驱动程序可能不支持所有 Java 对象类型。使用 Serializable 或兼容类型可以更好地支持 JDBC 驱动程序。例如,以下示例将 Beam PCollection 中的 Person 对象转换为 Map 并使用 JdbcIO.write()写入 JDBC 表中:
PCollection
// Convert Person objects to Maps PCollection
// Write maps to a JDBC table personMaps.apply(JdbcIO.
Python 在 Python SDK 中,JdbcWriteTransform 在创建操作并调用 JdbcWriteFn.write 方法时,在 Beam 中为每个 PCollection 元素使用依赖于 Python pickle 的默认编码。由于未实现使用这种序列化模式的 Java 编码器实现,因此 Beam 会在创建 coder 时抛出上述异常。
此问题的一种解决方法是将 Java 类型映射到序列化的 Python 对象类型。代码示例:
import apache_beam as beam import psycopg2 import cPickle
class PickledPythonCoder(object): def encode(self, value): return cPickle.dumps(value)
def decode(self, value): return cPickle.loads(value)
def is_deterministic(self): return True
def eq(self, other): return isinstance(other, PickledPythonCoder)
coder = PickledPythonCoder()
class Person(object): def init(self, id, name, age): self.id = id self.name = name self.age = age
def asTuple(self): return (self.id, self.name, self.age)
class WriteToDB(beam.DoFn):
上一篇:ApacheBeamWriteToJdbc:java.lang.IllegalArgumentException:未知的CoderURNbeam:coder:pickled_python:v1