是的,Alpakka Kafka Consumer.committablePartitionedSource会为每个分区使用单独的kafka consumer。可以通过如下代码来证明:
val committablePartitionedSource = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic)) val result = committablePartitionedSource.mapAsync(parallelism = 1) { partition => val (topicPartition, source) = partition val partitionOffset = getPartitionOffsetFromDb(topicPartition) source .via(businessFlow) .mapAsync(1)(msg => saveMessageToDb(topicPartition, msg)) .map(_.committableOffset) .via(Committer.flow(CommitterSettings(system))) .runWith(Sink.ignore) }
在上述代码中,我们定义了一个committablePartitionedSource并使用了mapAsync运算符来将每个分区的Source单独进行业务处理。可以看到,每次运行mapAsync时都只针对一个分区。因此,每个分区都将使用自己的kafka consumer。
如果您希望为多个分区共享一个kafka consumer,请使用CommittableSource而不是committablePartitionedSource。例如:
val committableSource = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic)) val result = committableSource.mapAsync(parallelism = 1) { msg => saveMessageToDbAndCommitOffset(msg) }
在上述代码中,我们使用CommittableSource并且在mapAsync处理程序中将所有接收到的消息保存到数据库中。因此,我们使用的是共享的kafka consumer。