使用手动注册模式代替自动注册模式。以下是示例代码:
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@Configuration
@EnableKafka
public class KafkaConfig {
private static final String TOPIC_NAME = "example.topic";
@Bean
public KafkaAdmin kafkaAdmin() {
Map configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic() {
return new NewTopic(TOPIC_NAME, 1, (short) 1);
}
@Bean
public KafkaTemplate kafkaTemplate() {
Map properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(properties));
}
@Bean
public ConsumerFactory consumerFactory() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Object.class));
}
}
在此示例中,我们通过手动配置bean来注册主题,并使用相应的序列化程序和消费者工厂。这允许我们更好地掌控模式注册。