问题描述: 在Spring Kafka中使用Avro GenericRecord进行反序列化时遇到了问题。
解决方法:
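首先,在 pom.xml 中添加以下依赖:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
</dependency>
<!-- 下面的代码还用到了 io.confluent.kafka.serializers 包,因此还需要引入 Confluent 的 io.confluent:kafka-avro-serializer 依赖 -->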
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
 * Kafka {@link Deserializer} that delegates all work to Confluent's
 * {@link KafkaAvroDeserializer} and casts the decoded value to the caller's type.
 *
 * <p>The class is registered by name in the consumer properties, so it keeps a public
 * no-argument constructor.
 *
 * @param <T> type the decoded payload is cast to (for example an Avro {@code GenericRecord})
 */
public class AvroDeserializer<T> implements Deserializer<T> {

    private final KafkaAvroDeserializer inner;

    /** Creates the wrapper around an unconfigured {@link KafkaAvroDeserializer}. */
    public AvroDeserializer() {
        inner = new KafkaAvroDeserializer();
    }

    /**
     * Passes the consumer configuration straight to the wrapped deserializer.
     *
     * <p>The delegate is configured exactly once. It reads the schema registry settings from
     * {@code configs} itself, so no separate schema registry client is created here.
     *
     * @param configs consumer configuration, including the schema registry URL
     * @param isKey whether this instance deserializes record keys rather than values
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    /**
     * Decodes one Avro payload.
     *
     * @param topic topic the record was read from
     * @param data raw record bytes
     * @return the decoded value, cast to {@code T}
     */
    @Override
    @SuppressWarnings("unchecked") // The delegate returns Object; the caller chooses T.
    public T deserialize(String topic, byte[] data) {
        return (T) inner.deserialize(topic, data);
    }

    /** Closes the wrapped deserializer. */
    @Override
    public void close() {
        inner.close();
    }
}
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
/**
 * Spring configuration for a Kafka consumer with {@link String} keys and Avro
 * {@link GenericRecord} values.
 *
 * <p>Connection settings are injected from the {@code kafka.bootstrap-servers},
 * {@code kafka.group-id} and {@code kafka.schema-registry-url} properties.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.group-id}")
    private String groupId;

    @Value("${kafka.schema-registry-url}")
    private String schemaRegistryUrl;

    /**
     * Builds the consumer properties.
     *
     * @return a mutable map holding the bootstrap servers, group id, schema registry URL and
     *     the key/value deserializer classes
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a factory producing consumers with String keys and GenericRecord values
     */
    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory wired to {@link #consumerFactory()}.
     *
     * @return the listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Registers the message listener as a bean.
     *
     * @return a new {@code KafkaConsumer} listener instance
     */
    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }
}
import org.apache.avro.generic.GenericRecord;
import org.springframework.kafka.annotation.KafkaListener;
/** Kafka message listener that receives Avro {@link GenericRecord} payloads. */
public class KafkaConsumer {

    /**
     * Handles one message from the topic named by the {@code kafka.topic} property.
     *
     * @param record the message payload
     */
    @KafkaListener(topics = "${kafka.topic}")
    public void receive(final GenericRecord record) {
        // Process the received message.
    }
}
通过以上步骤,我们可以在Spring Kafka中使用Avro GenericRecord进行反序列化。确保配置中的Avro模式注册中心URL正确,并在消费者配置中使用Avro反序列化器。然后在消费者方法