Properties props = new Properties();
props.put("bootstrap.servers", "your-bootstrap-server");
props.put("group.id", "your-group-id");
//订阅单个主题
consumer.subscribe(Collections.singletonList("your-topic"));
//订阅多个主题
consumer.subscribe(Arrays.asList("topic1", "topic2"));
props.put("group.id", "your-group-id");
props.put("max.poll.records", "500");
props.put("max.poll.interval.ms", "300000");
//增加消费者数
props.put("client.id", "your-client-id" + i);
props.put("group.id", "your-group-id-" + i);
//手动提交偏移量
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
//处理消息
}
consumer.commitSync();
}
通过上述步骤,应该能够解决AWS Kafka集群上服务未消费所有主题消费者组的消息的问题。