此错误表示在消费者组中,当前消费者不是协调员。通常出现这种错误的原因是,其他成员已经成为协调员,或者协调员已经彻底关闭。要解决此问题,可以尝试重新启动消费者并等待其成为协调员。另外,确保分配给消费者的组ID是唯一的,并且消费者组中没有任何相同唯一id的成员。以下是一个示例代码,可以通过将消费者组ID唯一地分配给每个消费者来解决此问题:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your bootstrap servers");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "unique-group-id");
// create consumer instance
KafkaConsumer consumer = new KafkaConsumer<>(properties);
// subscribe consumer to topic
consumer.subscribe(Collections.singletonList("your-topic"));
// poll for new records
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.println("Received message: " + record.value());
}
}
请注意,这只是一个示例代码,并且需要根据您的特定情况进行修改和调整。