以下是一个使用ActiveMQ实现单个生产者和多个消费者的示例代码:
生产者代码:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标
Destination destination = session.createQueue("example.queue");
// 创建生产者
MessageProducer producer = session.createProducer(destination);
// 发送消息
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("Message " + i);
producer.send(message);
System.out.println("Sent message: " + message.getText());
}
// 关闭连接
connection.close();
}
}
消费者代码:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer implements MessageListener {
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标
Destination destination = session.createQueue("example.queue");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 注册消息监听器
consumer.setMessageListener(new Consumer());
// 等待消息
System.out.println("Waiting for messages...");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭连接
connection.close();
}
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
上述代码创建了一个ActiveMQ连接工厂,使用TCP协议连接到本地的ActiveMQ服务器。然后,使用连接工厂创建连接和会话。生产者代码创建了一个生产者,并发送10条文本消息到名为"example.queue"的队列中。
消费者代码创建了一个消费者,并注册了一个消息监听器。当有消息到达时,onMessage方法将被调用,打印接收到的消息。
请确保已经安装并启动了ActiveMQ服务器,并且在运行生产者和消费者代码之前。