在Apache Pulsar和Kafka中,消费者的消息获取方式有所不同。
在Apache Pulsar中,消费者是通过订阅主题来接收消息的,而不是轮询主题。当有新的消息可用时,Pulsar会将消息推送给消费者。下面是使用Apache Pulsar的代码示例:
import org.apache.pulsar.client.api.*;
public class PulsarConsumerExample {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
while (true) {
Message message = consumer.receive();
try {
// 处理消息
System.out.println(new String(message.getValue()));
consumer.acknowledge(message);
} catch (Exception e) {
// 处理异常
consumer.negativeAcknowledge(message);
}
}
}
}
在Kafka中,消费者需要轮询主题来获取消息。消费者通过调用poll()
方法来获取一批待处理的消息,然后处理这些消息。下面是使用Kafka的代码示例:
import org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
// 处理消息
System.out.println(record.value());
}
}
}
}
需要注意的是,上述示例中的代码仅供参考,实际使用时可能需要根据具体情况进行修改。
上一篇:Apache Pulsar读取器