使用java nio socket时,“已建立的连接被主机中的软件中止”

我使用java nio套接字开发了一个java服务器。 这是我的应用程序的代码:

 public class EchoServer { static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class); private static final int BUFFER_SIZE = 1024; private final static int DEFAULT_PORT = 4664; private InetAddress hostAddress = null; private int port; private String ipAddress = "my ip"; private Selector selector; // The buffer into which we'll read data when it's available private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE); int timestamp = 1; HashMap connectedClients = new HashMap(); HashMap clientIds= new HashMap(); HashMap messageToClients = new HashMap(); public EchoServer() { this(DEFAULT_PORT); } public EchoServer(int port) { try{ this.port = port; hostAddress = InetAddress.getByName(ipAddress); selector = initSelector(); loop(); }catch(Exception ex){ logger.error("Exception Accoured:",ex); } } private Selector initSelector() { try{ Selector socketSelector = SelectorProvider.provider().openSelector(); ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); InetSocketAddress isa = new InetSocketAddress(hostAddress, port); serverChannel.socket().bind(isa); serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); return socketSelector; }catch(Exception ex){ logger.error("Exception Accoured:",ex); return null; } } private void loop() { while (true) { try { // Do defined operations for clients // ------------------------------ selector.select(); Iterator selectedKeys = selector.selectedKeys() .iterator(); while (selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { logger.warn(key.hashCode() + "- is invalid"); continue; } // Check what event is available and deal with it if (key.isAcceptable()) { accept(key); } else if (key.isReadable()) { read(key); } else if (key.isWritable()) { write(key); } } // Fetch List from server // ----------------------------------------- try { ResultSet resultset = DataBase.getInstance() .getQueryResult(); boolean flag = false; while (resultset.next()) { String mobileNumber = resultset.getString("MobileNo"); String message = resultset.getInt("IsMessage") + "," + resultset.getInt("IsDeliver") + "," + resultset.getInt("IsGroup") + "," + resultset.getInt("IsSeen"); messageToClients.put(mobileNumber, message); } } catch (Exception ex) { //ex.printStackTrace(); logger.error("Exception Accoured:",ex); } // Wait for 1 second // ----------------------------------------------- Thread.sleep(1000); timestamp++; } catch (Exception e) { e.printStackTrace(); System.exit(1); } } } private void accept(SelectionKey key) { try{ // Initialize the connection ------------------------------------------ ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key .channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); logger.info("New client accepted"); // Fire read for reading phone number -------------------------------- socketChannel.register(selector, SelectionKey.OP_READ); }catch(Exception ex){ logger.error("Exception Accoured:",ex); } } private void read(SelectionKey key) { try{ // Initialize Socket ----------------------------------------------------- SocketChannel socketChannel = (SocketChannel) key.channel(); // Reading Client Number ------------------------------------------------- readBuffer.clear(); int numRead; try { numRead = socketChannel.read(readBuffer); } catch (IOException e) { logger.error("Forceful shutdown"); key.cancel(); return; } // read was not successful if (numRead == -1) { logger.error("Graceful shutdown"); key.cancel(); return; } // read was successful and now we can write it to String readBuffer.flip(); byte[] bytes = new byte[readBuffer.limit()]; readBuffer.get(bytes); String number = new String(bytes); number = number.replace("\r\n", ""); number = number.trim(); // Update Connect Clients Status ----------------------------------------- Integer clientId=clientIds.get(number); if ( clientId == null) { connectedClients.put(key.hashCode(), number); clientIds.put(number, key.hashCode()); logger.error(number + "- (" + key.hashCode() + ") has Connected"); }else{ connectedClients.remove(clientId); connectedClients.put(key.hashCode(), number); clientIds.put(number, key.hashCode()); logger.error(number + "- (" + key.hashCode() + ") REconnected"); } //System.err.println("All clients number are:" + connectedClients.size()); logger.error("All clients number are:" + connectedClients.size()); // Fire Write Operations ------------------------------------------------- socketChannel.register(selector, SelectionKey.OP_WRITE); }catch(Exception ex){ //ex.printStackTrace(); logger.error("Exception Accoured:",ex); } } private void write(SelectionKey key) { try { //Check channel still alive ---------------------------------------------- String clientNumber = connectedClients.get(key.hashCode()); if(clientNumber == null){ key.cancel(); return; } // Get Channel ----------------------------------------------------------- SocketChannel socketChannel = (SocketChannel) key.channel(); // Send Message if client number have new message ------------------------ if (messageToClients.get(clientNumber) != null) { logger.info(clientNumber + "-" + key.hashCode() + "- Sent write message"); String timeStamp = String.valueOf(timestamp); String message = messageToClients.get(clientNumber); ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8")); socketChannel.write(dummyResponse); messageToClients.remove(clientNumber); } // Fire new write state -------------------------------------------------- socketChannel.register(selector, SelectionKey.OP_WRITE); } catch (IOException iox) { logger.error("Exception Accoured:",iox); String number = connectedClients.get(key.hashCode()); clientIds.remove(number); connectedClients.remove(key.hashCode()); key.cancel(); } catch(Exception ex){ logger.error("Exception Accoured:",ex); } } 

}

当我使用2-3个客户端进行测试时,它工作正常,但是当我开始使用大约100-300客户端进行测试时,我多次在exception下进行了测试(实际上它正在write()方法和line socketChannel.write(dummyResponse);

 java.io.IOException: An established connection was aborted by the software in your host machine at sun.nio.ch.SocketDispatcher.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(Unknown Source) at sun.nio.ch.IOUtil.writeFromNativeBuffer(Unknown Source) at sun.nio.ch.IOUtil.write(Unknown Source) at sun.nio.ch.SocketChannelImpl.write(Unknown Source) at net.behboodi.testserver.EchoServer.write(EchoServer.java:274) at net.behboodi.testserver.EchoServer.loop(EchoServer.java:106) at net.behboodi.testserver.EchoServer.(EchoServer.java:56) at net.behboodi.testserver.EchoServer.(EchoServer.java:47) at net.behboodi.testserver.Main.main(Main.java:44) 

然后我无法从服务器接收消息。

当你得到流结束时( read()返回-1)你没有关闭频道,所以你正在泄漏频道。 您需要关闭频道,并注意这样做会自动取消该密钥。 仅仅取消密钥是不够的。 取消密钥时相同,因为clientNumber == null或者您收到IOException

注意,你不应该注册OP_WRITE除非你刚刚从write()调用中收到零,并且你应该在没有得到零的情况下取消注册。