仅通过一个SocketChannel发送多条消息

阅读本教程后:http://rox-xmlrpc.sourceforge.net/niotut/ (它是关于编写非阻塞服务器和客户端的,我读了NIO部分,跳过了SSL部分),现在我正在尝试重写我自己的客户端,但在尝试编辑客户端代码时遇到了问题。

首先,我想让你看到教程的客户端代码,它包含2个文件:

  • RspHandler.java:http://rox-xmlrpc.sourceforge.net/niotut/src/RspHandler.java

  • NIOClient.java:http://rox-xmlrpc.sourceforge.net/niotut/src/NioClient.java

但我在main函数中编辑了一下NIOClient.java,以解释我的问题如下:

 import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.*; public class NIOClient implements Runnable { // The host:port combination to connect to private InetAddress hostAddress; private int port; // The selector we'll be monitoring private Selector selector; // The buffer into which we'll read data when it's available private ByteBuffer readBuffer = ByteBuffer.allocate(8192); // A list of PendingChange instances private List pendingChanges = new LinkedList(); // Maps a SocketChannel to a list of ByteBuffer instances private Map pendingData = new HashMap(); // Maps a SocketChannel to a RspHandler private Map rspHandlers = Collections.synchronizedMap(new HashMap()); public NIOClient(InetAddress hostAddress, int port) throws IOException { this.hostAddress = hostAddress; this.port = port; this.selector = this.initSelector(); } public void send(byte[] data, RspHandler handler) throws IOException { // Start a new connection SocketChannel socket = this.initiateConnection(); // Register the response handler this.rspHandlers.put(socket, handler); // And queue the data we want written synchronized (this.pendingData) { List queue = (List) this.pendingData.get(socket); if (queue == null) { queue = new ArrayList(); this.pendingData.put(socket, queue); } queue.add(ByteBuffer.wrap(data)); } // Finally, wake up our selecting thread so it can make the required changes this.selector.wakeup(); } public void run() { while (true) { try { // Process any pending changes synchronized (this.pendingChanges) { Iterator changes = this.pendingChanges.iterator(); while (changes.hasNext()) { ChangeRequest change = (ChangeRequest) changes.next(); switch (change.type) { case ChangeRequest.CHANGEOPS: SelectionKey key = 
change.socket.keyFor(this.selector); key.interestOps(change.ops); break; case ChangeRequest.REGISTER: change.socket.register(this.selector, change.ops); break; } } this.pendingChanges.clear(); } // Wait for an event one of the registered channels this.selector.select(); // Iterate over the set of keys for which events are available Iterator selectedKeys = this.selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey key = (SelectionKey) selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue; } // Check what event is available and deal with it if (key.isConnectable()) { this.finishConnection(key); } else if (key.isReadable()) { this.read(key); } else if (key.isWritable()) { this.write(key); } } } catch (Exception e) { e.printStackTrace(); } } } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Clear out our read buffer so it's ready for new data this.readBuffer.clear(); // Attempt to read off the channel int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. key.cancel(); socketChannel.close(); return; } if (numRead == -1) { // Remote entity shut the socket down cleanly. Do the // same from our end and cancel the channel. 
key.channel().close(); key.cancel(); return; } // Handle the response this.handleResponse(socketChannel, this.readBuffer.array(), numRead); } private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException { // Make a correctly sized copy of the data before handing it // to the client byte[] rspData = new byte[numRead]; System.arraycopy(data, 0, rspData, 0, numRead); // Look up the handler for this channel RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel); // And pass the response to it if (handler.handleResponse(rspData)) { // The handler has seen enough, close the connection socketChannel.close(); socketChannel.keyFor(this.selector).cancel(); } } private void write(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); synchronized (this.pendingData) { List queue = (List) this.pendingData.get(socketChannel); // Write until there's not more data ... while (!queue.isEmpty()) { ByteBuffer buf = (ByteBuffer) queue.get(0); socketChannel.write(buf); if (buf.remaining() > 0) { // ... or the socket's buffer fills up break; } queue.remove(0); } if (queue.isEmpty()) { // We wrote away all data, so we're no longer interested // in writing on this socket. Switch back to waiting for // data. key.interestOps(SelectionKey.OP_READ); } } } private void finishConnection(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Finish the connection. If the connection operation failed // this will raise an IOException. 
try { socketChannel.finishConnect(); } catch (IOException e) { // Cancel the channel's registration with our selector System.out.println(e); key.cancel(); return; } // Register an interest in writing on this channel key.interestOps(SelectionKey.OP_WRITE); } private SocketChannel initiateConnection() throws IOException { // Create a non-blocking socket channel SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); // Kick off connection establishment socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port)); // Queue a channel registration since the caller is not the // selecting thread. As part of the registration we'll register // an interest in connection events. These are raised when a channel // is ready to complete connection establishment. synchronized(this.pendingChanges) { this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT)); } return socketChannel; } private Selector initSelector() throws IOException { // Create a new selector return SelectorProvider.provider().openSelector(); } public static void main(String[] args) { try { NIOClient client = new NIOClient( InetAddress.getByName("127.0.0.1"), 9090); Thread t = new Thread(client); t.setDaemon(true); t.start(); // 1st client.send("hehe|||".getBytes()); System.out.println("SEND: " + "hehe|||"); handler.waitForResponse(); System.out.println("------------"); // 2nd client.send(("hehe|||" + " 2").getBytes()); System.out.println("SEND: " + "hehe|||" + " 2"); handler.waitForResponse(); } catch (Exception e) { e.printStackTrace(); } } } 

我编辑的客户端只是做一件简单的事情就是向服务器发送消息然后从服务器接收回显的消息。 当然,上面的代码效果很好。 它发送2条消息然后正确接收它们。

但是我在上面的客户端中不想要的东西是: send函数调用这段代码:

  // Start a new connection SocketChannel socket = this.initiateConnection(); 

这意味着每条不同的消息都会对应一个新的SocketChannel,但现在我只想使用一个SocketChannel来发送多条消息,因此我把客户端改成了下面的代码:

 import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.*; public class MyClient implements Runnable { // The host:port combination to connect to private InetAddress hostAddress; private int port; // The selector we'll be monitoring private Selector selector; // The buffer into which we'll read data when it's available private ByteBuffer readBuffer = ByteBuffer.allocate(8); // A list of PendingChange instances private List pendingChanges = new LinkedList(); // Maps a SocketChannel to a list of ByteBuffer instances private Map pendingData = new HashMap(); // Maps a SocketChannel to a RspHandler private Map rspHandlers = Collections.synchronizedMap(new HashMap()); private SocketChannel socket; private static MyResponseHandler handler; public MyClient(InetAddress hostAddress, int port) throws IOException { this.hostAddress = hostAddress; this.port = port; this.selector = this.initSelector(); // Start a new connection socket = this.initiateConnection(); handler = new MyResponseHandler(); // Register the response handler this.rspHandlers.put(socket, handler); } public void send(byte[] data) throws IOException { // And queue the data we want written synchronized (this.pendingData) { List queue = (List) this.pendingData.get(socket); if (queue == null) { queue = new ArrayList(); this.pendingData.put(socket, queue); } queue.add(ByteBuffer.wrap(data)); } // Finally, wake up our selecting thread so it can make the required changes this.selector.wakeup(); } public void run() { while (true) { try { // Process any pending changes synchronized (this.pendingChanges) { Iterator changes = this.pendingChanges.iterator(); while (changes.hasNext()) { ChangeRequest change = (ChangeRequest) changes.next(); switch 
(change.type) { case ChangeRequest.CHANGEOPS: SelectionKey key = change.socket.keyFor(this.selector); key.interestOps(change.ops); break; case ChangeRequest.REGISTER: change.socket.register(this.selector, change.ops); break; } } this.pendingChanges.clear(); } // Wait for an event one of the registered channels this.selector.select(); // Iterate over the set of keys for which events are available Iterator selectedKeys = this.selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey key = (SelectionKey) selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue; } // Check what event is available and deal with it if (key.isConnectable()) { this.finishConnection(key); } else if (key.isReadable()) { this.read(key); } else if (key.isWritable()) { this.write(key); } } } catch (Exception e) { e.printStackTrace(); } } } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Clear out our read buffer so it's ready for new data this.readBuffer.clear(); // Attempt to read off the channel int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. key.cancel(); socketChannel.close(); return; } if (numRead == -1) { // Remote entity shut the socket down cleanly. Do the // same from our end and cancel the channel. 
key.channel().close(); key.cancel(); return; } // Handle the response this.handleResponse(socketChannel, this.readBuffer.array(), numRead); } private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException { // Make a correctly sized copy of the data before handing it // to the client byte[] rspData = new byte[numRead]; System.arraycopy(data, 0, rspData, 0, numRead); // Look up the handler for this channel MyResponseHandler handler = (MyResponseHandler) this.rspHandlers.get(socketChannel); // And pass the response to it if (handler.handleResponse(rspData)) { // The handler has seen enough, close the connection socketChannel.close(); socketChannel.keyFor(this.selector).cancel(); } } private void write(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); synchronized (this.pendingData) { List queue = (List) this.pendingData.get(socketChannel); // Write until there's not more data ... while (!queue.isEmpty()) { ByteBuffer buf = (ByteBuffer) queue.remove(0); socketChannel.write(buf); //-- DEBUG -- System.out.println("===>>> socketChannel.write: " + new String(buf.array())); if (buf.remaining() > 0) { // ... or the socket's buffer fills up break; } } if (queue.isEmpty()) { // We wrote away all data, so we're no longer interested // in writing on this socket. Switch back to waiting for // data. key.interestOps(SelectionKey.OP_READ); } } } private void finishConnection(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Finish the connection. If the connection operation failed // this will raise an IOException. 
try { socketChannel.finishConnect(); } catch (IOException e) { // Cancel the channel's registration with our selector System.out.println(e); key.cancel(); return; } // Register an interest in writing on this channel key.interestOps(SelectionKey.OP_WRITE); } private SocketChannel initiateConnection() throws IOException { // Create a non-blocking socket channel SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); // Kick off connection establishment socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port)); // Queue a channel registration since the caller is not the // selecting thread. As part of the registration we'll register // an interest in connection events. These are raised when a channel // is ready to complete connection establishment. synchronized(this.pendingChanges) { this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT)); } return socketChannel; } private Selector initSelector() throws IOException { // Create a new selector return SelectorProvider.provider().openSelector(); } public static void main(String[] args) { try { MyClient client = new MyClient( InetAddress.getByName("127.0.0.1"), 9090); Thread t = new Thread(client); t.setDaemon(true); t.start(); // 1st client.send("hehe|||".getBytes()); System.out.println("SEND: " + "hehe|||"); handler.waitForResponse(); System.out.println("------------"); // 2nd client.send(("hehe|||" + " 2").getBytes()); System.out.println("SEND: " + "hehe|||" + " 2"); handler.waitForResponse(); } catch (Exception e) { e.printStackTrace(); } } } 

但是在运行上面的客户端之后,我只看到第一条消息被发送和接收回来,经过调试,我知道第二条消息没有发送,但我不知道为什么以及如何解决这个问题。

有谁知道答案吗?

感谢您阅读我很长的问题。

你是从错误的地方开始的。 该文章存在许多问题。 所有那些“待处理更改(pending changes)队列”的东西都是巨大而不必要的复杂化。 如果你需要从另一个线程注册/取消注册(虽然为什么需要这样做对我来说完全是个谜),只需wakeup()选择器即可;你也可以用同样的方式随时更改interestOps,而且完全可靠,即使他所散布的关于不同实现的FUD(恐惧、不确定和怀疑)真的成为现实也是如此。

这篇文章还有其他几个问题,表明作者并不真正知道他在说什么。 IOException并不一定意味着“远程强制关闭连接”。 他的finishConnection()方法忽略了finishConnect()的返回值;如果返回false,则表示连接仍处于挂起状态,因此它会过早地让通道离开OP_CONNECT阶段。 关闭一个通道会取消该键,所以所有那些紧接在close()之前或之后的cancel()调用都是多余的,可以删除(不过有些地方他只取消而不关闭,这在任何地方出现都同样是错误的)。

进一步:

“在刚才介绍的两种方法中,我们请求在套接字通道的选择键上设置OP_CONNECT标志。 如果我们这样做,我们将覆盖OP_CONNECT标志,并且永远不会完成连接。 如果我们合并它们,那么我们就冒着尝试在一个未连接的通道上写入的风险(或者至少不得不处理这种情况)”

这只是完整的A级废话。 设置OP_CONNECT两次,或“然后将它们组合”,无论这意味着什么,都不可能导致您“永远不会完成连接”或“尝试在未连接的通道上写入”。 他似乎认为设置两次清除它。

“数据已经排队(否则我们一开始就不会建立连接)。”

一个奇怪而无法解释的假设。

与其看那篇相当可疑的混乱文章,你不如好好看看他引用的“驯服NIO马戏团(Taming the NIO Circus)”主题帖——如果在Oracle论坛迁移之后你还能找到它的话。 免责声明:其中一部分是我写的。