在Apache Storm中使用JSON Kafka spout的解决方法如下:
org.apache.storm
storm-core
2.2.0
org.apache.storm
storm-kafka
2.2.0
provided
import org.apache.storm.kafka.spout.*;
import org.apache.storm.spout.*;
import org.apache.kafka.common.serialization.StringDeserializer;
public class JsonKafkaSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private KafkaSpout kafkaSpout;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig.builder("localhost:9092", "topic")
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-kafka-spout")
.build();
this.kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
}
@Override
public void nextTuple() {
kafkaSpout.nextTuple();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("key", "value"));
}
@Override
public void ack(Object msgId) {
kafkaSpout.ack(msgId);
}
@Override
public void fail(Object msgId) {
kafkaSpout.fail(msgId);
}
}
import org.apache.storm.*;
import org.apache.storm.topology.*;
import org.apache.storm.tuple.Fields;
public class JsonKafkaTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new JsonKafkaSpout());
builder.setBolt("jsonBolt", new JsonBolt()).shuffleGrouping("kafkaSpout");
Config config = new Config();
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("json-kafka-topology", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
在这个示例中,假设你已经在本地启动了Kafka服务器,并有一个名为"topic"的主题。
请注意,这只是一个简单的示例,你可能需要根据你的实际需求进行更改和调整。