AWS MSK(Amazon Managed Streaming for Apache Kafka)是一项托管服务,旨在简化 Apache Kafka 的设置和管理。默认情况下,AWS MSK 启用了 Kafka 消费者的自动提交偏移量功能(enable.auto.commit),这意味着消费者将每隔一段时间或在收到一定数量的消息后自动提交偏移量。
如果要禁用这个功能,可以在消费者代码中指定 enable.auto.commit=false(或 enableAutoCommit(false))的配置选项,例如:
Properties props = new Properties();
props.put("bootstrap.servers", "your_broker_list_here");
props.put("enable.auto.commit", "false");
props.put("group.id", "your_group_id_here");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your_topic_here"));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.println("Received message: " + record.value());
}
consumer.commitSync(); // 手动提交偏移量
}
在这个示例中,我们将 enable.auto.commit 设置为 false,然后在每个轮询期间手动提交偏移量(consumer.commitSync())。这将确保消费者仅在处理完消息后才提交偏移量,从而减少重复消费或数据丢失的风险。