在测试代码中设置 ConsumerSettings 属性的 auto.offset.reset 属性为 earliest,以确保在测试过程中读取所有消息。示例代码如下:
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("test-group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
上一篇:AlpakkaKafkaProducer相较于普通KafkaProducer的优势是什么?
下一篇:AlpakkaS3源导致AkkaStream崩溃,出现TcpIdleTimeoutException:连接上发生TCP空闲超时