RabbitMQ:快速的生产者和缓慢的消费者

我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方。 发件人以非常快的方式发送消息。 接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入)。 由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列。 所以我的问题是:这会导致消息队列溢出吗?

消息使用者如下所示:

public void onMessage() throws IOException, InterruptedException { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); JSONObject json = new JSONObject(message); String caseID = json.getString("caseID"); //following takes very long time dao.saveToDB(caseID); } } 

消费者收到的每条消息都包含一个caseID。 对于每个caseID,它会将大量数据保存到数据库中,这需要很长时间。 目前只为RabbitMQ设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅caseID。 那么如何才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出? 我应该在消费者部分使用multithreading来加快消费率吗? 或者我应该使用多个消费者同时使用传入的消息? 或者是否存在任何异步方式让消费者异步使用消息而不等待它完成? 欢迎任何建议。

“这会导致消息队列溢出吗?”

是。 随着队列长度的增加,RabbitMQ将进入“流控制”状态,以防止过多的内存消耗。 它还将开始将消息持久化到磁盘,而不是将它们保存在内存中。

“那么我怎样才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出”

你有2个选择:

  1. 添加更多消费者。 请记住,如果选择此选项,您的数据库现在将被多个并发进程操纵。 确保DB能承受额外的压力。
  2. 增加消费渠道的QOS值。 这将从队列中提取更多消息并在消费者上缓冲它们。 这将增加整体处理时间; 如果缓冲了5条消息,则第5条消息将占用消息1 … 5的处理时间。

“我应该在消费者部分使用multithreading来加快消费率吗?”

除非你有一个精心设计的解决方案。 向应用程序添加并行性将在消费者方面增加大量开销。 您最终可能会耗尽ThreadPool或限制内存使用量。

在处理AMQP时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案。 您收到的消息对时间有多敏感? 它们是否需要持久保存到DB ASAP,或者对您的用户是否重要,无论该数据是否立即可用?

如果不需要立即保留数据,则可以修改应用程序,以便消费者只需从队列中删除消息并将其保存到Redis中的缓存集合中。 引入第二个进程,然后按顺序读取和处理缓存的消息。 这将确保您的队列长度不会充分增长以导致流量控制,同时防止您的数据库被写入请求轰炸,这通常比读取请求更昂贵。 您的消费者现在只是从队列中删除消息,稍后由另一个进程处理。

“那么我怎样才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出?” 这就是“使用多个消费者同时使用传入消息”的答案,使用multithreading并行运行这些消费者实现原则并无共享, http://www.eaipatterns.com/CompetingConsumers.html

您有很多方法可以提高性能。

  1. 您可以使用更多生成器创建工作队列,这样您就可以创建一个简单的负载平衡系统。 不要使用exchange —> queue但只使用queue。 阅读这篇文章RabbitMQ Non-Round Robin Dispatching

  2. 当您收到消息时,您可以创建一个poolthread以在数据库中插入数据,但在这种情况下,您必须管理失败。

但我认为主要问题是数据库而不是RabbitMQ。 通过良好的调优,multithreading和工作队列,您可以获得可扩展且快速的解决方案。

让我知道

虽然确实添加了更多的消费者可能会加快速度,但真正的问题将是保存到数据库中。

这里已经有很多答案谈论添加消费者(线程和/或机器)和改变QoS所以我不打算重申这一点。 相反,您应该认真考虑使用聚合器模式将消息聚合成一组消息,然后一次性将组批量插入数据库。

每条消息的当前代码可能会打开一个连接,插入数据,然后关闭该连接(或返回池)。 更糟糕的是它甚至可能使用交易。

通过使用聚合器模式,您可以在刷新之前基本缓冲数据。

现在编写一个好的聚合器很棘手。 您需要决定如何缓冲(即每个工作人员都有自己的缓冲区或像Redis这样的中央缓冲区)。 我相信Spring集成有一个聚合器。

作为答案我建议:两者。

您可以利用多个接收器,以及设置每个接收器以在单独的线程中执行任务,从而允许接收器接受队列中的下一个消息。

当然,这种方法假设每个操作的结果(如果我理解正确的话,在db上写入)不会以任何方式影响后续操作的结果以响应其他消息。