Java非阻塞IO选择器导致通道寄存器阻塞

我有两个线程,我正在处理Java NIO的非阻塞套接字。 这是线程正在做的事情:

线程1:调用选择器的select()方法的循环。 如果有任何密钥可用,则会相应地处理它们。

线程2:偶尔通过调用register()将SocketChannel注册到选择器。

问题是,除非select()的超时非常小(比如大约100ms),对register()的调用将无限期地阻塞。 即使通道配置为非阻塞,并且javadocs声明Selector对象是线程安全的(但它的选择键不是,我知道)。

所以任何人都对这个问题有什么看法? 如果我把所有东西都放在一个线程中,该应用程 那时没有问题,但我真的想要有单独的线程。 任何帮助表示赞赏。 我在下面发布了我的示例代码:

将选择(1000)更改为选择(100)并且它将起作用。 保留为select()或select(1000),但不会。

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } } 
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

 import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } } 
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor(); public static void init() { initialized = true; try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); } Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); } public static void shutdown() { initialized = false; } private static void readData() { try { int numKeys = recvSelector.select(1000); if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator(); while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } } } catch (IOException e) { System.err.println(e); }
} public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } } public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } } public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } } public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

 import java.nio.ByteBuffer; 

/** Callback through which a listener is handed each block of received data. */
public interface SocketSubscriber {

    /**
     * Invoked once per received block of data.
     *
     * @param data buffer holding the received bytes
     */
    void onData(ByteBuffer data);
}

用法示例:

 public class Test implements SocketSubscriber { public static void main(String[] args) throws Exception { UDPSocket.init(); UDPSocket test = new UDPSocket("localhost", 1234); test.addListener(new Test()); UDPSocket test2 = new UDPSocket("localhost", 4321); test2.addListener(new Test()); System.out.println("Listening..."); ByteBuffer buffer = ByteBuffer.allocate(500); test.send(buffer); buffer.rewind(); test2.send(buffer); System.out.println("Data sent..."); Thread.sleep(5000); UDPSocket.shutdown(); } 

@Override public void onData(ByteBuffer data){System.out.println(“Received”+ data.limit()+“bytes of data。”); }}

Selector 有几个文档中明确记载的内部同步层级,而你把它们全都碰上了。 在调用 register() 之前,先在选择器上调用 wakeup()。 同时要确保 select() 循环在选中的键数为零时也能正常工作,因为 wakeup() 之后 select() 正是这样返回的。

我今天遇到了同样的问题(也就是没有现成的“wakeupAndRegister”方法可用)。 希望我的解决方案能有所帮助:

创建同步对象:

 // Lock object that both the registering thread and the selector loop synchronize on.
 Object registeringSync = new Object(); 

通过以下方式注册通道:

 // Register a channel while another thread may be blocked in selector.select().
 synchronized (registeringSync) {
     // Wakes up a CURRENT or (important) NEXT select
     selector.wakeup();
     // !!! Might run into a deadlock "between" these lines if not using the lock !!!
     // To force it, insert Thread.sleep(1000); here
     channel.register(selector, ...);
 }

该线程应该执行以下操作:

 public void run() { while (initialized) { if (selector.select() != 0) { // Blocks until "wakeup" // Iterate through selected keys } synchronized (registeringSync) { } // Cannot continue until "register" is complete } }