要使用Alpakka Akka Stream从Kafka中读取数据,你需要添加相关的依赖库,并编写相应的代码。
首先,确保你的项目pom.xml文件中添加了以下依赖库:
com.typesafe.akka
akka-stream-kafka_2.12
2.1.0
com.typesafe.akka
akka-stream_2.12
2.6.12
接下来,你可以使用以下示例代码来从Kafka中读取数据:
import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.scaladsl.Consumer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.scaladsl.Sink;
import akka.stream.scaladsl.Source;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.HashMap;
import java.util.Map;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 创建ActorSystem和Materializer
ActorSystem system = ActorSystem.create("example");
Materializer materializer = ActorMaterializer.create(system);
// Kafka配置
String bootstrapServers = "localhost:9092";
String groupId = "group1";
String topic = "test-topic";
// 创建Kafka消费者设置
Map config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
ConsumerSettings consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId);
// 创建Source并从Kafka读取数据
Source source = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.map(record -> new String(record.value()));
// 定义Sink来处理接收到的数据
Sink sink = Sink.foreach(System.out::println);
// 运行Stream流程
source.runWith(sink, materializer);
}
}
以上示例代码创建了一个简单的Kafka消费者,从指定的主题中读取数据,并将数据打印到控制台。你可以根据自己的需求修改代码,进行进一步的处理数据操作。
确保你的Kafka服务器正常运行,并且主题存在,然后运行上述代码,即可从Kafka中读取数据。