在Apache Pulsar中,消息传递语义包括“至少一次”(at least once)和“最多一次”(at most once)语义。
示例代码:
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
public class AtLeastOnceExample {
public static void main(String[] args) throws PulsarClientException {
String serviceUrl = "pulsar://localhost:6650";
String topic = "persistent://public/default/my-topic";
PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
Consumer consumer = client.newConsumer(GenericSchema.of())
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(0, TimeUnit.SECONDS)
.subscribe();
while (true) {
Message message = consumer.receive();
try {
System.out.println("Received message: " + message.getValue());
// 处理消息的逻辑
consumer.acknowledge(message);
} catch (Exception e) {
// 处理异常情况
consumer.negativeAcknowledge(message);
}
}
}
}
示例代码:
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
public class AtMostOnceExample {
public static void main(String[] args) throws PulsarClientException {
String serviceUrl = "pulsar://localhost:6650";
String topic = "persistent://public/default/my-topic";
PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
Consumer consumer = client.newConsumer(GenericSchema.of())
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(0, TimeUnit.SECONDS)
.subscribe();
while (true) {
Message message = consumer.receive();
System.out.println("Received message: " + message.getValue());
// 处理消息的逻辑
}
}
}
请注意,以上示例代码仅展示了如何设置消息传递语义,实际使用中还需要根据具体的业务逻辑进行相应的处理和错误处理。