要区分插入/更新/删除操作,需要在Pulsar JDBC Sink Connector配置文件中使用hashtable选项来设置相关的属性。对于每个消息,通过指定的主键列来确定它是插入、删除还是更新操作。
以下是示例代码:
pulsar-sink.properties文件配置:
jdbc.username=bob jdbc.password=1234 jdbc.url=jdbc:postgresql://localhost/test
sink.topic=my-topic sink.processing.guarantees=AT_LEAST_ONCE sink.max.pending.records=1000 sink.batch.size=10 sink.batch.delay.ms=500 sink.converter.class=org.apache.pulsar.io.core.JacksonJsonConverter sink.converter.schema.type=avro sink.converter.schema.registry.url=http://localhost:8081
jdbc.hash.table={"mytable": "id"}
jdbc.hash.column.op={"id": "insert"}
在上面的配置中,我们使用了hashtable选项来指定目标表和主键列。 jdbc.hash.column.op用来指定每个操作(插入,更新或删除),对于每个消息,它将操作与消息属性中的主键值匹配。在示例中,我们对所有的消息都执行插入操作。
注意,如果需要执行更新或删除操作,必须指定唯一的主键列。如果没有指定,将抛出异常。