在执行器服务RabbitMQ中只有一个线程同时运行

我已经创建了一个具有20个核心的指定线程池的连接。

ConnectionFactory factory = new ConnectionFactory(); .... //specified es ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory); con = factory.newConnection(consumerExecutor, addresses); 

然后从此连接创建一个频道:

  final Channel channel = connection.createChannel(); 

并使用它来创建DefaultConsumer。

虽然我发现尽管线程可以用来消费消息,但总是只有一个线程消耗消息,即使消息在服务器中大量累积。

我查看源代码并找到:

 private final class WorkPoolRunnable implements Runnable { @Override public void run() { int size = MAX_RUNNABLE_BLOCK_SIZE; List block = new ArrayList(size); try { Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); if (key == null) return; // nothing ready to run try { for (Runnable runnable : block) { runnable.run(); } } finally { if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); } } } catch (RuntimeException e) { Thread.currentThread().interrupt(); } } } /* Basic work selector and state transition step */ private K readyToInProgress() { K key = this.ready.poll(); if (key != null) { this.inProgress.add(key); } return key; } /** * Return the next ready client, * and transfer a collection of that client's items to process. * Mark client in progress. * If there is no ready client, return null. * @param to collection object in which to transfer items * @param size max number of items to transfer * @return key of client to whom items belong, or null if there is none. */ public K nextWorkBlock(Collection to, int size) { synchronized (this) { K nextKey = readyToInProgress(); if (nextKey != null) { VariableLinkedBlockingQueue queue = this.pool.get(nextKey); drainTo(queue, to, size); } return nextKey; } } 

技巧应该在ConsumerWorkService.this.workPool.nextWorkBlock ,它从就绪队列轮询通道,并在运行回调run()后添加到完成块中的读取队列。 如果我错了,请纠正我。

这是令人困惑的,因为消费者绑定到一个通道,并且在最后一个任务块完成之前通道不会释放到队列,这意味着线程池始终只为该消费者提供一个线程。

问题:

  1. 为什么RabbitMQ设计这个模型
  2. 我们如何优化此问题
  3. 是否可以将任务提交到handleDelivery的独立线程池以使用消息以及ack(仅在任务完成后确保消息确认)

> 1.为什么RabbitMQ设计这个模型

我想知道自己的原因。 但这一事实清楚地反映在他们的文件中 :

每个Channel都有自己的调度线程。 对于每个渠道一个消费者的最常见用例,这意味着消费者不会阻止其他消费者。 如果每个频道有多个消费者,请注意长时间运行的消费者可能会阻止向该频道上的其他消费者发送回调。

> 2.我们如何优化此问题

您可以通过将实际工作提交到另一个线程池来拥有多个通道或从处理中分离消息消耗 。 您可以在本文中找到更多详细信息。

> 3.将任务提交到handleDelivery中的独立线程池以使用消息和ack(仅在任务完成后确保消息确认)是否合适

从文档引用:

使用手动确认时,重要的是要考虑确认的线程。 如果它与接收传递的线程不同(例如,Consumer#handleDelivery委托传递处理到另一个线程),则将多个参数设置为true进行确认是不安全的,并且将导致双重确认,因此会导致通道级协议exception关闭频道。 一次确认一条消息可能是安全的。