Java – readObject with NIO

In a traditional blocking, thread-per-connection server, I would do something like this:

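    class ServerSideThread implements Runnable {
        private final ObjectInputStream in;
        private final ObjectOutputStream out;
        private final Engine engine;

        public ServerSideThread(Socket socket, Engine engine) throws IOException {
            // Create the output stream first and flush its header, so the peer's
            // ObjectInputStream constructor does not block waiting for it.
            out = new ObjectOutputStream(socket.getOutputStream());
            out.flush();
            in = new ObjectInputStream(socket.getInputStream());
            this.engine = engine;
        }

        public void sendMessage(Message m) throws IOException {
            out.writeObject(m);
            out.flush();
        }

        public void run() {
            try {
                while (true) {
                    Message m = (Message) in.readObject();
                    // give the engine a message with this as a callback
                    engine.queueMessage(m, this);
                }
            } catch (IOException | ClassNotFoundException e) {
                // connection closed or stream corrupted: stop serving this client
            }
        }
    }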

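Now, the object can be expected to be very large. In my NIO loop I can't simply wait for the whole object to come through, because all my other connections (with much smaller workloads) would be stuck waiting on me.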

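How can I be notified only once a connection has the entire object available, rather than every time my NIO channel reports that it is ready?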

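You can write the object to a ByteArrayOutputStream first, which lets you send its length before the object itself. On the receiving side, read that many bytes before attempting to decode anything.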

However, you are likely to find it simpler and more efficient to use blocking IO (rather than NIO) with Object*Stream.


EDIT: something like this

    // needs java.io.*, java.nio.ByteBuffer, java.nio.channels.SocketChannel

    public static void send(SocketChannel socket, Serializable serializable) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // reserve 4 bytes at the front for the length prefix
        for (int i = 0; i < 4; i++) baos.write(0);
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(serializable);
        oos.close();
        final ByteBuffer wrap = ByteBuffer.wrap(baos.toByteArray());
        wrap.putInt(0, baos.size() - 4);
        // A non-blocking channel may accept only part of the buffer per call.
        // This loop spins until everything is written; a selector-based server
        // would register OP_WRITE and continue later instead.
        while (wrap.hasRemaining()) {
            socket.write(wrap);
        }
    }

    private final ByteBuffer lengthByteBuffer = ByteBuffer.wrap(new byte[4]);
    private ByteBuffer dataByteBuffer = null;
    private boolean readLength = true;

    // Returns the decoded object once it has fully arrived, or null if more data is needed.
    public Serializable recv(SocketChannel socket) throws IOException, ClassNotFoundException {
        if (readLength) {
            if (socket.read(lengthByteBuffer) < 0) throw new EOFException();
            if (lengthByteBuffer.remaining() == 0) {
                readLength = false;
                dataByteBuffer = ByteBuffer.allocate(lengthByteBuffer.getInt(0));
                lengthByteBuffer.clear();
            }
        } else {
            if (socket.read(dataByteBuffer) < 0) throw new EOFException();
            if (dataByteBuffer.remaining() == 0) {
                ObjectInputStream ois = new ObjectInputStream(
                        new ByteArrayInputStream(dataByteBuffer.array()));
                final Serializable ret = (Serializable) ois.readObject();
                // clean up
                dataByteBuffer = null;
                readLength = true;
                return ret;
            }
        }
        return null;
    }
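To make the "only notify me when the whole object is there" part concrete, here is a minimal sketch of the same length-prefix idea with the socket taken out, so it can be exercised with arbitrary chunk boundaries. The class name `FrameDecoder` and its `encode`/`feed` methods are hypothetical names of my own, not part of any library; the sketch assumes one decoder instance per connection and that the sender uses a 4-byte big-endian length followed by the serialized payload, as in `send` above. It does not bound the announced length, which a real server would want to do before allocating.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: one instance per connection, fed whatever bytes each read produced. */
final class FrameDecoder {
    private final ByteBuffer header = ByteBuffer.allocate(4);
    private ByteBuffer body; // null while the 4-byte length is still incomplete

    /** Serializes a value as a 4-byte big-endian length followed by the payload. */
    static byte[] encode(Serializable value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        }
        byte[] payload = baos.toByteArray();
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Consumes all bytes in chunk and returns every object completed by them. */
    List<Object> feed(ByteBuffer chunk) throws IOException, ClassNotFoundException {
        List<Object> done = new ArrayList<>();
        while (chunk.hasRemaining()) {
            if (body == null) {
                copy(chunk, header);
                if (header.hasRemaining()) {
                    break; // length prefix still incomplete
                }
                body = ByteBuffer.allocate(header.getInt(0));
                header.clear();
            }
            copy(chunk, body);
            if (body.hasRemaining()) {
                break; // payload still incomplete
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(body.array()))) {
                done.add(ois.readObject());
            }
            body = null; // ready for the next frame's length prefix
        }
        return done;
    }

    /** Moves as many bytes as fit from src into dst. */
    private static void copy(ByteBuffer src, ByteBuffer dst) {
        int n = Math.min(src.remaining(), dst.remaining());
        ByteBuffer slice = src.duplicate();
        slice.limit(slice.position() + n);
        dst.put(slice);
        src.position(src.position() + n);
    }
}
```

In a selector loop you would read from the ready channel into a scratch ByteBuffer, flip it, and pass it to `feed`; the engine only ever sees the objects in the returned list, so a large object that trickles in over many reads never holds up the other connections.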

Inspired by the send/recv code above, I created a project for this (hosted on Google Code).

It includes a simple unit test:

    SeriServer server = new SeriServer(6001, nthreads);
    final SeriClient client[] = new SeriClient[nclients];

    // write the data with multiple threads to flood the server
    for (int cnt = 0; cnt < nclients; cnt++) {
        final int counterVal = cnt;
        client[cnt] = new SeriClient("localhost", 6001);
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int cnt2 = 0; cnt2 < nsends; cnt2++) {
                        String msg = "[" + counterVal + "]";
                        client[counterVal].send(msg);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    fail();
                }
            }
        });
        t.start();
    }

    HashMap<String, Integer> counts = new HashMap<String, Integer>();
    int nullCounts = 0;
    for (int cnt = 0; cnt < nsends * nclients;) {
        // read the data from a vector (that the server pool automatically fills)
        SeriDataPackage data = server.read();
        if (data == null) {
            nullCounts++;
            System.out.println("NULL");
            continue;
        }

        if (counts.containsKey(data.getObject())) {
            Integer c = counts.get(data.getObject());
            counts.put((String) data.getObject(), c + 1);
        } else {
            counts.put((String) data.getObject(), 1);
        }
        cnt++;
        System.out.println("Received: " + data.getObject());
    }

    // assert the results: every client's message must have arrived nsends times
    Collection<Integer> values = counts.values();
    for (Integer value : values) {
        int ivalue = value;
        assertEquals(nsends, ivalue);
        System.out.println(value);
    }
    assertEquals(nclients, counts.size());
    System.out.println(counts.size());
    System.out.println("Finishing");
    server.shutdown();