This is a known issue, and you can work around it by defining a custom SerializableFunction. Example code:
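import com.google.cloud.Timestamp;
import java.time.Instant;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class InstantToTimestampFn implements SerializableFunction<Instant, Timestamp> {
  @Override
  public Timestamp apply(Instant instant) {
    // toEpochMilli() has millisecond precision; multiply by 1000 to get microseconds.
    return Timestamp.ofTimeMicroseconds(instant.toEpochMilli() * 1000);
  }
}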
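Note that `instant.toEpochMilli() * 1000` discards anything below a millisecond. If the source Instant carries microsecond precision that should be kept, the conversion can be sketched as below. This is a minimal, standard-library-only illustration; the class `InstantMicros` and the helper `toEpochMicros` are hypothetical names, not part of Beam or the BigQuery client.

```java
import java.time.Instant;

public class InstantMicros {
    // Hypothetical helper (not from the original answer): microseconds since the
    // epoch, keeping the sub-millisecond digits that toEpochMilli() would drop.
    static long toEpochMicros(Instant instant) {
        // getEpochSecond() is floored and getNano() is always non-negative,
        // so the sum is also correct for instants before 1970.
        long wholeSeconds = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
        return Math.addExact(wholeSeconds, instant.getNano() / 1_000);
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2024-01-01T00:00:00.123456Z");
        // Millisecond-based conversion, as in the answer's code.
        System.out.println(instant.toEpochMilli() * 1000);
        // Microsecond-preserving conversion.
        System.out.println(toEpochMicros(instant));
    }
}
```

The result of `toEpochMicros(instant)` could then be passed to `Timestamp.ofTimeMicroseconds(...)` in place of `instant.toEpochMilli() * 1000`.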
Applying this custom function inside a PTransform converts the Instant to BigQuery's TIMESTAMP type.
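PCollection<MyEntity> myEntities = ...;
myEntities
    // MapElements.into(...).via(...) is the form that accepts a lambda.
    .apply(MapElements.into(TypeDescriptor.of(MyEntity.class))
        .via((MyEntity el) -> el.toBuilder()
            .setTimestampField(new InstantToTimestampFn().apply(el.getInstantField()))
            .build()))
    // writeTableRows() only accepts TableRow elements, so write MyEntity directly.
    // Assumption: MyEntity has a registered Beam schema that useBeamSchema() can use.
    .apply(BigQueryIO.<MyEntity>write()
        .to(":.")
        .useBeamSchema()
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));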