通过 Alpakka Kafka 的 Consumer.committablePartitionedSource 方法创建的流,会为分配给当前消费者的每个主题分区发出一个独立的子 Source(即每个分区对应一个子流),而不是为每个分区创建单独的 Kafka 消费者;这些子流共享同一个底层消费者。示例如下:
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.scaladsl._
import akka.stream.scaladsl._
import org.apache.kafka.common.TopicPartition
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
/** Runnable example: consume a Kafka topic with one sub-stream per assigned partition.
  *
  * `Consumer.committablePartitionedSource` emits a `(TopicPartition, Source)` pair each
  * time a partition is assigned to this consumer. Every inner source is run as its own
  * stream that prints each record and then commits its offset. The consumer runs for
  * [[runFor]], is then drained and shut down, and finally the actor system is terminated.
  *
  * Expects a Kafka broker on `localhost:9092` and a topic named `test`.
  */
object Example extends App {
  implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
  implicit val system: akka.actor.ActorSystem = akka.actor.ActorSystem("example")
  implicit val mat: Materializer = ActorMaterializer.create(system)

  // ----------------------------------------
  // Setup
  val config = system.settings.config.getConfig("akka.kafka.consumer")

  val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // Offset-commit batching settings, read from the `akka.kafka.committer` config section.
  val committerSettings: akka.kafka.CommitterSettings = akka.kafka.CommitterSettings(system)

  val topics = List("test")

  // Upper bound on concurrently running partition sub-streams. It must be at least the
  // number of partitions that can be assigned to this consumer; otherwise the sub-sources
  // of the surplus partitions are never run.
  val maxPartitions = 100

  // How long the example consumes before shutting down.
  val runFor: FiniteDuration = 10.seconds

  // ----------------------------------------
  // Source creation
  // The partitioned source tracks automatic partition assignment, so it needs an auto
  // subscription (topics), not a manual assignment. The inner sources materialize NotUsed;
  // only the outer source exposes the Consumer.Control.
  val source: Source[
    (TopicPartition, Source[akka.kafka.ConsumerMessage.CommittableMessage[String, String], akka.NotUsed]),
    Consumer.Control
  ] =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topics.toSet))

  // ----------------------------------------
  // Per-partition processing

  /** Runs the sub-stream of a single partition: prints every record, then commits its offset.
    *
    * @param topicPartition the partition the messages belong to
    * @param messages       the committable messages of that partition
    * @return a future completed when the sub-stream finishes (e.g. the partition is revoked
    *         or the consumer is stopped), or failed if processing or committing fails
    */
  def processPartition(
      topicPartition: TopicPartition,
      messages: Source[akka.kafka.ConsumerMessage.CommittableMessage[String, String], akka.NotUsed]
  ): Future[akka.Done] =
    messages
      .map { msg =>
        println(s"$topicPartition @ ${msg.record.offset()}: ${msg.record.value()}")
        // Hand the offset downstream only after the record was handled (at-least-once).
        msg.committableOffset
      }
      .runWith(Committer.sink(committerSettings))

  // ----------------------------------------
  // Run
  // The stream runs in the background; `consumerControl` is the handle used to stop it and
  // `streamDone` completes once the outer stream and all partition sub-streams are finished.
  val (consumerControl, streamDone) = source
    .mapAsyncUnordered(maxPartitions) {
      case (topicPartition, partitionSource) => processPartition(topicPartition, partitionSource)
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  // ----------------------------------------
  // Shutdown
  // Schedule the shutdown instead of blocking a thread with Thread.sleep. drainAndShutdown
  // stops polling, waits for in-flight messages and commits, then shuts the consumer down.
  val drained: Future[akka.Done] =
    akka.pattern.after(runFor, system.scheduler)(consumerControl.drainAndShutdown(streamDone))

  // Terminate the actor system whether the stream ended normally or with a failure.
  drained.onComplete { result =>
    result.failed.foreach(cause => println(s"Consumer stream failed: $cause"))
    system.terminate()
  }

  Await.ready(system.whenTerminated, Duration.Inf)
}