Java网络:即兴的Socket / InputStream

我正在Java的套接字上实现一个面向事件的层,我想知道是否有办法确定是否有待读取的数据。

我的常规方法是从套接字读入缓冲区,并在缓冲区填充给定量的字节时调用提供的回调(如果每次到达时都需要触发回调,则可以为0),但是我怀疑Java已经为我做了缓冲。

InputStream的available()方法对此可靠吗? 我应该read()并在Socket上做我自己的缓冲吗? 或者还有另一种方式吗?

不久,没有。 available()不可靠(至少不适合我)。 我建议使用与SelectorSelectionKey连接的java.nio.channels.SocketChannel 。 这个解决方案有点基于事件,但比普通的套接字更复杂。

对于客户:

  1. 构造套接字通道( socket ),打开一个选择器( selector = Selector.open(); )。
  2. 使用非阻塞socket.configureBlocking(false);
  3. 注册连接器socket.register(selector, SelectionKey.OP_CONNECT);
  4. 连接socket.connect(new InetSocketAddress(host, port));
  5. 看看是否有新的selector.select();
  6. 如果“new”表示连接成功,则为OP_READ注册选择器; 如果“new”指的是可用的数据,只需从套接字读取即可。

但是,为了使它具有异步性,您需要设置一个单独的线程(尽管套接字被创建为非阻塞,但线程仍将阻塞),以检查是否已经到达某些内容。

对于服务器,有ServerSocketChannel ,您可以使用OP_ACCEPT

作为参考,这是我的代码(客户端),应该给你一个提示:

  private Thread readingThread = new ListeningThread(); /** * Listening thread - reads messages in a separate thread so the application does not get blocked. */ private class ListeningThread extends Thread { public void run() { running = true; try { while(!close) listen(); messenger.close(); } catch(ConnectException ce) { doNotifyConnectionFailed(ce); } catch(Exception e) { // e.printStackTrace(); messenger.close(); } running = false; } } /** * Connects to host and port. * @param host Host to connect to. * @param port Port of the host machine to connect to. */ public void connect(String host, int port) { try { SocketChannel socket = SocketChannel.open(); socket.configureBlocking(false); socket.register(this.selector, SelectionKey.OP_CONNECT); socket.connect(new InetSocketAddress(host, port)); } catch(IOException e) { this.doNotifyConnectionFailed(e); } } /** * Waits for an event to happen, processes it and then returns. * @throws IOException when something goes wrong. */ protected void listen() throws IOException { // see if there are any new things going on this.selector.select(); // process events Iterator iter = selector.selectedKeys().iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); // check validity if(key.isValid()) { // if connectable... if(key.isConnectable()) { // ...establish connection, make messenger, and notify everyone SocketChannel client = (SocketChannel)key.channel(); // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast if(client!=null && client.finishConnect()) { client.register(this.selector, SelectionKey.OP_READ); } } // if readable, tell messenger to read bytes else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) { // read message here } } } } /** * Starts the client. */ public void start() { // start a reading thread if(!this.running) { this.readingThread = new ListeningThread(); this.readingThread.start(); } } /** * Tells the client to close at nearest possible moment. */ public void close() { this.close = true; } 

对于服务器:

  /** * Constructs a server. * @param port Port to listen to. * @param protocol Protocol of messages. * @throws IOException when something goes wrong. */ public ChannelMessageServer(int port) throws IOException { this.server = ServerSocketChannel.open(); this.server.configureBlocking(false); this.server.socket().bind(new InetSocketAddress(port)); this.server.register(this.selector, SelectionKey.OP_ACCEPT); } /** * Waits for event, then exits. * @throws IOException when something goes wrong. */ protected void listen() throws IOException { // see if there are any new things going on this.selector.select(); // process events Iterator iter = selector.selectedKeys().iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); // do something with the connected socket iter.remove(); if(key.isValid()) this.process(key); } } /** * Processes a selection key. * @param key SelectionKey. * @throws IOException when something is wrong. */ protected void process(SelectionKey key) throws IOException { // if incoming connection if(key.isAcceptable()) { // get client SocketChannel client = (((ServerSocketChannel)key.channel()).accept()); try { client.configureBlocking(false); client.register(this.selector, SelectionKey.OP_READ); } catch(Exception e) { // catch } } // if readable, tell messenger to read else if(key.isReadable()) { // read } } 

希望这可以帮助。

available()只会告诉您是否可以在不进入操作系统的情况下读取数据。 它在这里不是很有用。

您可以根据需要执行阻止或非阻塞读取。 当没有要读取的数据时,只会返回非阻塞读取,这可能就是您想要的。