要实现AMQP(RabbitMQ)具有集群客户端的发布/订阅,可以使用RabbitMQ的Java客户端库。下面是一个简单的代码示例:
import com.rabbitmq.client.*;
import java.io.IOException;
public class Publisher {
private static final String EXCHANGE_NAME = "my_exchange";
private static final String QUEUE_NAME = "my_queue";
private static final String ROUTING_KEY = "my_routing_key";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String message = "Hello, World!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8"));
System.out.println("Message sent: " + message);
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class Subscriber {
private static final String EXCHANGE_NAME = "my_exchange";
private static final String QUEUE_NAME = "my_queue";
private static final String ROUTING_KEY = "my_routing_key";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
在这个示例中,发布者(Publisher)将消息发送到特定的交换机(Exchange)和路由键(Routing Key),然后订阅者(Subscriber)从队列中接收消息。通过使用相同的交换机和队列,多个订阅者可以在一个集群中同时接收消息。
请注意,你需要替换示例中的主机名、端口号、用户名和密码,以便与你的RabbitMQ集群配置匹配。