避免使用默认的Spring Kafka序列化和反序列化配置,手动设置序列化和反序列化类,并将其配置为使用JSON格式。下面是一个示例:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ProducerFactory producerFactory() {
Map config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyObjectJsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory consumerFactory() {
Map config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectJsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
在此示例中,我们使用自定义的序列化和反序列化器(MyObjectJsonSerializer
和MyObjectJsonDeserializer
)来将数据序列化和反序列化为JSON格式,并避免了使用Spring Kafka默认的对象类型头(__TypeId__
和spring_json_header_types
)。
下一篇:避免SPSC队列索引的错误共享