Apache Kafka KRaft 是一种基于 Apache Kafka 的可复制 Log 存储的实验性尝试。KRaft 旨在提供一个基于Raft协议的异步日志存储实现,并且具有优秀的可伸缩性和可靠性特性。以下是将“Apache Kafka KRaft - Kafka Storage Tool”改写成中文的解决方法及相应的代码示例。
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-kraft-storage"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// 创建 KRaft 存储实例 StateStoreSupplier keyValueStore = Stores.persistentKeyValueStore("local-store"); props.put(StreamsConfig.STATE_STORES_CONFIG, Collections.singletonMap("my-state-store", keyValueStore));
// 创建 Kafka Streams 实例 KafkaStreams streams = new KafkaStreams(topology, props);
// 创建 Kafka Streams 应用 Topology topology = new Topology(); topology.addSource("source", "input-topic"); topology.addProcessor("MyProcessor", MyProcessor::new, "source"); topology.addSink("sink", "output-topic", "