Apache Kafka 和 GCP Pub/Sub 是两种不同的消息传递系统，它们在设计和使用方式上存在一些区别。
Apache Kafka示例代码:
// 生产者代码
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
 * Minimal Kafka producer example.
 *
 * <p>Connects to a broker at {@code localhost:9092}, sends a single string
 * record to {@code my_topic}, and prints whether the send succeeded.
 */
public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close() runs even if send() throws,
        // so the producer's network resources are never leaked.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my_topic", "key", "value");

            // The callback is invoked once the broker acknowledges (or rejects) the record.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("Message sent successfully");
                }
            });
        }
    }
}
// 消费者代码
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer example.
 *
 * <p>Joins consumer group {@code my_group} on a broker at
 * {@code localhost:9092}, subscribes to {@code my_topic}, and prints the value
 * of every record it receives. Runs until the process is terminated.
 */
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my_group");

        // try-with-resources closes the consumer if the poll loop exits via an exception.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my_topic"));
            while (true) {
                // poll(long) is deprecated; the Duration overload is the supported API.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            }
        }
    }
}
GCP PubSub示例代码:
// 发布者代码
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.PubsubMessage;
import com.google.protobuf.ByteString;
public class PubSubPublisherExample {
public static void main(String[] args) throws Exception {
String projectId = "your-project-id";
String topicId = "your-topic-id";
Publisher publisher = Publisher.newBuilder(topicName).build();
ByteString data = ByteString.copyFromUtf8("Hello, PubSub!");
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.setData(data)
.build();
publisher.publish(pubsubMessage);
publisher.shutdown();
}
}
// 订阅者代码
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
/**
 * Minimal GCP Pub/Sub subscriber example.
 *
 * <p>Attaches to the subscription identified by {@code projectId} and
 * {@code subscriptionId}, prints each message's payload as UTF-8 text,
 * and acknowledges it. Runs until the subscriber terminates.
 */
public class PubSubSubscriberExample {
    public static void main(String[] args) throws Exception {
        String projectId = "your-project-id";
        String subscriptionId = "your-subscription-id";
        ProjectSubscriptionName subscriptionName =
                ProjectSubscriptionName.of(projectId, subscriptionId);

        // Declared with an explicit type so the newBuilder overload is unambiguous.
        MessageReceiver receiver = new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                System.out.println("Received message: " + message.getData().toStringUtf8());
                // Acknowledge so the service does not redeliver this message.
                consumer.ack();
            }
        };

        Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

        // Subscriber is an ApiService: it is started with startAsync(), not start().
        subscriber.startAsync().awaitRunning();

        // Block the main thread; otherwise main() returns right after startup
        // and nothing keeps the process waiting for messages.
        subscriber.awaitTerminated();
    }
}
这些示例代码演示了如何在Apache Kafka和GCP PubSub中