Java:高性能消息传递(单生产者/单一消费者)

我最初在这里问过这个问题,但我意识到我的问题不是关于一个真正的循环。 我想知道的是,在Java中进行高性能异步消息传递的正确方法是什么?

我想做什么……

我有大约10,000名消费者,每个消费者都从他们的私人队列中消费消息。 我有一个线程一个接一个地生成消息并将它们放在正确的消费者队列中。 每个使用者无限循环,检查消息是否出现在队列中并进行处理。

我认为这个术语是“单一生产者/单一消费者”,因为有一个生产者,每个消费者只能在他们的私人队列上工作(多个消费者永远不会从同一个队列中读取)。

在Consumer.java里面:

@Override public void run() { while (true) { Message msg = messageQueue.poll(); if (msg != null) { ... // do something with the message } } } 

Producer正在快速地将消息放入消费者消息队列中(每秒数百万条消息)。 消费者应该尽快处理这些消息!

注意: while (true) { ... }由Producer作为最后一条消息发送的KILL消息终止。

但是,我的问题是关于设计此消息传递的正确方法。 我应该为messageQueue使用什么样的队列? 应该是同步还是异步? 如何设计消息? 我应该使用while-true循环吗? 消费者应该是一个线程,还是其他什么? 10,000个线程会慢慢爬行吗? 什么是线程的替代品?

那么, 在Java中进行高性能消息传递的正确方法是什么?

我会说10,000个线程的上下文切换开销会非常高,更不用说内存开销了。 默认情况下,在32位平台上,每个线程使用256kb的默认堆栈大小,因此只有堆栈的2.5GB。 显然你说的是64位,但即使这样,也有相当多的内存。 由于使用的内存量很大,缓存将会颠覆很多,而cpu将受到内存带宽的限制。

我会寻找一种避免使用这么multithreading来避免分配大量堆栈和上下文切换开销的设计。 您不能同时处理10,000个线程。 当前的硬件通常少于100个核心。

我会为每个硬件线程创建一个队列,并以循环方式分发消息。 如果处理时间变化很大,则有些线程在给予更多工作之前完成处理队列的危险,而其他线程永远不会通过他们分配的工作。 这可以通过使用JSR-166 ForkJoin框架中实现的工作窃取来避免。

由于通信是从发布者到订阅者的一种方式,因此Message不需要任何特殊设计,假设订阅者在发布消息后不更改消息。

编辑:阅读评论,如果你有10,000个符号,然后创建一些通用订阅者线程(每个核心一个订阅者线程),异步收到来自发布者的消息(例如通过他们的消息队列)。 订阅者从队列中提取消息,从消息中检索符号,并在消息处理程序的Map中查找它,检索处理程序,并调用处理程序以同步处理消息。 完成后,它会重复,从队列中获取下一条消息。 如果必须按顺序处理相同符号的消息(这就是为什么我猜你想要10,000个队列。),你需要将符号映射到订阅者。 例如,如果有10个用户,则符号0-999到达用户0,1000-1999到用户1等。更精细的方案是根据它们的频率分布映射符号,使得每个用户获得大致相同的负载。 例如,如果10%的流量是符号0,则订户0将仅处理该一个符号,而其他符号将在其他订户之间分配。

你可以使用这个( 我应该使用Java中的哪个ThreadPool? ):

 class Main { ExecutorService threadPool = Executors.newFixedThreadPool( Runtime.availableProcessors()*2); public static void main(String[] args){ Set consumers = getConsumers(threadPool); for(Consumer consumer : consumers){ threadPool.execute(consumer); } } } 

 class Consumer { private final ExecutorService tp; private final MessageQueue messageQueue; Consumer(ExecutorService tp,MessageQueue queue){ this.tp = tp; this.messageQueue = queue; } @Override public void run(){ Message msg = messageQueue.poll(); if (msg != null) { try{ ... // do something with the message finally{ this.tp.execute(this); } } } } } 

通过这种方式,您可以轻松地进行调度。

首先,除非您选择完整的设计文档或者为自己尝试不同的方法,否则没有一个正确的答案。

我假设您的处理不会是计算密集型的,否则您不会考虑同时处理10000个队列。 一种可能的解决方案是通过每个CPU使用两个线程来最小化上下文切换。 除非您的系统要严格实时处理数据,否则可能会给每个队列带来更大的延迟,但整体吞吐量会更高。

例如 – 让您的生产者线程在自己的CPU上运行,并将批量消息放入消费者线程。 然后,每个消费者线程将消息分发到其N个私有队列,执行处理步骤,接收新的数据批处理等等。 同样,取决于您的延迟容差,因此处理步骤可能意味着处理所有队列,固定数量的队列,除非达到时间阈值,否则它可以包含多个队列。 能够轻松地告知哪个队列属于哪个消费者线程(例如,如果队列按顺序编号:int consumerThreadNum = queueNum&0x03)将是有益的,因为每次在哈希表中查找它们可能很慢。

为了最大限度地减少内存抖动,始终创建/销毁队列可能不是一个好主意,因此您可能希望为每个线程预分配(最大队列数/队列数)队列对象。 当队列完成而不是被销毁时,它可以被清除并重用。 你不希望gc过于频繁和长时间地妨碍你。

另一个未知的是,如果您的生产者为每个队列生成完整的数据集,或者将以块的forms发送数据,直到收到KILL命令。 如果您的生产者发送完整的数据集,您可以完全取消队列概念,并在数据到达消费者线程时处理数据。

拥有相对于硬件和操作系统容量的消费者线程池。 这些消费者线程可以轮询您的消息队列。

我要么让消息知道如何处理它们,或者在初始化时使用消费者线程类注册处理器。

在没有关于处理符号的约束的更多细节的情况下,很难给出非常具体的建议。

你应该看看这篇slashdot文章:

http://developers.slashdot.org/story/10/07/27/1925209/Java-IO-Faster-Than-NIO

它有很多关于multithreading与单选和线程池参数的讨论和实际测量数据。