Java NIO客户端

import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.*; public class EchoServer { private InetAddress addr; private int port; private Selector selector; private Map<SocketChannel,List> dataMap; public EchoServer(InetAddress addr, int port) throws IOException { this.addr = addr; this.port = port; dataMap = new HashMap<SocketChannel,List>(); startServer(); } private void startServer() throws IOException { // create selector and channel this.selector = Selector.open(); ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); // bind to port InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port); serverChannel.socket().bind(listenAddr); serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); log("Echo server ready. Ctrl-C to stop."); // processing while (true) { // wait for events this.selector.select(); // wakeup to work on selected keys Iterator keys = this.selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = (SelectionKey) keys.next(); // this is necessary to prevent the same key from coming up // again the next time around. keys.remove(); if (! key.isValid()) { continue; } if (key.isAcceptable()) { this.accept(key); } else if (key.isReadable()) { this.read(key); } else if (key.isWritable()) { this.write(key); } else if (key.isConnectable()) { this.doConnect(key); } } } } private void doConnect(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); if (channel.finishConnect()) { /* success */ System.out.println("Connected"); } else { /* failure */ System.out.println("failure"); } } public void connect(String hostname, int port) throws IOException { SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); clientChannel.connect(new InetSocketAddress(hostname,port)); clientChannel.register(selector,SelectionKey.OP_CONNECT); clientChannel.write(ByteBuffer.wrap(("$Hello "+UserInfo[0]+"|").getBytes("US-ASCII"))); } private void accept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false); // write welcome message channel.write(ByteBuffer.wrap("Welcome, this is the echo server\r\n".getBytes("US-ASCII"))); Socket socket = channel.socket(); SocketAddress remoteAddr = socket.getRemoteSocketAddress(); log("Connected to: " + remoteAddr); dataMap.put(channel, new ArrayList()); // register channel with selector for further IO channel.register(this.selector, SelectionKey.OP_READ); } private void read(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(8192); int numRead = -1; try { numRead = channel.read(buffer); } catch (IOException e) { e.printStackTrace(); } if (numRead == -1) { this.dataMap.remove(channel); Socket socket = channel.socket(); SocketAddress remoteAddr = socket.getRemoteSocketAddress(); log("Connection closed by client: " + remoteAddr); channel.close(); key.cancel(); return; } byte[] data = new byte[numRead]; System.arraycopy(buffer.array(), 0, data, 0, numRead); log("Got: " + new String(data, "US-ASCII")); doEcho(key, data); // write back to client } private void write(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); List pendingData = this.dataMap.get(channel); Iterator items = pendingData.iterator(); while (items.hasNext()) { byte[] item = items.next(); items.remove(); channel.write(ByteBuffer.wrap(item)); } key.interestOps(SelectionKey.OP_READ); } private void doEcho(SelectionKey key, byte[] data) { SocketChannel channel = (SocketChannel) key.channel(); List pendingData = this.dataMap.get(channel); pendingData.add(data); key.interestOps(SelectionKey.OP_WRITE); } private static void log(String s) { System.out.println(s); } public static void main(String[] args) throws Exception { new EchoServer(null, 8989); } } 

该程序适用于传入连接。 但是当我进行传出连接时,该程序不起作用。 我需要通过connect(String hostname,int port)连续建立一些连接,并在方法read()中接收数据。 该程序停止在clientChannel.register(…)行上工作

您需要检查可连接的密钥,例如

 if (key.isConnectable()) { this.doConnect(key); } ... private void doConnect(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); if (channel.finishConnect()) { /* success */ } else { /* failure */ } } 

使用SocketChannel.finishConnect确定连接是否已成功建立。

这是我一直在使用的NIO客户端示例。 它提供了超时的开放/写入/读取function,适用于请求 – 响应消息传递。

棘手的部分始终是各方如何识别数据包被完全接收。 这个例子假设

  • 服务器获取one_line_command + newline(客户端 – >服务器数据包)
  • 客户端接收带有“>>”终结符行的1..n行,而不在终结符行中跟踪换行符(server-> client packet)
  • 您可以将终结符指定为“>> \ n”,但readUntil需要在换行符解析器中进行小修复

您可以编写4字节长度的报头,固定大小的数据包拆分器或分隔符0x27字节,但要确保它不能是数据有效负载的值。 NIO read()或write()从不假设您在一次调用中收到完整数据包。 或者可以在一个read()缓冲区中读取两个或多个数据包字节。 由我们来决定数据包解析器不会丢失字节。

 import java.util.*; import java.io.*; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; public class DPSocket { private boolean debug; private String host; private int port; private String charset; private ByteArrayOutputStream inBuffer; private ByteBuffer buf; private Selector selector; private SocketChannel channel; public DPSocket(String host, int port, String charset) { this.charset = charset==null || charset.equals("") ? "UTF-8" : charset; this.host = host; this.port = port; } public boolean isDebug() { return debug; } public void setDebug(boolean b) { debug=b; } public void open(long timeout) throws IOException { selector = Selector.open(); channel = SocketChannel.open(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_CONNECT); channel.connect(new InetSocketAddress(host, port)); inBuffer = new ByteArrayOutputStream(1024); buf = ByteBuffer.allocate(1*1024); long sleep = Math.min(timeout, 1000); while(timeout > 0) { if (selector.select(sleep) < 1) { timeout-=sleep; continue; } Iterator keys = selector.selectedKeys().iterator(); while(keys.hasNext()) { SelectionKey key = keys.next(); keys.remove(); if (!key.isValid() || !key.isConnectable()) continue; SocketChannel channel = (SocketChannel)key.channel(); if (channel.isConnectionPending()) { channel.finishConnect(); channel.configureBlocking(false); if (debug) System.out.println("finishConnect"); return; // we are ready to receive bytes } } } throw new IOException("Connection timed out"); } public void close() { try { if(channel!=null) channel.close(); } catch(Exception ex) { } try { if(selector!=null) selector.close(); } catch(Exception ex) { } inBuffer=null; buf=null; } public void write(String data, long timeout) throws IOException { write(data.getBytes(charset), timeout); } public void write(byte[] bytes, long timeout) throws IOException { ByteBuffer outBuffer = ByteBuffer.wrap(bytes); channel.register(selector, SelectionKey.OP_WRITE); long sleep = Math.min(timeout, 1000); while(timeout > 0) { if (selector.select(sleep) < 1) { timeout-=sleep; continue; } Iterator keys = selector.selectedKeys().iterator(); while(keys.hasNext()) { SelectionKey key = keys.next(); keys.remove(); if (!key.isValid() || !key.isWritable()) continue; SocketChannel channel = (SocketChannel)key.channel(); if (debug) System.out.println("write remaining="+outBuffer.remaining()); channel.write(outBuffer); if (debug) System.out.println("write remaining="+outBuffer.remaining()); if (outBuffer.remaining()<1) return; } } throw new IOException("Write timed out"); } public List readUntil(String terminator, long timeout, boolean trimLines) throws IOException { return readUntil(new String[]{terminator}, timeout, trimLines); } public List readUntil(String[] terminators, long timeout, boolean trimLines) throws IOException { List lines = new ArrayList(12); inBuffer.reset(); // End of packet terminator strings, line startsWith "aabbcc" string. byte[][] arrTerminators = new byte[terminators.length][]; int[] idxTerminators = new int[terminators.length]; for(int idx=0; idx < terminators.length; idx++) { arrTerminators[idx] = terminators[idx].getBytes(charset); idxTerminators[idx] = 0; } int idxLineByte=-1; channel.register(selector, SelectionKey.OP_READ); long sleep = Math.min(timeout, 1000); while(timeout>0) { if (selector.select(sleep) < 1) { timeout-=sleep; continue; } Iterator keys = selector.selectedKeys().iterator(); while(keys.hasNext()) { SelectionKey key = keys.next(); keys.remove(); if (!key.isValid() || !key.isReadable()) continue; SocketChannel channel = (SocketChannel)key.channel(); buf.clear(); int len = channel.read(buf); if (len == -1) throw new IOException("Socket disconnected"); buf.flip(); for(int idx=0; idx lines; dps.open(15000); dps.write("Command1 arg1 arg2"+NEWLINE, TIMEOUT); lines = dps.readUntil(">>", TIMEOUT, true); dps.write("Command2 arg1 arg2"+NEWLINE, TIMEOUT); lines = dps.readUntil(">>", TIMEOUT, true); } catch (Exception ex) { String msg = ex.getMessage(); if (msg==null) msg = ex.getClass().getName(); if (msg.contains("timed out") || msg.contains("Invalid command ")) { System.out.println("ERROR: " + ex.getMessage()); } else { System.out.print("ERROR: "); ex.printStackTrace(); } } finally { dps.close(); } } }