在ActiveMQ和Camel中,可以通过使用RouteBuilder来创建多个路由,并使用CountDownLatch来等待所有路由完成。以下是一个示例代码:
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import java.util.concurrent.CountDownLatch;
public class MultipleRoutesExample {
public static void main(String[] args) throws Exception {
// 创建Camel上下文
CamelContext context = new DefaultCamelContext();
// 添加ActiveMQ组件
context.addComponent("activemq", ActiveMQComponent.activeMQComponent("tcp://localhost:61616"));
// 创建CountDownLatch,用于等待所有路由完成
final CountDownLatch latch = new CountDownLatch(2);
// 创建第一个路由
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("activemq:queue:input1")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// 处理消息
System.out.println("Processing message from input1: " + exchange.getIn().getBody());
}
})
.to("activemq:queue:output1")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// 完成后减少CountDownLatch计数
latch.countDown();
}
});
}
});
// 创建第二个路由
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("activemq:queue:input2")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// 处理消息
System.out.println("Processing message from input2: " + exchange.getIn().getBody());
}
})
.to("activemq:queue:output2")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// 完成后减少CountDownLatch计数
latch.countDown();
}
});
}
});
// 启动Camel上下文
context.start();
// 发送消息到input1队列
context.createProducerTemplate().sendBody("activemq:queue:input1", "Message 1");
// 发送消息到input2队列
context.createProducerTemplate().sendBody("activemq:queue:input2", "Message 2");
// 等待所有路由完成
latch.await();
// 停止Camel上下文
context.stop();
}
}
上面的代码创建了两个路由,分别从input1和input2队列接收消息,并将处理后的消息发送到output1和output2队列。在每个路由的最后,使用Processor将CountDownLatch的计数减少。
在main方法中,首先创建了CountDownLatch对象,然后启动了Camel上下文。接下来,使用ProducerTemplate发送消息到input1和input2队列。最后,使用latch.await()方法等待所有路由完成。
请确保已经正确安装和配置了ActiveMQ和Camel的依赖项。