在Akka Kafka中,可以使用Consumer.committableSource
来列出可用主题。下面是一个使用Scala的示例代码:
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.ExecutionContext.Implicits.global
object ListTopicsExample extends App {
// Kafka consumer settings
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
// Create a source with all available topics
val topicsSource = Consumer.committableSource(consumerSettings, Subscriptions.topics())
// Sink that prints out the topics
val printSink = Sink.foreach { msg =>
println(s"Received message from topic: ${msg.record.topic()}")
}
// Run the stream
topicsSource.to(printSink).run()
}
上述代码使用Consumer.committableSource
创建了一个包含所有可用主题的源,然后使用Sink.foreach
将每个主题打印出来。可以根据需要修改代码来实现其他功能。