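Akka Streams has a Kafka connector, Alpakka Kafka (formerly known as reactive-kafka), that makes it easy to use Kafka as a Source and a Sink when building a dynamic data-processing stream. The steps are as follows: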
// build.sbt: add the Alpakka Kafka connector
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "2.1.0"
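import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer
// `system` is an ActorSystem that is assumed to be in scope
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")
val kafkaProducer = producerSettings.createKafkaProducer()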
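import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val kafkaConsumer = Consumer.plainSource(consumerSettings, Subscriptions.topics("test"))
  .map(_.value())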
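import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

val websocketSource =
  Source.actorRef[String](bufferSize = 100, OverflowStrategy.fail) // this overload is deprecated since Akka 2.6
    .mapAsync(1) { msg =>
      // Parse the WebSocket message: if it asks to adjust the processing stream, return the
      // corresponding Source; otherwise return the message unchanged.
      parseMessage(msg) // hypothetical helper of type String => Future[Any], not defined here
    }
    .collect { case s: Source[String @unchecked, NotUsed @unchecked] => s } // keep only the Sources
    // Akka Streams has no switchMapConcat; flatMapConcat runs each emitted Source to completion
    // before starting the next (it accepts Sources only, so a Flow must first be applied to a Source)
    .flatMapConcat(identity)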
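The parsing step above is only described in comments, so here is a minimal sketch of one way to classify incoming WebSocket messages. The wire format is an assumption, not something the original specifies: a message of the form `switch:<topic>` is treated as a request to adjust the stream, and everything else as plain data. The names `WsMessage`, `SwitchTopic`, `Data` and `parse` are hypothetical.

```scala
// Hypothetical message model; the original does not define a wire format.
sealed trait WsMessage
final case class SwitchTopic(topic: String) extends WsMessage // control message: adjust the stream
final case class Data(payload: String) extends WsMessage      // anything else: pass through unchanged

object WsMessage {
  // Assumed prefix, chosen for illustration only.
  private val SwitchPrefix = "switch:"

  def parse(msg: String): WsMessage = {
    val topic = msg.stripPrefix(SwitchPrefix).trim
    // Only treat the message as a control message if it has the prefix and a non-empty topic.
    if (msg.startsWith(SwitchPrefix) && topic.nonEmpty) SwitchTopic(topic)
    else Data(msg)
  }
}
```

In the stream, a `SwitchTopic(topic)` could then be turned into something like `Consumer.plainSource(consumerSettings, Subscriptions.topics(topic)).map(_.value())` and a `Data(payload)` into `Source.single(payload)` before the flattening step. This mapping is likewise only a suggestion.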
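import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.producer.ProducerRecord
val kafkaSink =
  Flow[String]
    .map(value => new ProducerRecord[String, String]("test", value))
    // Share the producer via the settings; the two-argument plainSink is deprecated since Alpakka Kafka 2.0
    .to(Producer.plainSink(producerSettings.withProducer(kafkaProducer)))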
val dataFlow = Flow.fromSinkAnd