如何在Spring异步MessageListener用例中发生业务exception时请求RabbitMQ重试

我有一个Spring AMQP消息监听器正在运行。

public class ConsumerService implements MessageListener { @Autowired RabbitTemplate rabbitTemplate; @Override public void onMessage(Message message) { try { testService.process(message); //This process method can throw Business Exception } catch (BusinessException e) { //Here we can just log the exception. How the retry attempt is made? } catch (Exception e) { //Here we can just log the exception. How the retry attempt is made? } } } 

如您所见,在处理过程中可能会出现exception。 我想重试因为Catch块中的特定错误。 我不能通过onMessage中的exception。 如何告诉RabbitMQ有exception并重试?

由于onMessage()不允许抛出已检查的exception,因此可以在RuntimeException包装exception并重新抛出它。

 try { testService.process(message); } catch (BusinessException e) { throw new RuntimeException(e); } 

但请注意,这可能会导致邮件无限期地重新传递。 这是如何工作的:

RabbitMQ支持拒绝消息并要求代理重新排队。 这显示在这里 。 但RabbitMQ本身并没有重试策略的机制,例如设置最大重试次数,延迟等。

使用Spring AMQP时,“拒绝重新排队”是默认选项。 当存在未处理的exception时,Spring的SimpleMessageListenerContainer将默认执行此操作。 所以在你的情况下你只需要重新抛出被捕获的exception。 但请注意,如果您无法处理消息并且始终抛出exception,则会无限期地重新传递该消息并导致无限循环。

您可以通过抛出AmqpRejectAndDontRequeueExceptionexception来覆盖每个消息的此行为,在这种情况下,消息不会被重新排队。

您还可以通过设置完全关闭SimpleMessageListenerContainer的“拒绝重新排队”行为

 container.setDefaultRequeueRejected(false) 

当消息被拒绝且没有被重新排队时,如果在RabbitMQ中设置了一个消息,它将丢失或传送到DLQ。

如果您需要具有最大尝试次数,延迟等的重试策略,最简单的方法是设置弹簧“无状态” RetryOperationsInterceptor ,它将在线程内执行所有重试(使用Thread.sleep() ),而不会在每次重试时拒绝该消息(所以没有每次重试都会返回RabbitMQ)。 重试耗尽时,默认情况下会记录一条警告并消息。 如果要发送到DLQ,您将需要RepublishMessageRecoverer或自定义MessageRecoverer来拒绝该消息而不重新排队(在后一种情况下,您还应该在队列上设置 RabbitMQ DLQ)。 默认消息恢复器的示例:

 container.setAdviceChain(new Advice[] { org.springframework.amqp.rabbit.config.RetryInterceptorBuilder .stateless() .maxAttempts(5) .backOffOptions(1000, 2, 5000) .build() }); 

这显然有一个缺点,即您将在整个重试期间占用线程。 您还可以选择使用“有状态” RetryOperationsInterceptor ,它会在每次重试时将消息发送回RabbitMQ,但延迟仍将通过应用程序中的Thread.sleep()实现,另外设置有状态拦截器是一个有点复杂。

因此,如果您希望在不占用Thread情况下重试延迟,则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案。 如果你不想要指数退避(所以延迟不会在每次重试时增加),它会更简单一些。 要实现这样的解决方案,您基本上在rabbitMQ上创建另一个带有参数的队列: "x-message-ttl": "x-dead-letter-exchange":"" 。 然后在主队列上设置"x-dead-letter-exchange":"" 。 因此,现在当您拒绝并且不重新排队消息时,RabbitMQ会将其重定向到第二个队列。 当TTL过期时,它将被重定向到原始队列,从而重新传送到应用程序。 所以现在你需要一个重试拦截器,在每次失败后拒绝给RabbitMQ的消息,并跟踪重试次数。 为了避免在应用程序中保持状态的需要(因为如果您的应用程序是群集的,您需要复制状态),您可以从RabbitMQ设置的x-death标头计算重试计数。 在此处查看有关此标题的更多信息。 因此,在这一点上实现自定义拦截器比使用此行为自定义Spring状态拦截器更容易。

另请参阅Spring AMQP参考中有关重试的部分 。