如何在Spring异步MessageListener用例中发生业务exception时请求RabbitMQ重试
我有一个Spring AMQP消息监听器正在运行。
public class ConsumerService implements MessageListener { @Autowired RabbitTemplate rabbitTemplate; @Override public void onMessage(Message message) { try { testService.process(message); //This process method can throw Business Exception } catch (BusinessException e) { //Here we can just log the exception. How the retry attempt is made? } catch (Exception e) { //Here we can just log the exception. How the retry attempt is made? } } }
如您所见,在处理过程中可能会出现exception。 我想重试因为Catch块中的特定错误。 我不能通过onMessage中的exception。 如何告诉RabbitMQ有exception并重试?
由于onMessage()
不允许抛出已检查的exception,因此可以在RuntimeException
包装exception并重新抛出它。
try { testService.process(message); } catch (BusinessException e) { throw new RuntimeException(e); }
但请注意,这可能会导致邮件无限期地重新传递。 这是如何工作的:
RabbitMQ支持拒绝消息并要求代理重新排队。 这显示在这里 。 但RabbitMQ本身并没有重试策略的机制,例如设置最大重试次数,延迟等。
使用Spring AMQP时,“拒绝重新排队”是默认选项。 当存在未处理的exception时,Spring的SimpleMessageListenerContainer
将默认执行此操作。 所以在你的情况下你只需要重新抛出被捕获的exception。 但请注意,如果您无法处理消息并且始终抛出exception,则会无限期地重新传递该消息并导致无限循环。
您可以通过抛出AmqpRejectAndDontRequeueException
exception来覆盖每个消息的此行为,在这种情况下,消息不会被重新排队。
您还可以通过设置完全关闭SimpleMessageListenerContainer
的“拒绝重新排队”行为
container.setDefaultRequeueRejected(false)
当消息被拒绝且没有被重新排队时,如果在RabbitMQ中设置了一个消息,它将丢失或传送到DLQ。
如果您需要具有最大尝试次数,延迟等的重试策略,最简单的方法是设置弹簧“无状态” RetryOperationsInterceptor
,它将在线程内执行所有重试(使用Thread.sleep()
),而不会在每次重试时拒绝该消息(所以没有每次重试都会返回RabbitMQ)。 重试耗尽时,默认情况下会记录一条警告并消息。 如果要发送到DLQ,您将需要RepublishMessageRecoverer
或自定义MessageRecoverer
来拒绝该消息而不重新排队(在后一种情况下,您还应该在队列上设置 RabbitMQ DLQ)。 默认消息恢复器的示例:
container.setAdviceChain(new Advice[] { org.springframework.amqp.rabbit.config.RetryInterceptorBuilder .stateless() .maxAttempts(5) .backOffOptions(1000, 2, 5000) .build() });
这显然有一个缺点,即您将在整个重试期间占用线程。 您还可以选择使用“有状态” RetryOperationsInterceptor
,它会在每次重试时将消息发送回RabbitMQ,但延迟仍将通过应用程序中的Thread.sleep()
实现,另外设置有状态拦截器是一个有点复杂。
因此,如果您希望在不占用Thread
情况下重试延迟,则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案。 如果你不想要指数退避(所以延迟不会在每次重试时增加),它会更简单一些。 要实现这样的解决方案,您基本上在rabbitMQ上创建另一个带有参数的队列: "x-message-ttl":
"x-dead-letter-exchange":"
。 然后在主队列上设置"x-dead-letter-exchange":"
。 因此,现在当您拒绝并且不重新排队消息时,RabbitMQ会将其重定向到第二个队列。 当TTL过期时,它将被重定向到原始队列,从而重新传送到应用程序。 所以现在你需要一个重试拦截器,在每次失败后拒绝给RabbitMQ的消息,并跟踪重试次数。 为了避免在应用程序中保持状态的需要(因为如果您的应用程序是群集的,您需要复制状态),您可以从RabbitMQ设置的x-death
标头计算重试计数。 在此处查看有关此标题的更多信息。 因此,在这一点上实现自定义拦截器比使用此行为自定义Spring状态拦截器更容易。
另请参阅Spring AMQP参考中有关重试的部分 。
- 如何创建包含Tomcat和MySQL的Java webapp安装程序(.exe)?“
- java.net.ConnectException:连接被拒绝
- 在同一时间激活的多个进程中以Java方式更新数据
- 如何使用FileInputStream访问jar中的txt文件?
- Eclipse IDE插件开发:将文件从插件jar复制到活动项目文件夹
- 从J2ME客户端轮询HTTP服务器
- MongoDB:不能使用游标迭代所有数据
- MongoDB Java插入引发org.bson.codecs.configuration.CodecConfigurationException:找不到类io.github.ilkgunel.mongodb.Pojo的编解码器
- 如何将所选对象从一个JList传输到另一个JList?