要在 ActiveMQ 中实现主题(Topic)的持久化订阅,并让多个订阅者同时接收同一主题的消息,可以参考以下代码示例(请先运行一次持久化订阅者完成订阅注册,再运行发布者):
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publishes a single text message to a topic on an ActiveMQ broker.
 *
 * <p>The broker address and topic name are fixed by {@link #BROKER_URL} and
 * {@link #TOPIC_NAME}.
 */
public class TopicProducer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "persistent.topic";

    /**
     * Connects to the broker, sends one {@link TextMessage} to the topic and
     * closes the connection. JMS failures are printed to standard error.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Declared outside the try block so the finally clause can release it
        // even when a later JMS call fails.
        Connection connection = null;
        try {
            // Create the connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // Create and start the connection
            connection = connectionFactory.createConnection();
            connection.start();
            // Create a non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Look up the topic
            Topic topic = session.createTopic(TOPIC_NAME);
            // Create the message producer
            MessageProducer producer = session.createProducer(topic);
            // Create the message
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            // Publish the message
            producer.send(message);
            System.out.println("消息已发送");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Closes the given connection if it was opened, reporting (not propagating)
     * any failure so that it cannot mask an earlier exception.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Receives a single message from a topic through a durable subscription on an
 * ActiveMQ broker.
 *
 * <p>A durable subscription is identified by the connection's client ID
 * together with the subscription name, so both are fixed constants here.
 */
public class DurableSubscriber {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "persistent.topic";
    private static final String SUBSCRIBER_NAME = "durable.subscriber";
    // Client ID under which the durable subscription is registered with the broker.
    private static final String CLIENT_ID = "durable.client";

    /**
     * Connects to the broker, registers (or resumes) the durable subscription,
     * blocks until one message arrives, prints it if it is a
     * {@link TextMessage}, and closes the connection. JMS failures are printed
     * to standard error.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Declared outside the try block so the finally clause can release it
        // even when a later JMS call fails.
        Connection connection = null;
        try {
            // Create the connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // Create the connection
            connection = connectionFactory.createConnection();
            // The client ID must be set before any other use of the connection;
            // without an explicitly set client ID, createDurableSubscriber fails.
            connection.setClientID(CLIENT_ID);
            // Start the connection so that message delivery begins
            connection.start();
            // Create a non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Look up the topic
            Topic topic = session.createTopic(TOPIC_NAME);
            // Create the durable subscriber
            MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIBER_NAME);
            // Block until a message is received
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("接收到消息: " + textMessage.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Closes the given connection if it was opened, reporting (not propagating)
     * any failure so that it cannot mask an earlier exception.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Listens on a topic of an ActiveMQ broker with an ordinary (non-durable)
 * consumer and prints every text message delivered during a fixed time window.
 */
public class NonDurableSubscriber {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "persistent.topic";

    /**
     * Connects to the broker, registers an asynchronous message listener on the
     * topic, waits five seconds for deliveries and closes the connection.
     * Failures are printed to standard error.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Declared outside the try block so the finally clause can release it
        // even when a later call fails or the wait is interrupted.
        Connection connection = null;
        try {
            // Create the connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // Create and start the connection
            connection = connectionFactory.createConnection();
            connection.start();
            // Create a non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Look up the topic
            Topic topic = session.createTopic(TOPIC_NAME);
            // Create the non-durable subscriber
            MessageConsumer consumer = session.createConsumer(topic);
            // Register the listener that handles each delivered message
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("接收到消息: " + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // Keep the main thread alive while the listener receives messages
            Thread.sleep(5000);
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Closes the given connection if it was opened, reporting (not propagating)
     * any failure so that it cannot mask an earlier exception.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
这些示例代码演示了如何在ActiveMQ中实现持久化主题和并发性。发布者将消息发送到主题,持久化订阅者和非持久化订阅者将接收和处理这些消息。持久化订阅者在断开