使用java.nio的Selector和SocketChannel进行持续流式传输

我目前正在一个应用程序中使用java.nio.channels.Selector和SocketChannel,该应用程序会打开一对多的连接,持续向服务器流式传输数据。 我的应用程序有三个线程:StreamWriteWorker – 对SocketChannel执行写操作;StreamReadWorker – 从缓冲区读取字节并解析内容;StreamTaskDispatcher – 调用Selector的select方法获取readyOps,并为工作线程调度新的Runnable。

问题 – Selector的select方法只在第一次调用时返回大于0的值(即存在有效的readyOps); 我能够在所有就绪通道上执行一次写入并发送数据,但此后对select方法的所有调用都返回0。

问题:每次读/写之后,我是否需要对SocketChannel调用close(我希望不需要!)? 如果不需要,是什么原因可能导致SocketChannel无法再用于任何读/写操作?

对不起,我不能发布代码,但我希望我已经把问题解释清楚,以便有人能帮忙。 我已经搜索过答案,了解到SocketChannel在关闭之后无法重用,但我的通道不应该被关闭,服务器也从未收到EOF(流结束)。

我取得了一些进展,并发现由于json解析错误,服务器应用程序上没有发生写操作。 所以现在我的客户端应用程序代码上的SocketChannel在处理读取操作后就可以进行另一次写操作了。 我想这是SocketChannels的TCP特性。 但是,SocketChannel不可用于服务器应用程序端的另一个读取操作。 这是SocketChannels的正常行为吗? 在读取操作之后是否需要在客户端关闭连接并建立新连接?

这是我想要做的代码示例:

package org.stream.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.RandomStringUtils;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;

/**
 * Self-contained demo of a non-blocking client and server talking over a
 * loopback socket with {@link Selector}s.
 *
 * <p>The client sends queued JSON documents one at a time. The server
 * accumulates bytes until a complete JSON object has arrived, then replies with
 * the value of its {@code uuid} property. The client waits for that
 * acknowledgement before sending the next document.
 */
public class ClientServerTest {

    /** Charset used for every byte/String conversion on both ends. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Loopback port the server listens on and the client connects to. */
    private static final int PORT = 9001;

    /** Capacity of the scratch buffer each selector loop allocates. */
    private static final int BUFFER_SIZE = 65535;

    /** Upper bound, in milliseconds, on how long one select() call blocks. */
    private static final long SELECT_TIMEOUT_MILLIS = 10;

    /** Serialized JSON documents waiting to be sent by the client. */
    private final LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();

    /** Single worker thread that runs the client's read and write tasks in order. */
    private final ExecutorService executor = Executors.newFixedThreadPool(1);

    /** UUIDs of the generated documents; used to recognise acknowledgements. */
    private final HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

    /** Cancels the key and closes its channel, reporting (not propagating) failures. */
    private static void closeQuietly(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes every channel still registered with the selector, then the selector itself. */
    private static void closeSelector(Selector selector) {
        if (selector == null || !selector.isOpen()) {
            return;
        }
        for (SelectionKey key : selector.keys()) {
            closeQuietly(key);
        }
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes the byte[] attached to the key to the key's channel.
     *
     * <p>If the socket stops accepting data, the unsent remainder is re-attached
     * and OP_WRITE is requested so the write is resumed later. Once everything
     * is sent the key is switched to OP_READ to wait for the acknowledgement.
     */
    private class StreamWriteTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            try {
                while (buffer.hasRemaining()) {
                    if (sc.write(buffer) == 0) {
                        // Send buffer is full: keep what is left for the next OP_WRITE.
                        byte[] rest = new byte[buffer.remaining()];
                        buffer.get(rest);
                        // Attach before changing the interest set so the selector
                        // thread never observes the new interest with stale data.
                        key.attach(rest);
                        key.interestOps(SelectionKey.OP_WRITE);
                        selector.wakeup();
                        return;
                    }
                }
            } catch (IOException e) {
                // A failed write means the connection is unusable; retrying it as
                // if it were a zero-byte write would loop forever.
                e.printStackTrace();
                closeQuietly(key);
                selector.wakeup();
                return;
            }
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    }

    /**
     * Reads the server's acknowledgement from the key's channel.
     *
     * <p>Bytes from an earlier partial read (kept as the key's byte[] attachment)
     * are prepended to what is read now. When the accumulated bytes form a known
     * UUID the key is switched to OP_WRITE so the next document can be sent;
     * otherwise the bytes are re-attached and reading continues.
     */
    private class StreamReadTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        /** Returns true when the bytes spell the UUID of one of the generated documents. */
        private boolean checkUUID(byte[] data) {
            return uuidToSize.containsKey(new String(data, UTF_8));
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            byte[] pending = (byte[]) key.attachment();
            if (pending != null) {
                buffer.put(pending);
            }
            int count;
            try {
                do {
                    count = sc.read(buffer);
                } while (count > 0);
            } catch (IOException e) {
                e.printStackTrace();
                closeQuietly(key);
                selector.wakeup();
                return;
            }
            if (count == -1) {
                // Peer closed the connection.
                closeQuietly(key);
                selector.wakeup();
                return;
            }
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            if (checkUUID(data)) {
                // Complete acknowledgement received: move on to the next document.
                System.out.println("Clinet Read - uuid ~~~~ " + new String(data, UTF_8));
                key.attach(null);
                key.interestOps(SelectionKey.OP_WRITE);
            } else {
                // Acknowledgement still incomplete: keep the bytes and read again.
                key.attach(data);
                key.interestOps(SelectionKey.OP_READ);
            }
            selector.wakeup();
        }
    }

    /**
     * Client selector loop: connects to the server and hands read/write
     * readiness to {@link StreamReadTask} / {@link StreamWriteTask} on the executor.
     */
    private class ClientWorker implements Runnable {
        @Override
        public void run() {
            Selector selector = null;
            SocketChannel sc = null;
            try {
                selector = Selector.open();
                sc = SocketChannel.open();
                sc.configureBlocking(false);
                // A local connection may complete immediately, in which case
                // OP_CONNECT would never fire.
                boolean connected = sc.connect(new InetSocketAddress("127.0.0.1", PORT));
                sc.register(selector, connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
                while (selector.isOpen()) {
                    if (selector.select(SELECT_TIMEOUT_MILLIS) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isConnectable()) {
                            // If finishConnect() returns false OP_CONNECT fires again.
                            if (((SocketChannel) key.channel()).finishConnect()) {
                                key.interestOps(SelectionKey.OP_WRITE);
                            }
                        } else if (key.isReadable()) {
                            // Silence the key while the task owns the channel.
                            key.interestOps(0);
                            executor.execute(new StreamReadTask(buffer, key, selector));
                        } else if (key.isWritable()) {
                            key.interestOps(0);
                            if (key.attachment() == null) {
                                // NOTE(review): take() blocks this selector thread
                                // until a document is available, so it waits here
                                // indefinitely once the queue has been drained.
                                key.attach(dataQueue.take());
                            }
                            executor.execute(new StreamWriteTask(buffer, key, selector));
                        }
                    }
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            } catch (InterruptedException ex) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                closeSelector(selector);
                if (sc != null) {
                    try {
                        sc.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Server selector loop: accepts connections and performs reads and writes
     * inline through a {@link DataHandler}.
     */
    private class ServerWorker implements Runnable {
        @Override
        public void run() {
            Selector selector = null;
            ServerSocketChannel ssc = null;
            try {
                selector = Selector.open();
                ssc = ServerSocketChannel.open();
                ServerSocket socket = ssc.socket();
                socket.bind(new InetSocketAddress(PORT));
                ssc.configureBlocking(false);
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
                DataHandler handler = new DataHandler();
                while (selector.isOpen()) {
                    if (selector.select(SELECT_TIMEOUT_MILLIS) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                            // accept() returns null when no connection is pending.
                            if (sc != null) {
                                sc.configureBlocking(false);
                                sc.register(selector, SelectionKey.OP_READ);
                            }
                            continue;
                        }
                        try {
                            // else-if: readSocket may close the channel, after which
                            // querying the cancelled key would throw.
                            if (key.isReadable()) {
                                handler.readSocket(buffer, key);
                            } else if (key.isWritable()) {
                                handler.writeToSocket(buffer, key);
                            }
                        } catch (IOException e) {
                            // One failing connection must not stop the server.
                            e.printStackTrace();
                            closeQuietly(key);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeSelector(selector);
                if (ssc != null) {
                    try {
                        ssc.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /** Server-side read and write handling for a single selection key. */
    private class DataHandler {

        /**
         * Parses the accumulated text once it looks complete.
         *
         * @param builder text received so far on one connection
         * @return the parsed object, or null when the text does not yet end with '}'
         * @throws com.google.gson.JsonParseException if the text ends with '}' but
         *         is not a well-formed JSON object
         */
        private JsonObject parseData(StringBuilder builder) {
            String text = builder.toString();
            // Completeness is judged only by the trailing brace.
            if (!text.endsWith("}")) {
                return null;
            }
            JsonElement parsed = new JsonParser().parse(text);
            if (!parsed.isJsonObject()) {
                throw new com.google.gson.JsonParseException("Expected a JSON object");
            }
            return parsed.getAsJsonObject();
        }

        /**
         * Drains the channel, appends the decoded text to the key's StringBuilder
         * attachment and, once a complete JSON object is available, attaches its
         * uuid as bytes and switches the key to OP_WRITE.
         *
         * @param buffer scratch buffer; its previous content is discarded
         * @param key    key whose channel is readable
         * @throws IOException if reading, decoding or closing the channel fails
         */
        private void readSocket(ByteBuffer buffer, SelectionKey key) throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            int count;
            do {
                count = sc.read(buffer);
            } while (count > 0);
            if (count == -1) {
                // Peer closed the connection.
                key.attach(null);
                sc.close();
                return;
            }
            buffer.flip();
            StringBuilder builder = key.attachment() instanceof StringBuilder
                    ? (StringBuilder) key.attachment()
                    : new StringBuilder();
            CharsetDecoder decoder = UTF_8.newDecoder();
            decoder.onMalformedInput(CodingErrorAction.IGNORE);
            System.out.println(buffer);
            CharBuffer charBuffer = decoder.decode(buffer);
            String content = charBuffer.toString();
            builder.append(content);
            System.out.println(content);

            JsonObject obj;
            try {
                obj = parseData(builder);
            } catch (com.google.gson.JsonParseException e) {
                // A bad payload ends this connection instead of the server thread.
                e.printStackTrace();
                key.attach(null);
                sc.close();
                return;
            }
            if (obj == null) {
                key.attach(builder);
                key.interestOps(SelectionKey.OP_READ);
                return;
            }
            System.out.println("data ~~~~~~~ " + builder.toString());
            JsonElement uuidElement = obj.get("uuid");
            if (uuidElement == null || !uuidElement.isJsonPrimitive()) {
                System.err.println("Message has no primitive \"uuid\" property; closing connection");
                key.attach(null);
                sc.close();
                return;
            }
            JsonPrimitive uuid = uuidElement.getAsJsonPrimitive();
            // getAsString() yields the bare value; toString() would add JSON quotes.
            key.attach(uuid.getAsString().getBytes(UTF_8));
            key.interestOps(SelectionKey.OP_WRITE);
        }

        /**
         * Writes the key's byte[] attachment to its channel. On a zero-byte write
         * the remainder is re-attached and OP_WRITE stays registered; when all
         * bytes are sent the key returns to OP_READ.
         *
         * @param buffer scratch buffer; its previous content is discarded
         * @param key    key whose channel is writable
         * @throws IOException if the write fails
         */
        private void writeToSocket(ByteBuffer buffer, SelectionKey key) throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int writeAttempts = 0;
            while (buffer.hasRemaining()) {
                int written = sc.write(buffer);
                writeAttempts++;
                System.out.println("Write Attempt #" + writeAttempts);
                if (written == 0) {
                    byte[] rest = new byte[buffer.remaining()];
                    buffer.get(rest);
                    key.attach(rest);
                    key.interestOps(SelectionKey.OP_WRITE);
                    // Return here: falling through would drop the unsent remainder.
                    return;
                }
            }
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    /**
     * Queues 1000 random JSON documents, then starts the server and client threads.
     */
    public ClientServerTest() {
        for (int index = 0; index < 1000; index++) {
            JsonObject obj = new JsonObject();
            String uuid = UUID.randomUUID().toString();
            uuidToSize.put(uuid, uuid.length());
            obj.addProperty("uuid", uuid);
            String data = RandomStringUtils.randomAlphanumeric(10000);
            obj.addProperty("event", data);
            dataQueue.add(obj.toString().getBytes(UTF_8));
        }
        // NOTE(review): the client is started right after the server with no
        // handshake, so it may try to connect before the server has bound the port.
        Thread serverWorker = new Thread(new ServerWorker());
        serverWorker.start();
        Thread clientWorker = new Thread(new ClientWorker());
        clientWorker.start();
    }

    /**
     * Starts the demo. The worker threads are non-daemon, so they keep the JVM
     * alive without the main thread having to spin.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new ClientServerTest();
    }
}

  1. 处理OP_CONNECT的正确方法是尝试调用一次finishConnect();如果成功,则取消注册OP_CONNECT并注册OP_READ或OP_WRITE——由于您是客户端,多半是后者。 在非阻塞模式下循环加睡眠没有意义。 如果finishConnect()返回false,OP_CONNECT会再次触发。

  2. 你那些 !key.isAcceptable()、!key.isReadable() 和 !key.isWritable() 的判断完全没有意义。 如果键是可接受的(acceptable),就调用accept()。 如果它是可读的,就调用read()。 如果它是可写的,就调用write()。 就这么简单。

  3. 您需要注意,通道几乎总是可写的,只有在其套接字发送缓冲区已满的短暂时间内例外。 因此,只有在有数据要写时才注册OP_WRITE;更好的做法是先尝试写入,只有在返回值为0时才注册。 然后当OP_WRITE触发时重试写入,并取消注册OP_WRITE,除非再次得到0。

  4. 你在ByteBuffer上太节省了。 实际上,每个通道都需要一个。 您可以将其保存为键的附件(attachment),以便在需要时取回。 否则,您就没有办法累积部分读取的数据(部分读取肯定会发生),也没有办法重试写入。