要获取Kafka消费者组的信息(例如各分区已提交的偏移量),既可以使用Kafka Admin API,也可以直接通过消费者自身的committed()方法查询。以下示例代码演示如何在消费(poll)循环中,通过KafkaConsumer获取当前消费者组在已分配分区上已提交的偏移量:
// Consumer configuration: broker address, consumer group id, and String deserializers
// for both record keys and values.
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// try-with-resources: the poll loop never exits normally, but if it is left via an
// exception the consumer is still closed instead of being leaked.
// The type arguments match the String deserializers configured above; with raw types
// record.key()/record.value() would return Object and the assignments below would not compile.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            // Nothing to print for this poll, so skip the committed-offset lookup as well.
            continue;
        }

        // Fetch the group's committed offsets once per batch, for all assigned partitions
        // in a single call, rather than once per partition for every record.
        // NOTE(review): committed(Set<TopicPartition>) requires kafka-clients 2.4+ — confirm
        // the client version in use.
        Map<TopicPartition, OffsetAndMetadata> offsets =
                new HashMap<>(consumer.committed(consumer.assignment()));
        // Keep only partitions that actually have a committed offset.
        offsets.values().removeIf(committed -> committed == null);

        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            int partition = record.partition();
            long offset = record.offset();
            String key = record.key();
            String value = record.value();
            System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s, offsets=%s\n",
                    topic, partition, offset, key, value, offsets);
        }
    }
}