ActiveMQ限制消费者
我想对activeMQ中的某个队列的消费者进行限制,在hornetq中(jboss,这是通过对mdb Consumer的定义进行注释来实现的)。 我在activemq的文档中找不到任何类似的东西,我找到的最接近的就是这个
consumer.recvDelay 0 ms Pause consumer for recvDelay milliseconds with each message (allows consumer throttling).
来自: http : //activemq.apache.org/activemq-performance-module-users-manual.html
但在那里我找不到我如何在java中做到这一点。
提前致谢,
问候。
编辑:这是ActiveMQManager代码和消费者代码:
public class ActiveMQManager { private static ActiveMQConnectionFactory CONNECTION_FACTORY; public static Connection CONNECTION; public static Session SESSION; public static Destination TEST_QUEUE; public static void start() { try { CONNECTION_FACTORY = new ActiveMQConnectionFactory("vm://localhost"); CONNECTION = CONNECTION_FACTORY.createConnection(); CONNECTION.start(); SESSION = CONNECTION.createSession(false, Session.CLIENT_ACKNOWLEDGE); TestClient testClient = new TestClient(); TEST_QUEUE = SESSION.createQueue("TEST.QUEUE"); MessageConsumer testConsumer = SESSION.createConsumer(TEST_QUEUE); test.setMessageListener(testClient); } catch (Exception e) { } } public static void stop() { try { // Clean up SESSION.close(); CONNECTION.close(); } catch (JMSException e) { log.error(e); } }
}
消费者代码非常简单(对于此示例):
public class TestConsumer implements MessageListener { @Override public void onMessage(Message message) { //Do something with the message }
}
这取决于所使用的消费者技术……但这里有一些选择
-
你可以在你的消费者代码中手动引入延迟(不是一个精确的科学,但这将限制吞吐量)
-
您还可以通过设置JMS连接的maxConcurrentConsumers属性来控制消费者使用的线程数…也就是说,这不会限制消息吞吐量,只是限制消费者使用的并发级别
-
更好的是,您可以使用throttler EIP实现设置每个时间段消耗的确切消息数
例如,使用Camel Throttler这是微不足道的
from("activemq:queueA").throttle(10).to("activemq:queueB")
使用ActiveMQ,您可以设置使用者预取限制: http : //activemq.apache.org/what-is-the-prefetch-limit-for.html
要对其进行配置,您可以使用连接URL(大多数配置可以使用URL完成)或Java API。
有关更有趣的参数: http : //activemq.apache.org/connection-configuration-uri.html
考虑到Camel Throttler在Throttler阻止时将内存交换保留在内存中。 因此,如果您有100个队列使用者并且服务器(暴露SOAP服务)很慢,那么您可能在内存中最多可以有100个交换!
为此发布此问题: 监听ActiveMQ队列的所有JMS使用者的节流消耗率