Spring RabbitMQ – 在具有@RabbitListener配置的服务上使用手动通道确认

如何在不使用自动确认的情况下手动确认消息。 有没有办法使用它与@RabbitListener@EnableRabbit配置样式。 大多数文档告诉我们使用SimpleMessageListenerContainerChannelAwareMessageListener 。 但是使用它会失去注释提供的灵活性。 我已经配置了我的服务如下:

 @Service public class EventReceiver { @Autowired private MessageSender messageSender; @RabbitListener(queues = "${eventqueue}") public void receiveMessage(Order order) throws Exception { // code for processing order } 

我的RabbitConfiguration如下

 @EnableRabbit public class RabbitApplication implements RabbitListenerConfigurer { public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); } @Bean public MappingJackson2MessageConverter jackson2Converter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); return converter; @Bean public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory()); factory.setMaxConcurrentConsumers(5); factory.setMessageConverter((MessageConverter) jackson2Converter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("localhost"); return connectionFactory; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setContainerFactory(myRabbitListenerContainerFactory()); } @Autowired private EventReceiver receiver; } } 

如何调整手动通道确认以及上述配置风格将获得任何帮助。 如果我们实现ChannelAwareMessageListener,那么onMessage签名将会改变。 我们可以在服务上实现ChannelAwareMessageListener吗?

Channel添加到@RabbitListener方法…

 @RabbitListener(queues = "${eventqueue}") public void receiveMessage(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { ... } 

并使用basicAckbasicReject的标记。

编辑

 @SpringBootApplication @EnableRabbit public class So38728668Application { public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args); context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo"); context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS); context.close(); } @Bean public Queue so38728668() { return new Queue("so38728668"); } @Bean public Listener listener() { return new Listener(); } public static class Listener { private final CountDownLatch latch = new CountDownLatch(1); @RabbitListener(queues = "so38728668") public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { System.out.println(payload); channel.basicAck(tag, false); latch.countDown(); } } } 

application.properties:

 spring.rabbitmq.listener.acknowledge-mode=manual 

感谢Gary的帮助。 我终于解决了这个问题。 我正在记录这个以造福他人。 这需要在Spring AMQP参考文档页面中作为标准文档的一部分进行记录。 服务等级如下。

  @Service public class Consumer { @RabbitListener(queues = "${eventqueue}") public void receiveMessage(Order order, Channel channel) throws Exception { // the above methodname can be anything but should have channel as second signature channel.basicConsume(eventQueue, false, channel.getDefaultConsumer()); // Get the delivery tag long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag(); try { // code for processing order catch(Exception) { // handle exception channel.basicReject(deliveryTag, true); } // If all logic is successful channel.basicAck(deliveryTag, false); } 

配置也已修改如下

 public class RabbitApplication implements RabbitListenerConfigurer { private static final Logger log = LoggerFactory.getLogger(RabbitApplication .class); public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); } @Bean public MappingJackson2MessageConverter jackson2Converter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); return converter; } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(jackson2Converter()); return factory; } @Autowired private Consumer consumer; @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } ... } 

注意:不需要配置Rabbitconnectionfactory或containerfactor等,因为注释隐含处理所有这些。