ActiveMQ限制消费者

我想对activeMQ中的某个队列的消费者进行限制,在hornetq中(jboss,这是通过对mdb Consumer的定义进行注释来实现的)。 我在activemq的文档中找不到任何类似的东西,我找到的最接近的就是这个

consumer.recvDelay 0 ms Pause consumer for recvDelay milliseconds with each message (allows consumer throttling). 

来自: http : //activemq.apache.org/activemq-performance-module-users-manual.html

但在那里我找不到我如何在java中做到这一点。

提前致谢,

问候。

编辑:这是ActiveMQManager代码和消费者代码:

 public class ActiveMQManager { private static ActiveMQConnectionFactory CONNECTION_FACTORY; public static Connection CONNECTION; public static Session SESSION; public static Destination TEST_QUEUE; public static void start() { try { CONNECTION_FACTORY = new ActiveMQConnectionFactory("vm://localhost"); CONNECTION = CONNECTION_FACTORY.createConnection(); CONNECTION.start(); SESSION = CONNECTION.createSession(false, Session.CLIENT_ACKNOWLEDGE); TestClient testClient = new TestClient(); TEST_QUEUE = SESSION.createQueue("TEST.QUEUE"); MessageConsumer testConsumer = SESSION.createConsumer(TEST_QUEUE); test.setMessageListener(testClient); } catch (Exception e) { } } public static void stop() { try { // Clean up SESSION.close(); CONNECTION.close(); } catch (JMSException e) { log.error(e); } } 

}

消费者代码非常简单(对于此示例):

 public class TestConsumer implements MessageListener { @Override public void onMessage(Message message) { //Do something with the message } 

}

这取决于所使用的消费者技术……但这里有一些选择

  • 你可以在你的消费者代码中手动引入延迟(不是一个精确的科学,但这将限制吞吐量)

  • 您还可以通过设置JMS连接的maxConcurrentConsumers属性来控制消费者使用的线程数…也就是说,这不会限制消息吞吐量,只是限制消费者使用的并发级别

  • 更好的是,您可以使用throttler EIP实现设置每个时间段消耗的确切消息数

    例如,使用Camel Throttler这是微不足道的

    from("activemq:queueA").throttle(10).to("activemq:queueB")

使用ActiveMQ,您可以设置使用者预取限制: http : //activemq.apache.org/what-is-the-prefetch-limit-for.html

要对其进行配置,您可以使用连接URL(大多数配置可以使用URL完成)或Java API。

有关更有趣的参数: http : //activemq.apache.org/connection-configuration-uri.html

考虑到Camel Throttler在Throttler阻止时将内存交换保留在内存中。 因此,如果您有100个队列使用者并且服务器(暴露SOAP服务)很慢,那么您可能在内存中最多可以有100个交换!

为此发布此问题: 监听ActiveMQ队列的所有JMS使用者的节流消耗率