如何使用java和spring 3.0从JMS主题(而不是队列)同时处理多个消息?

请注意,我希望多个消息侦听器同时处理来自主题的连续消息。 此外,我希望每个消息监听器都以事务方式运行,以便给定消息监听器中的处理失败将导致该监听器的消息保留在该主题上。

Spring DefaultMessageListenerContainer似乎只支持JMS队列的并发性。

我是否需要实例化多个DefaultMessageListenerContainers?

如果时间沿垂直轴向下流动:

ListenerA reads msg 1 ListenerB reads msg 2 ListenerC reads msg 3 ListenerA reads msg 4 ListenerB reads msg 5 ListenerC reads msg 6 ListenerA reads msg 7 ListenerB reads msg 8 ListenerC reads msg 9 ListenerA reads msg 10 ListenerB reads msg 11 ListenerC reads msg 12 ... 

更新:
感谢您对T. T.Rob和@skaffman的反馈。

我最终做的是创建多个concurrency=1 DefaultMessageListenerContainers ,然后将逻辑放在消息监听器中,这样只有一个线程可以处理给定的消息ID。

您不需要多个DefaultMessageListenerContainer实例,但是您需要使用concurrentConsumers属性将DefaultMessageListenerContainer配置为concurrentConsumers

指定要创建的并发使用者数。 默认值为1。

为此设置指定更高的值将在运行时增加计划并发使用者的标准级别:这实际上是将在任何给定时间安排的最小并发使用者数。 这是一个静态设置; 对于动态缩放,请考虑指定“maxConcurrentConsumers”设置。

建议增加并发使用者的数量,以便扩展从队列进入的消息的消耗。 但请注意,一旦多个消费者注册,任何订购保证都会丢失。 一般来说,坚持使用1个消费者来获得低容量队列。

然而,底部有一个很大的警告:

不要为主题增加并发使用者的数量 。 这将导致同时消费同一消息,这几乎是不可取的。

这很有意思,当你想到它时会有意义。 如果您有多个DefaultMessageListenerContainer实例,则会发生相同的情况。

我想也许你需要重新考虑你的设计,虽然我不确定我的建议。 同时消费发布/订阅消息似乎是一件非常合理的事情,但是如何避免同时向所有消费者传递相同的消息?

至少在ActiveMQ中你想要的是完全支持的,他的名字是VirtualTopic

这个概念是:

  1. 您可以创建VirtualTopic (只需使用VirtualTopic.前缀创建主题VirtualTopic. )例如。 VirtualTopic.Color
  2. 创建订阅此VirtualTopic的消费者,该消费者匹配此模式Consumer..VirtualTopic.例如。 Consumer.client1.VirtualTopic.Color ,这样做,Activemq 将创建一个具有该名称的队列 ,该队列将订阅VirtualTopic.Color然后发布到此Virtual Topic的每条消息都将被传递到client1队列,请注意它的工作方式类似于rabbitmq交换。
  3. 您已经完成了,现在您可以像每个队列一样使用client1队列,包括许多消费者,DLQ,自定义重新传递策略等。
  4. 此时我认为您了解可以创建client2client3以及您想要多少订阅者 ,所有这些订阅者都将收到发布到VirtualTopic.Color的消息副本

这里的代码

 @Component public class ColorReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class); @Autowired private JmsTemplate jmsTemplate; // simply generating data to the topic long id=0; @Scheduled(fixedDelay = 500) public void postMail() throws JMSException, IOException { final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)]; final Color color = new Color(++id, colorName.getName()); final ActiveMQObjectMessage message = new ActiveMQObjectMessage(); message.setObject(color); message.setProperty("color", color.getName()); LOGGER.info("status=color-post, color={}", color); jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message); } /** * Listen all colors messages */ @JmsListener( destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer" selector = "color <> 'RED'" ) public void genericReceiveMessage(Color color) throws InterruptedException { LOGGER.info("status=GEN-color-receiver, color={}", color); } /** * Listen only red colors messages * * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that * the containers clientId need to be different between each other */ @JmsListener( // destination = "Consumer.redColorContainer.VirtualTopic.color", destination = "Consumer.client1.VirtualTopic.color", containerFactory = "redColorContainer", selector = "color='RED'" ) public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException { LOGGER.info("status=RED-color-receiver, color={}", message.getObject()); } /** * Listen all colors messages */ @JmsListener( destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer" ) public void genericReceiveMessage2(Color color) throws InterruptedException { LOGGER.info("status=GEN-color-receiver-2, color={}", color); } } @SpringBootApplication @EnableJms @EnableScheduling @Configuration public class Config { /** * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different * clientIds per consumer pool (as two @JmsListener above, or two application instances) * */ @Bean public JmsListenerContainerFactory colorContainer(ActiveMQConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("1-5"); configurer.configure(factory, connectionFactory); // container.setClientId("aId..."); lets spring generate a random ID return factory; } @Bean public JmsListenerContainerFactory redColorContainer(ActiveMQConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { // necessary when post serializable objects (you can set it at application.properties) connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName())); final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("1-2"); configurer.configure(factory, connectionFactory); return factory; } } public class Color implements Serializable { public static final Color WHITE = new Color("WHITE"); public static final Color BLUE = new Color("BLUE"); public static final Color RED = new Color("RED"); private String name; private long id; // CONSTRUCTORS, GETTERS AND SETTERS } 

这是运输提供商的差异在JMS的抽象中浮现的那些场合之一。 JMS希望为主题上的每个订阅者提供消息的副本。 但是你想要的行为实际上是一个队列。 我怀疑还有其他要求推动这个未被描述的发布/订阅解决方案 – 例如,其他事情需要订阅独立于您的应用的相同主题。

如果我在WebSphere MQ中执行此操作,解决方案是创建管理订阅,这将导致给定主题上的每个消息的单个副本放置到队列中。 然后,您的多个订阅者可以竞争该队列上的消息。 这样,您的应用程序可以具有多个线程,其中消息被分发,同时独立于此应用程序的其他订阅者可以动态地(取消)订阅相同的主题。

不幸的是,没有通用的JMS可移植方式。 您在很大程度上依赖于运输提供商的实施。 我可以说的唯一一个是WebSphere MQ,但我确信其他传输方式会以某种方式支持这种情况,如果您具有创造性,则会有不同程度的支持。

这是一种可能性:

1)仅创建一个配置有bean和方法的DMLC来处理传入消息。 将其并发性设置为1。

2)配置一个任务执行程序,其#threads等于你想要的并发性。 为实际应该处理消息的对象创建对象池。 将任务执行程序和对象池的引用提供给您在#1中配置的bean。 如果实际的消息处理bean不是线程安全的,那么对象池很有用。

3)对于传入消息,DMLC中的bean创建一个自定义Runnable,将其指向消息和对象池,并将其提供给任务执行程序。

4)Runnable的run方法从对象池中获取一个bean,并使用给定的消息调用其“process”方法。

#4可以使用代理和对象池进行管理,以使其更容易。

我还没有尝试过这个解决方案,但似乎符合要求。 请注意,此解决方案不如EJB MDB强大。 如果抛出RuntimeException,Spring将不会丢弃池中的对象。

我遇到了同样的问题。 我正在调查RabbitMQ,它似乎在他们称之为“工作队列”的设计模式中提供了一个完美的解决方案。 更多信息: http : //www.rabbitmq.com/tutorials/tutorial-two-java.html

如果你没有完全依赖于JMS,你可能会对此进行调查。 可能还有一个JMS到AMQP桥,但这可能开始看起来很hacky。

我有一些乐趣(阅读:困难)在我的Mac上安装和运行RabbitMQ,但我认为我已接近使用它,如果我能够解决这个问题,我会回复。

碰到了这个问题。 我的配置是:

创建一个id="DefaultListenerContainer"的bean,添加属性name="concurrentConsumers" value="10"和property name="maxConcurrentConsumers" value ="50"

到目前为止工作正常。 我打印了线程ID并validation了多个线程是否已创建并重用。