我是 Java 新手,正在开发一个使用多个(不同)主题并将其发送到另一台服务器的项目。我想知道处理多个主题的最佳方法是什么。
据我了解,每个消费者都与一个主题相关,因此,如果我必须使用多个主题,则每个不同的主题都需要一个消费者。由于消费者进行阻塞调用,我需要为每个消费者调用一个线程来并行消费这些主题。
如果我想进一步提高吞吐量,那么每个消费者有一个老板线程(附加到一个主题)并允许每个老板线程设置工作线程以相应地提高性能是一种好习惯吗?
请告知这是否是一个好的做法,如果不是,还有其他替代选择吗?有没有众所周知的设计模式来处理这个问题
为什么我选择消费者模型而不是侦听器模型?
我还有一个限制,即消费者收到消息后需要将消息发送到另一台接收服务器。如果接收服务器关闭(在新版本推送期间),那么我必须暂停消费消息,直到接收服务器启动。在这种情况下,拥有消息侦听器将无济于事,因为当接收服务器关闭时我将无法暂停侦听器。我说的对吗?还是有办法暂停监听器并停止消费消息,直到接收服务器启动?
我的方法是使用监听器功能。
你的对象实现了MessageListener
接口,然后将消息侦听器添加到消费者。在这种情况下,客户端库将为您处理线程,从队列中读取消息并将它们发送给侦听器。
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyMessageConsumer implements MessageListener {
public static void main() {
try {
MyMessageConsumer myMessageConsumer = new MyMessageConsumer();
// This example is using the ActiveMQ client library
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination1 = session.createQueue("MyTopic1");
MessageConsumer consumer1 = session.createConsumer(destination1);
consumer1.setMessageListener(myMessageConsumer);
Destination destination2 = session.createQueue("MyTopic2");
MessageConsumer consumer2 = session.createConsumer(destination2);
consumer2.setMessageListener(myMessageConsumer);
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
// Handle my messages here
}
}
会话交易
在此选项中,我们使用事务处理消息,如果调用 session.rollback() ,它将传递消息。当操作成功时,您确认();当操作失败时,您回滚()。
包 io.bessel.test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyMessageConsumer implements MessageListener {
public static void main(String ... arguments) {
try {
// This example is using the ActiveMQ client library
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MyMessageConsumer myMessageConsumer = new MyMessageConsumer(session);
Destination destination1 = session.createQueue("MyTopic1");
MessageConsumer consumer1 = session.createConsumer(destination1);
consumer1.setMessageListener(myMessageConsumer);
Destination destination2 = session.createQueue("MyTopic2");
MessageConsumer consumer2 = session.createConsumer(destination2);
consumer2.setMessageListener(myMessageConsumer);
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
private final Session session;
public MyMessageConsumer(Session session) {
this.session = session;
}
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
String text = ((TextMessage) message).getText();
System.out.println(String.format("Received message: %s", text));
this.session.rollback();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)