如何在Spring启动中实现循环队列使用者

我正在构建一个消息驱动的服务,它将在一个集群中运行,需要以循环方式从RabbitMQ队列中提取消息。 该实现目前正在逐步将消息从队列中拉出,导致一些服务器得到备份而其他服务器处于空闲状态。

当前的QueueConsumerConfiguration.java如下所示:

@Configuration public class QueueConsumerConfiguration extends RabbitMqConfiguration { private Logger LOG = LoggerFactory.getLogger(QueueConsumerConfiguration.class); private static final int DEFAULT_CONSUMERS=2; @Value("${eventservice.inbound}") protected String inboudEventQueue; @Value("${eventservice.consumers}") protected int queueConsumers; @Autowired private EventHandler eventtHandler; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setRoutingKey(this.inboudEventQueue); template.setQueue(this.inboudEventQueue); template.setMessageConverter(jsonMessageConverter()); return template; } @Bean public Queue inboudEventQueue() { return new Queue(this.inboudEventQueue); } @Bean public SimpleMessageListenerContainer listenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setQueueNames(this.inboudEventQueue); container.setMessageListener(messageListenerAdapter()); if (this.queueConsumers > 0) { LOG.info("Starting queue consumers:" + this.queueConsumers ); container.setMaxConcurrentConsumers(this.queueConsumers); container.setConcurrentConsumers(this.queueConsumers); } else { LOG.info("Starting default queue consumers:" + DEFAULT_CONSUMERS); container.setMaxConcurrentConsumers(DEFAULT_CONSUMERS); container.setConcurrentConsumers(DEFAULT_CONSUMERS); } return container; } @Bean public MessageListenerAdapter messageListenerAdapter() { return new MessageListenerAdapter(this.eventtHandler, jsonMessageConverter()); } } 

是否只是添加

 container.setChannelTransacted(true); 

配置?

RabbitMQ对所有消费者都一视同仁 – 它知道一个容器中的多个消费者之间没有区别。 多个容器中的一个消费者(例如,在不同的主机上)。 从Rabbit的角度来看,每个人都是消费者。

如果您想要更多地控制服务器关联,则需要使用多个队列,每个容器都在侦听自己的队列。

然后,您可以控制生产者端的分发 – 例如,使用主题或直接交换和特定路由密钥将消息路由到特定队列。

这将生产者与消费者紧密联系在一起(他必须知道有多少人)。

或者你可以让你的制作人使用路由键rk.0, rk.1, ..., rk.29 (反复,当达到30时重置为0)。

然后,您可以使用多个绑定绑定使用者队列 –

消费者1获得rk.0到rk.9,2获得rk.10到rk.19等等。

如果您决定增加使用者的数量,只需适当地重构绑定以重新分配工作。

容器将按需扩展到maxConcurrentConsumers,但实际上,仅当整个容器空闲一段时间时才会缩小。