要回答这个问题,需要先理解每种轮询方式的工作原理和适用场景。以下是对三种轮询方式的简要解释:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
service MyService {
rpc StreamMessages(MessageRequest) returns (stream MessageResponse) {}
}
void rpc StreamMessages(MessageRequest request) returns (stream MessageResponse response) {
// request与response是流形式,可以发送和接收多条消息
}