要解决这个问题,首先需要创建一个Kafka消费者来消费其他人的消息。以下是一个使用Java代码的示例:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC = "your-topic-name";
private static final String BOOTSTRAP_SERVERS = "your-bootstrap-servers";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("group.id", "your-consumer-group-id");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Consumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords records = consumer.poll(100);
records.forEach(record -> {
// 处理消息的逻辑
System.out.println("消费消息:" + record.value());
});
}
}
}
要运行此代码示例,您需要将your-topic-name
替换为您要消费的实际Kafka主题的名称,并将your-bootstrap-servers
替换为您的Kafka代理的引导服务器地址。
此示例创建一个Kafka消费者,并使用subscribe
方法订阅特定的主题。然后,使用poll
方法从Kafka代理拉取消息,并使用forEach
遍历每条消息,您可以在其中添加您自己的处理逻辑。
请注意,此示例中的消费者是一个无限循环,以持续消费消息。您可以根据自己的需求进行修改。
希望这可以帮助到您!