使用线程处理套接字

我正在开发一个基本上是聊天室的java程序。 这是一个课程的分配,所以没有代码请,我只是有一些问题确定最可行的方式来处理我需要做的事情。 我已经为单个客户端设置了一个服务器程序,使用线程获取数据输入流和一个线程来处理数据输出流上的发送。 我现在需要做的是为每个传入请求创建一个新线程。

我的想法是创建一个链表来包含客户端套接字,或者可能包含线程。 我磕磕绊绊的地方是弄清楚如何处理将消息发送给所有客户。 如果我为每个传入消息都有一个线程,那么我该如何转身并将其发送到每个客户端套接字。

我想如果我有一个客户端套件的链表,那么我可以遍历列表并将其发送给每个,但是每次我都必须创建一个dataoutputstream。 我可以创建dataoutputstream的链接列表吗? 对不起,如果它听起来像我在漫无目的,但我不想只是开始编码,如果没有一个好的计划,它可能会变得混乱。 谢谢!

编辑我决定发布我到目前为止的代码。 我还没有机会测试它,所以任何评论都会很棒。 谢谢!

import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; import java.net.ServerSocket; import java.util.LinkedList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class prog4_server { // A Queue of Strings used to hold out bound Messages // It blocks till on is available static BlockingQueue outboundMessages = new LinkedBlockingQueue(); // A linked list of data output streams // to all the clients static LinkedList outputstreams; // public variables to track the number of clients // and the state of the server static Boolean serverstate = true; static int clients = 0; public static void main(String[] args) throws IOException{ //create a server socket and a clientSocket ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(6789); } catch (IOException e) { System.out.println("Could not listen on port: 6789"); System.exit(-1); }// try{...}catch(IOException e){...} Socket clientSocket; // start the output thread which waits for elements // in the message queue OutputThread out = new OutputThread(); out.start(); while(serverstate){ try { // wait and accept a new client // pass the socket to a new Input Thread clientSocket = serverSocket.accept(); DataOutputStream ServerOut = new DataOutputStream(clientSocket.getOutputStream()); InputThread in = new InputThread(clientSocket, clients); in.start(); outputstreams.add(ServerOut); } catch (IOException e) { System.out.println("Accept failed: 6789"); System.exit(-1); }// try{...}catch{..} // increment the number of clients and report clients = clients++; System.out.println("Client #" + clients + "Accepted"); }//while(serverstate){... }//public static void main public static class OutputThread extends Thread { //OutputThread Class Constructor OutputThread() { }//OutputThread(...){... public void run() { //string variable to contain the message String msg = null; while(!this.interrupted()) { try { msg = outboundMessages.take(); for(int i=0;i<outputstreams.size();i++){ outputstreams.get(i).writeBytes(msg + '\n'); }// for(...){... } catch (IOException e) { System.out.println(e); } catch (InterruptedException e){ System.out.println(e); }//try{...}catch{...} }//while(...){ }//public void run(){... }// public OutputThread(){... public static class InputThread extends Thread { Boolean threadstate = true; BufferedReader ServerIn; String user; int threadID; //SocketThread Class Constructor InputThread(Socket clientSocket, int ID) { threadID = ID; try{ ServerIn = new BufferedReader( new InputStreamReader(clientSocket.getInputStream())); user = ServerIn.readLine(); } catch(IOException e){ System.out.println(e); } }// InputThread(...){... public void run() { String msg = null; while (threadstate) { try { msg = ServerIn.readLine(); if(msg.equals("EXITEXIT")){ // if the client is exiting close the thread // close the output stream with the same ID // and decrement the number of clients threadstate = false; outputstreams.get(threadID).close(); outputstreams.remove(threadID); clients = clients--; if(clients == 0){ // if the number of clients has dropped to zero // close the server serverstate = false; ServerIn.close(); }// if(clients == 0){... }else{ // add a message to the message queue outboundMessages.add(user + ": " + msg); }//if..else... } catch (IOException e) { System.out.println(e); }// try { ... } catch { ...} }// while }// public void run() { ... } public static class ServerThread extends Thread { //public variable declaration BufferedReader UserIn = new BufferedReader(new InputStreamReader(System.in)); //OutputThread Class Constructor ServerThread() { }//OutputThread(...){... public void run() { //string variable to contain the message String msg = null; try { //while loop will continue until //exit command is received //then send the exit command to all clients msg = UserIn.readLine(); while (!msg.equals("EXITEXIT")) { System.out.println("Enter Message: "); msg = UserIn.readLine(); }//while(...){ outboundMessages.add(msg); serverstate = false; UserIn.close(); } catch (IOException e) { System.out.println(e); }//try{...}catch{...} }//public void run(){... }// public serverThread(){... }// public class prog4_server 

我过去通过为每个客户端连接定义一个“ MessageHandler ”类来解决这个问题,负责入站/出站消息流量。 在内部,处理程序使用BlockingQueue实现(在其上放置出站消息)(由内部工作线程)。 I / O发送方线程不断尝试从队列中读取(如果需要,则阻塞)并将检索到的每条消息发送给客户端。

这是一些骨架示例代码(未经测试):

 /** * Our Message definition. A message is capable of writing itself to * a DataOutputStream. */ public interface Message { void writeTo(DataOutputStream daos) throws IOException; } /** * Handler definition. The handler contains two threads: One for sending * and one for receiving messages. It is initialised with an open socket. */ public class MessageHandler { private final DataOutputStream daos; private final DataInputStream dais; private final Thread sender; private final Thread receiver; private final BlockingQueue outboundMessages = new LinkedBlockingQueue(); public MessageHandler(Socket skt) throws IOException { this.daos = new DataOutputStream(skt.getOutputStream()); this.dais = new DataInputStream(skt.getInputStream()); // Create sender and receiver threads responsible for performing the I/O. this.sender = new Thread(new Runnable() { public void run() { while (!Thread.interrupted()) { Message msg = outboundMessages.take(); // Will block until a message is available. try { msg.writeTo(daos); } catch(IOException ex) { // TODO: Handle exception } } } }, String.format("SenderThread-%s", skt.getRemoteSocketAddress())); this.receiver = new Thread(new Runnable() { public void run() { // TODO: Read from DataInputStream and create inbound message. } }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress())); sender.start(); receiver.start(); } /** * Submits a message to the outbound queue, ready for sending. */ public void sendOutboundMessage(Message msg) { outboundMessages.add(msg); } public void destroy() { // TODO: Interrupt and join with threads. Close streams and socket. } } 

请注意,Nikolai的正确之处在于,每个连接使用1(或2)个线程的阻塞I / O不是可扩展的解决方案,通常可以使用Java NIO编写应用程序以解决此问题。 但是,实际上,除非您正在编写成千上万个客户端同时连接的企业服务器,否则这不是真正的问题。 使用Java NIO编写无错误的可伸缩应用程序很困难 ,当然也不是我推荐的。