我该如何处理Java中的multithreading?

我正在研究与Java相关的实际场景;套接字程序。 现有系统和预期系统如下。

现有系统 – 系统检查是否满足某个条件。 如果是这样它将创建一些要发送的消息并将其放入队列中。

队列处理器是一个单独的线程。 它会定期检查队列中是否存在项目。 如果找到任何项目(消息),它只是将消息发送到远程主机(硬编码)并从队列中删除该项目。

预期的系统 – 就是这样的。 在满足特定条件时创建消息,但在每种情况下接收者都不相同。 所以有很多方法。

  1. 将消息放入同一队列但使用其接收者ID。 在这种情况下,第二个线程可以识别接收器,以便可以将消息发送到该接收器。

  2. 有多个线程。 在这种情况下,当满足条件并且接收器处于“新”状态时,它会创建一个新队列并将消息放入该队列。 并且新线程初始化以处理该队列。 如果下一个消息被定向到同一个接收者,它应该放在同一个队列中,如果不是新队列,则应该创建该线程。

现在我想实现第二个,有点卡住了。 我该怎么做? 骨架就足够了,您不必担心如何创建队列等… 🙂

更新:我也认为方法1是最好的方法。 我读了一些关于线程的文章并做出了这个决定。 但是,了解如何实施方法2也是非常值得的。

首先,如果你计划有很多接收器,我不会使用ONE-THREAD-AND-QUEUE-PER-RECEIVER方法。 你可能最终会遇到大量没有做任何事情的线程,而且我可能会伤害到你的性能。 另一种方法是使用工作线程的线程池,只从共享队列中挑选任务,每个任务都有自己的接收器ID,也许还有一个共享字典,其中包含与每个接收器的套接字连接,供工作线程使用。

话虽如此,如果你仍然想要追求你的方法,你可以做的是:

1)创建一个新类来处理新的线程执行:

public class Worker implements Runnable { private Queue myQueue = new Queue(); public void run() { while (true) { string messageToProcess = null; synchronized (myQueue) { if (!myQueue.empty()) { // get your data from queue messageToProcess = myQueue.pop(); } } if (messageToProcess != null) { // do your stuff } Thread.sleep(500); // to avoid spinning } } public void queueMessage(String message) { synchronized(myQueue) { myQueue.add(message); } } } 

2)在主线程上,创建消息并使用字典(哈希表)来查看接收者的线程是否已经创建。 如果是,则只是将新消息排队。 如果没有,请创建一个新线程,将其放入哈希表并对新消息进行排队:

 while (true) { String msg = getNewCreatedMessage(); // you get your messages from here int id = getNewCreatedMessageId(); // you get your rec's id from here Worker w = myHash(id); if (w == null) { // create new Worker thread w = new Worker(); new Thread(w).start(); } w.queueMessage(msg); } 

祝你好运。

编辑:您可以通过使用此方法提到的BlockingQueue Brian来改进此解决方案。

考虑使用Java消息服务(JMS)而不是重新发明轮子?

我可以建议你看看BlockingQueue吗? 您的调度进程可以写入此队列(put),客户端可以采用线程安全方式。 所以你根本不需要编写队列实现。

如果你有一个包含不同消息类型的队列,那么你需要为每个客户端实现一些peek-type机制(即他们必须检查队列的头部并且只采用他们的那些)。 为了有效地工作,消费者必须以及时和健壮的方式提取他们所需的数据。

如果每个消息/消费者类型有一个队列/线程,那么这将更容易/更可靠。

您的客户端实现只需要循环:

 while (!done) { Object item = queue.take(); // process item } 

请注意,队列可以使用generics,并且take()是阻塞的。

当然,在多个消费者收集不同类型的消息时,您可能需要考虑基于空间的体系结构 。 这不具有队列(FIFO)特性,但可以以非常简单的方式为您提供多个消费者。

无论你是否拥有大量的终端机器和偶尔发送的消息,或者有几台终端机器和频繁的消息,你都需要稍微权衡一下。

如果你有很多终端机器,那么每台终端机器上只有一个线程听起来有点过头,除非你真的要不断地向所有这些机器传输消息。 我建议有一个线程池,只会在某些边界之间增长。 为此,您可以使用ThreadPoolExecutor。 当您需要发布消息时,实际上是向执行者提交了一个runnable,它将发送消息:

 Executor msgExec = new ThreadPoolExecutor(...); public void sendMessage(final String machineId, byte[] message) { msgExec.execute(new Runnable() { public void run() { sendMessageNow(machineId, message); } }); } private void sendMessageNow(String machineId, byte[] message) { // open connection to machine and send message, thinking // about the case of two simultaneous messages to a machine, // and whether you want to cache connections. } 

如果您只有几台终端机器,那么每台机器可能有一个BlockingQueue,并且每个阻塞队列的线程都在等待下一条消息。 在这种情况下,模式更像是这样(谨防未经测试的头顶星期日早上代码):

 ConcurrentHashMap queuePerMachine; public void sendMessage(String machineId, byte[] message) { BockingQueue q = queuePerMachine.get(machineId); if (q == null) { q = new BockingQueue(); BockingQueue prev = queuePerMachine.putIfAbsent(machineId, q); if (prev != null) { q = prev; } else { (new QueueProessor(q)).start(); } } q.put(new Message(message)); } private class QueueProessor extends Thread { private final BockingQueue q; QueueProessor(BockingQueue q) { this.q = q; } public void run() { Socket s = null; for (;;) { boolean needTimeOut = (s != null); Message m = needTimeOut ? q.poll(60000, TimeUnit.MILLISECOND) : q.take(); if (m == null) { if (s != null) // close s and null } else { if (s == null) { // open s } // send message down s } } // add appropriate error handling and finally } } 

在这种情况下,如果该机器的消息在60秒内没有到达,我们将关闭连接。

你应该使用JMS吗? 好吧,你必须权衡这对你来说是否复杂。 我个人的感觉是,保证一个特殊的框架并不是一项复杂的任务。 但我确信意见不同。

PS实际上,现在我看一下这个,你可能把队列放在线程对象里面,只是映射机器ID – >线程对象。 无论如何,你明白了。

您可以尝试使用SomnifugiJMS ,这是一个使用java.util.concurrent作为各种实际“引擎”的in-vm JMS实现。

它可能对你的目的来说有些过分,但很可能使你的应用程序几乎没有其他编程(如果适用)分发,只需插入一个不同的JMS实现,如ActiveMQ ,你就完成了。