multithreadingJMS客户端ActiveMQ

我使用以下代码为多个消费者创建多个JMS会话以使用消息。 我的问题是代码以单线程方式运行。 即使消息存在于队列中,第二个线程也无法接收任何内容并且只是保持轮询。 第一个线程同时完成第一批处理并返回并消耗剩余的消息。 这里的使用有什么问题吗?

static { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616"); connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { LOGGER.error("Unable to initialise JMS Queue.", e); } } public JMSClientReader(boolean isQueue, String name) throws QueueException { init(isQueue,name); } @Override public void init(boolean isQueue, String name) throws QueueException { // Create a Connection try { // Create a Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); if (isQueue) { destination = new ActiveMQQueue(name);// session.createQueue("queue"); } else { destination = new ActiveMQTopic(name);// session.createTopic("topic"); } consumer = session.createConsumer(destination); } catch (JMSException e) { LOGGER.error("Unable to initialise JMS Queue.", e); throw new QueueException(e); } } public String readQueue() throws QueueException { // connection.setExceptionListener(this); // Wait for a message String text = null; Message message; try { message = consumer.receive(1000); if(message==null) return "done"; if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; text = textMessage.getText(); LOGGER.info("Received: " + text); } else { throw new JMSException("Invalid message found"); } } catch (JMSException e) { LOGGER.error("Unable to read message from Queue", e); throw new QueueException(e); } LOGGER.info("Message read is " + text); return text; } 

你的问题是prefetchPolicy。

 persistent queues (default value: 1000) non-persistent queues (default value: 1000) persistent topics (default value: 100) non-persistent topics (default value: Short.MAX_VALUE - 1) 

所有消息都被分派到第一个连接的使用者,当另一个消息连接时他没有收到消息,因此如果您有队列的并发使用者,则需要将prefetchPolicy设置为低于默认值的值,以更改此行为。 例如,将此jms.prefetchPolicy.queuePrefetch=1添加到activemq.xml中的uri配置中,或者将其设置在客户端URL上,如下所示

 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1"); 

建议使用较大的预取值以实现高消息量的高性能。 但是,对于较低的消息量,每个消息需要很长时间才能处理,因此预取应设置为1.这可确保消费者一次只处理一条消息。 但是,将预取限制指定为零将导致使用者一次一个地轮询消息,而不是将消息推送到使用者。

看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html

http://activemq.apache.org/destination-options.html