非阻塞套接字

在Java中实现非阻塞套接字的最佳方法是什么?

还是有这样的事情? 我有一个通过套接字与服务器通信的程序,但是如果数据/连接有问题,我不希望套接字调用阻塞/导致延迟。

其中一些答案是不正确的。 SocketChannel.configureBlocking(false)将其置于非阻塞模式。 您不需要选择器来执行此操作。 您只需要一个Selector来实现超时或带有非阻塞套接字的多路复用 I / O.

Java 2 Standard Edition 1.4中引入的Java 非阻塞套接字允许应用程序之间的网络通信,而不会阻止使用套接字的进程。 但是什么是非阻塞套接字,在哪种情况下它可能有用,以及它是如何工作的?

什么是非阻塞套接字?

非阻塞套接字允许在通道上进行I / O操作,而不会阻止使用它的进程。 这意味着,我们可以使用单个线程来处理多个并发连接并获得“异步高性能”读/写操作(有些人可能不同意)

好的, 在哪些情况下它可能有用?

假设您希望实现接受不同客户端连接的服务器。 同时假设您希望服务器能够同时处理多个请求。 使用传统方式,您有两种选择来开发这样的服务器:

  • 实现一个multithreading服务器,为每个连接手动处理一个线程。
  • 使用外部第三方模块。

这两种解决方案都可以工作,但是采用第一种解决方案来开发整个线程管理解决方案,具有相关的并发性和冲突问题。 第二种解决方案使应用程序依赖于非JDK外部模块,可能您必须使库适应您的需求。 通过非阻塞套接字,您可以实现非阻塞服务器,而无需直接管理线程或使用外部模块。

这个怎么运作?

在详细介绍之前,您需要了解的术语很少:

  • 在基于NIO的实现中,我们不是将数据写入输出流并从输入流中读取数据,而是从缓冲区读取和写入数据。 缓冲区可以定义为临时存储。
  • 通道将大量数据传入和传出缓冲区 。 此外,它可以被视为通信的端点。
  • 准备选择是一个概念,它指的是“选择在读取或写入数据时不会阻塞的套接字的能力”。

Java NIO有一个名为Selector的类,它允许单个线程检查多个通道上的I / O事件。 这怎么可能? 好吧, selector可以检查通道的“准备就绪” ,例如客户端尝试连接或读/写操作。 也就是说, Selector每个实例都可以监视更多的套接字通道 ,从而监视更多的连接。 现在,当通道上发生某些事件(发生事件)时, selector通知应用程序处理请求selector通过创建事件键 (或选择键)来完成它,这些键是SelectionKey类的实例。 每个key包含有关发出请求的人以及请求的 类型的信息 ,如图1所示。

图1:结构图 图1:结构图

一个基本的实现

服务器实现由无限循环组成,其中selector等待事件并创建事件密钥。 密钥有四种可能的类型:

  • 可接受:关联的客户端请求连接。
  • 可连接:服务器接受连接。
  • 可读:服务器可以读取。
  • 可写:服务器可以写。

通常在服务器端创建acceptable密钥。 实际上,这种密钥只是简单地通知服务器客户端需要连接,然后服务器将套接字通道个性化并将其与选择器相关联以进行读/写操作。 在此之后,当接受的客户端读取或写入某些内容时,选择器将为该客户端创建readablewriteable密钥。

现在,您已准备好按照提议的算法用Java编写服务器。 可以通过以下方式创建套接字通道, selector和套接字选择器注册:

 final String HOSTNAME = "127.0.0.1"; final int PORT = 8511; // This is how you open a ServerSocketChannel serverChannel = ServerSocketChannel.open(); // You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector. serverChannel.configureBlocking(false); // bind to the address that you will use to Serve. serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT)); // This is how you open a Selector selector = Selector.open(); /* * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT. * This means that you just told your selector that this channel will be used to accept connections. * We can change this operation later to read/write, more on this later. */ serverChannel.register(selector, SelectionKey.OP_ACCEPT); 

首先,我们使用ServerSocketChannel.open()方法创建一个SocketChannel实例。 接下来, configureBlocking(false)调用将此channel设置为非阻塞 。 与服务器的连接由serverChannel.socket().bind()方法完成。 HOSTNAME表示服务器的IP地址, PORT是通信端口。 最后,调用Selector.open()方法创建一个selector实例并将其注册到channel和注册类型。 在此示例中,注册类型为OP_ACCEPT ,这意味着选择器仅报告客户端尝试连接到服务器。 其他可能的选项是: OP_CONNECT ,将由客户端使用; OP_READ ; 和OP_WRITE

现在我们需要使用无限循环来处理这些请求。 一个简单的方法如下:

 // Run the server as long as the thread is not interrupted. while (!Thread.currentThread().isInterrupted()) { /* * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call. * For example, if a client connects right this second, then it will break from the select() * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't * block undefinable. */ selector.select(TIMEOUT); /* * If we are here, it is because an operation happened (or the TIMEOUT expired). * We need to get the SelectionKeys from the selector to see what operations are available. * We use an iterator for this. */ Iterator keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); // remove the key so that we don't process this OPERATION again. keys.remove(); // key could be invalid if for example, the client closed the connection. if (!key.isValid()) { continue; } /* * In the server, we start by listening to the OP_ACCEPT when we register with the Selector. * If the key from the keyset is Acceptable, then we must get ready to accept the client * connection and do something with it. Go read the comments in the accept method. */ if (key.isAcceptable()) { System.out.println("Accepting connection"); accept(key); } /* * If you already read the comments in the accept() method, then you know we changed * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return * a channel that is writable (key.isWritable()). The write() method will explain further. */ if (key.isWritable()) { System.out.println("Writing..."); write(key); } /* * If you already read the comments in the write method then you understand that we registered * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key * that is ready to read (key.isReadable()). The read() method will explain further. */ if (key.isReadable()) { System.out.println("Reading connection"); read(key); } } } 

您可以在此处找到实施源

注意:异步服务器

作为非阻塞实现的替代方案,我们可以部署异步服务器。 例如,您可以使用AsynchronousServerSocketChannel类,它为面向流的侦听套接字提供异步通道。

要使用它,首先执行其静态open()方法,然后bind()到特定端口 。 接下来,您将执行其accept()方法,并向其传递一个实现CompletionHandler接口的类。 通常,您会发现将处理程序创建为匿名内部类

从这个AsynchronousServerSocketChannel对象中,您调用accept()来告诉它开始侦听连接,并向其传递一个自定义的CompletionHandler实例。 当我们调用accept() ,它会立即返回。 请注意,这与传统的阻塞方法不同; 而accept()方法被阻塞,直到客户端连接到它AsynchronousServerSocketChannel accept()方法为您处理它。

这里有一个例子:

 public class NioSocketServer { public NioSocketServer() { try { // Create an AsynchronousServerSocketChannel that will listen on port 5000 final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel .open() .bind(new InetSocketAddress(5000)); // Listen for a new request listener.accept(null, new CompletionHandler() { @Override public void completed(AsynchronousSocketChannel ch, Void att) { // Accept the next connection listener.accept(null, this); // Greet the client ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes())); // Allocate a byte buffer (4K) to read from the client ByteBuffer byteBuffer = ByteBuffer.allocate(4096); try { // Read the first line int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS); boolean running = true; while (bytesRead != -1 && running) { System.out.println("bytes read: " + bytesRead); // Make sure that we have data to read if (byteBuffer.position() > 2) { // Make the buffer ready to read byteBuffer.flip(); // Convert the buffer into a line byte[] lineBytes = new byte[bytesRead]; byteBuffer.get(lineBytes, 0, bytesRead); String line = new String(lineBytes); // Debug System.out.println("Message: " + line); // Echo back to the caller ch.write(ByteBuffer.wrap(line.getBytes())); // Make the buffer ready to write byteBuffer.clear(); // Read the next line bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS); } else { // An empty line signifies the end of the conversation in our protocol running = false; } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { // The user exceeded the 20 second timeout, so close the connection ch.write(ByteBuffer.wrap("Good Bye\n".getBytes())); System.out.println("Connection timed out, closing connection"); } System.out.println("End of conversation"); try { // Close the connection if we need to if (ch.isOpen()) { ch.close(); } } catch (I/OException e1) { e1.printStackTrace(); } } @Override public void failed(Throwable exc, Void att) { ///... } }); } catch (I/OException e) { e.printStackTrace(); } } public static void main(String[] args) { NioSocketServer server = new NioSocketServer(); try { Thread.sleep(60000); } catch (Exception e) { e.printStackTrace(); } } } 

你可以在这里找到完整的代码

除了使用非阻塞IO之外,您可能会发现为连接创建写入线程要简单得多。

注意:如果您只需要几千个连接,则每个连接一到两个线程更简单。 如果每台服务器有大约一万或更多的连接,则需要NIO和选择器。

java.nio包提供了Selector的工作方式,就像在C中一样。

我刚写了这段代码。 它运作良好。 这是上面答案中提到的Java NIO的一个例子,但在这里我发布了代码。

 ServerSocketChannel ssc = null; try { ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); while (true) { SocketChannel sc = ssc.accept(); if (sc == null) { // No connections came . } else { // You got a connection. Do something } } } catch (IOException e) { e.printStackTrace(); }