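Compared with the plain Kafka Producer, the Alpakka Kafka Producer has the following advantages:
1. Stronger support for typed input data: records flow through an Akka Streams pipeline, so data in formats such as JSON, Avro, Protobuf, or CSV can be converted in stream stages and handed to the matching Kafka serializer.
For example, the following code uses the Alpakka Kafka Producer to send JSON data to Kafka: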
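import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import io.circe.Json
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

implicit val system: ActorSystem = ActorSystem()
// Execution context for the onComplete callback below
import system.dispatcher

val bootstrapServers = "localhost:9092"
// Keys and values are both sent as plain strings
val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

// Sample payload; replace with your own data
val json: Json = Json.obj(
  "id" -> Json.fromInt(1),
  "name" -> Json.fromString("alice")
)
// Compact JSON text (Json.toString would pretty-print it)
val jsonStr: String = json.noSpaces

val record =
  new ProducerRecord[String, String]("topic", jsonStr)

// Send the single record, then shut the actor system down when the stream completes
val done =
  Source.single(record)
    .runWith(Producer.plainSink(producerSettings))
done.onComplete(_ => system.terminate())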
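Type-specific encodings plug in through the standard Kafka `Serializer` interface, which `ProducerSettings` accepts for both keys and values. As a minimal sketch, assuming kafka-clients 2.1 or later (where `configure` and `close` have default implementations), the hypothetical `User` case class and `UserJsonSerializer` below encode a value as UTF-8 JSON by hand; in a real project an Avro or Protobuf serializer would take its place.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.Serializer

// Hypothetical domain type, used only for illustration
final case class User(id: Int, name: String)

// Hypothetical serializer: encodes a User as a compact JSON object in UTF-8.
// Only serialize needs overriding; configure/close are default methods.
class UserJsonSerializer extends Serializer[User] {
  // Escape backslashes first, then double quotes, so the JSON string stays valid
  private def esc(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")

  override def serialize(topic: String, user: User): Array[Byte] =
    if (user == null) null // Kafka convention: null value stays null (tombstone)
    else
      ("{\"id\":" + user.id + ",\"name\":\"" + esc(user.name) + "\"}")
        .getBytes(StandardCharsets.UTF_8)
}
```

With it, `ProducerSettings(system, new StringSerializer, new UserJsonSerializer)` yields a `ProducerSettings[String, User]`, so the stream carries `ProducerRecord[String, User]` values and the compiler checks the value type.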
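2. More powerful stream processing: data can be buffered, windowed, and grouped by time or by element count before it is produced.
For example, the following code uses the Alpakka Kafka Producer to buffer and aggregate data in the stream before sending it to Kafka: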
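Grouping by both time and element count maps onto the Akka Streams `groupedWithin` operator. A minimal sketch, assuming Akka 2.6: `sumPerWindow` closes a batch after 10 elements or 1 second, whichever comes first, and `toRecord` wraps each sum in a `ProducerRecord`. The object name `WindowedSums` and the topic `"sums"` are hypothetical.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.producer.ProducerRecord

import scala.concurrent.duration._

object WindowedSums {
  // Emit a batch once 10 elements have arrived or 1 second has elapsed,
  // whichever comes first, then reduce each batch to its sum.
  val sumPerWindow: Flow[Int, Int, NotUsed] =
    Flow[Int].groupedWithin(10, 1.second).map(_.sum)

  // Wrap each window sum in a record; "sums" is a hypothetical topic name.
  val toRecord: Flow[Int, ProducerRecord[String, String], NotUsed] =
    Flow[Int].map(sum => new ProducerRecord[String, String]("sums", sum.toString))
}
```

Connected end to end, this would read `Source(1 to 100).via(WindowedSums.sumPerWindow).via(WindowedSums.toRecord).runWith(Producer.plainSink(producerSettings))`. Unlike `grouped(n)`, `groupedWithin` also flushes a partial batch when the time limit expires, so a slow source does not hold records back indefinitely.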
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
implicit val system: ActorSystem = ActorSystem()
val bootstrapServers = "localhost:9092"
val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
val source = Source(1 to 100)
val records =
  source
    .grouped(10)
    .map(_.sum)
    .map(sum => new ProducerRecord[String,