为测试 Apache Pulsar 服务的性能,可以使用下面的生产者(ProducerTest)和消费者(ConsumerTest)示例代码。两个类都是 public 类,需分别保存为 ProducerTest.java 和 ConsumerTest.java:
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import java.util.concurrent.TimeUnit;
/**
 * Minimal publish benchmark for a Pulsar broker.
 *
 * <p>Connects to {@link #SERVICE_URL}, publishes {@link #MESSAGE_COUNT} string
 * messages to {@link #TOPIC_NAME} one at a time with the blocking
 * {@code send} call, and prints the elapsed time in whole seconds.
 */
public class ProducerTest {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "test-topic";
    /** Number of messages published in a single run. */
    private static final int MESSAGE_COUNT = 1000000;

    /**
     * Runs the publish loop and reports how long it took.
     *
     * @param args ignored
     * @throws PulsarClientException if connecting, sending, or closing fails
     * @throws InterruptedException kept in the signature for compatibility with existing callers
     */
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // try-with-resources closes the producer first and the client second,
        // even when a send fails part-way through the run.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(SERVICE_URL)
                     .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(TOPIC_NAME)
                     .create()) {
            // nanoTime is monotonic, so the measurement is not skewed by
            // wall-clock adjustments during the run.
            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send("Test message " + i);
            }
            long elapsedNanos = System.nanoTime() - start;
            System.out.println("Total time: " + TimeUnit.NANOSECONDS.toSeconds(elapsedNanos) + "s");
        }
    }
}
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
/**
 * Minimal consumer that drains a Pulsar topic and prints every message.
 *
 * <p>Connects to {@link #SERVICE_URL} and subscribes to {@link #TOPIC_NAME}
 * under {@link #SUBSCRIPTION_NAME}, starting from the earliest position.
 * It then receives, prints, and acknowledges messages in an endless loop.
 * The loop has no exit condition; the program ends only when a client call
 * throws or the process is terminated.
 */
public class ConsumerTest {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "test-topic";
    private static final String SUBSCRIPTION_NAME = "test-subscription";

    /**
     * Subscribes to the topic and processes messages until a failure occurs.
     *
     * @param args ignored
     * @throws InvalidConfigurationException declared for compatibility with existing callers
     * @throws PulsarClientException if connecting, receiving, acknowledging, or closing fails
     */
    public static void main(String[] args) throws InvalidConfigurationException, PulsarClientException {
        // try-with-resources releases the consumer and then the client when
        // the loop is left via an exception, instead of leaking both.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(SERVICE_URL)
                     .build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic(TOPIC_NAME)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {
            while (true) {
                Message<String> message = consumer.receive();
                System.out.println("Received message: " + message.getValue());
                // Acknowledge only after the message has been printed.
                consumer.acknowledge(message);
            }
        }
    }
}
注意:运行上述代码前,请确保 Pulsar 服务已在本地启动,并监听默认端口 6650(与代码中的 SERVICE_URL "pulsar://localhost:6650" 保持一致)。