要使用ActiveMQ的嵌入式推荐功能,可以按照以下步骤操作:
org.apache.activemq
activemq-broker
5.16.0
import org.apache.activemq.broker.BrokerService;
public class EmbeddedBrokerExample {
public static void main(String[] args) throws Exception {
// 创建ActiveMQ Broker
BrokerService broker = new BrokerService();
// 配置Broker属性
broker.setBrokerName("embeddedBroker");
broker.setPersistent(false);
broker.setUseJmx(false);
// 启动Broker
broker.start();
// 等待程序终止
Thread.sleep(Long.MAX_VALUE);
// 关闭Broker
broker.stop();
}
}
在上述示例中,创建了一个名为"embeddedBroker"的嵌入式Broker,配置了持久化为false(即非持久化消息),禁用了JMX监控,并通过start()方法启动了Broker。为了保持程序运行,使用Thread.sleep(Long.MAX_VALUE)方法来使程序休眠,直到手动停止。最后,通过调用stop()方法来关闭Broker。
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MessageProducerExample {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://embeddedBroker");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(队列)
Destination destination = session.createQueue("exampleQueue");
// 创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 创建消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
// 关闭连接
connection.close();
}
}
在上述示例中,创建了一个连接工厂(使用"vm://embeddedBroker"作为Broker的URL),创建了一个连接,并启动了连接。然后,创建了一个会话和目的地(队列),然后创建了一个消息生产者,并使用createTextMessage()方法创建了一个文本消息。最后,通过调用send()方法发送消息,并关闭连接。
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MessageConsumerExample {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://embeddedBroker");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(队列)
Destination destination = session.createQueue("exampleQueue");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息
Message message = consumer.receive();
// 处理消息
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭连接
connection.close();
}
}
在上述示例中,创建了一个