如何使Spring JMSListener突发到最大并发线程?

我有一个使用ActiveMQ版本5.10的Spring JMS应用程序。 我正在执行简单的并发测试。 我使用Spring Boot,当前版本和注释来配置JMSListener和消息生成器。

消息生产者只是尽可能快地在队列上抛出消息。 消息侦听器将消息从队列中拉出,但在获取消息后hibernate1秒钟 – 模拟消息侦听器在获取消息后需要执行的一些工作。

我将JMSListener设置为100-1000个并发线程。 如果我同时启动消息生产者和消费者(都在他们自己的JVM中运行),即使最大范围设置为1000,消费者也永远不会超过最小配置的线程。

如果我让生产者首先启动并在队列上放置几千条消息,然后启动一个或多个消费者实例,它将稳定地提升线程,从100开始,然后每秒20个左右的线程,直到达到状态队列中有大约20-30条消息在飞行中。 它永远不会捕获生成器 – 即使消费者没有接近其maxConcurrency计数,也总会有一些消息在队列中。

为什么消息使用者没有突然进入一堆额外的线程来清空队列而不是让队列中有20-30条消息呢? 消费者是否有办法继续快速添加线程以赶上队列中的消息?

以下是代码的相关部分。

消息制作者

@Component public class ClientServiceImpl implements ClientService { private static final String QUEUE="message.test.queue"; @Autowired private JmsTemplate jmsTemplate; @Override public void submitMessage(ImportantMessage importantMessage) { System.out.println("*** Sending " + importantMessage); jmsTemplate.convertAndSend(QUEUE, importantMessage); } } 

消息消费者

 @SpringBootApplication @EnableJms public class AmqConsumerApplication { public static void main(String[] args) { SpringApplication.run(AmqConsumerApplication.class, args); } @Value("${JMSHost}") private String JMS_BROKER_URL; @Autowired static Command command; @Bean public ConnectionFactory connectionFactory() { ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL); ((ActiveMQConnectionFactory)factory).setTrustAllPackages(true); ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true); ((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false); return factory; } } 

监听器配置为这样……

 @Component public class TransformationListener { private static final String QUEUE="message.test.queue?consumer.prefetchSize=10"; @JmsListener(destination=QUEUE, concurrency = "100-1000") public void handleRequest(ImportantMessage importantMessage) { System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } 

你还在面对这种行为吗? 您是否在http://activemq.apache.org/what-is-the-prefetch-limit-for.html上阅读了“Pooled Consumers and prefetch”这个建议您是否尝试过prefetchSize = 0或1? 我认为1可以解决您的问题。 如果prefetchSize> 1,则可能需要将AbortSlowAckConsumerStrategy降低到低于默认值30s。 要在您的情况下有超过100个线程消耗消息,您需要超过1000条消息未消耗且未在队列中预取,因为prefetchSize为10。