Spring-AMQP重新排队消息计数基于JVM吗?

我正在寻找rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。 如果我要手动ACK / NACK消息,我需要将重试计数保留在内存中(例如,通过使用correlationId作为映射中的唯一键),或者通过在消息中设置我自己的标头,并重新传送它(因此把它放在队列的末尾)

然而,这是弹簧处理的情况。 具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAttempts(x)。 这个计数是特定于JVM的,还是以某种方式操纵消息?

例如,我有一个部署到2台服务器的Web应用程序,maxAttempts设置为5.总重新传输计数是否可能是5-9,具体取决于重新传递和重新处理的顺序。服务器?

Rabbit / AMQP不允许在基于拒绝重新排队时修改消息。

状态(基于messageId)在RetryContextCache维护; 默认是MapRetryContextCache 。 这并不适合“群集”,因为正如你所说,尝试可能达到((maxAttempts - 1) * n + 1) ; 加上它会导致内存泄漏(某些服务器上的状态)。 您可以在RetryTemplate (构建器中的RetryTemplate配置SoftReferenceMapRetryContextCache以避免内存泄漏,但这只能解决内存泄漏问题。

您需要使用自定义RetryContextCache和一些持久共享存储(例如redis)。

我通常建议在这种情况下使用无状态恢复 – 重试完全在容器中完成,并且根本不涉及兔子(直到重试耗尽,在这种情况下,消息被丢弃或根据代理配置发送到DLX / DLQ) 。

如果您不关心消息顺序(并且我认为您没有给出竞争消费者),一种有趣的技术是拒绝该消息,将其发送到具有到期设置的DLQ,并且当DLQ消息到期时,将它路由回原始队列的尾部(而不是头部)。 在这种情况下,可以检查x-death标头以确定重试的次数。

这个答案和这个 答案有更多细节。