Apache Kafka 中的 Compaction 是一种数据保留策略,用于保留特定键的最新值,而删除其他旧的键值对。这可以用于清理 Kafka 主题中的日志,以便只保留最新和最相关的数据。
Compaction 的工作原理如下:
Kafka 主题需要配置 cleanup.policy
参数为 compact
,以启用 Compaction。
当消息被写入主题时,Kafka 会根据消息的键(key)进行分组,并将消息追加到适当的分区(partition)中。
当某个分区中的日志段(log segment)的大小达到一定阈值时,Kafka 会触发 Compaction 过程。
Compaction 过程首先会根据每个键(key)的最新值创建一个临时的压缩日志段(compacted log segment)。
然后,Kafka 会将旧的日志段中的键值对与临时的压缩日志段进行合并。
在合并过程中,Kafka 会保留每个键的最新值,并删除旧的键值对。
合并完成后,临时的压缩日志段会成为新的日志段,并取代旧的日志段。
以下是一个使用 Apache Kafka 的 Java 代码示例,演示如何配置和使用 Compaction:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaCompactionExample {
private static final String TOPIC_NAME = "my_topic";
public static void main(String[] args) {
// 创建 Kafka 主题
createTopic();
// 创建生产者和消费者
KafkaProducer producer = createProducer();
KafkaConsumer consumer = createConsumer();
// 发送一些消息到主题
sendMessages(producer);
// 读取消息,触发 Compaction
readMessages(consumer);
// 关闭生产者和消费者
producer.close();
consumer.close();
}
private static KafkaProducer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
return new KafkaProducer<>(props);
}
private static KafkaConsumer createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
return consumer;
}
private static void createTopic() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
admin.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void sendMessages(KafkaProducer producer) {
for (int i = 0; i < 10; i++) {
ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, "key_" + i, i);
producer.send(record);
}
producer.flush();
}
private static void readMessages(KafkaConsumer