如何确保我不是同时在两个线程之间共享相同的套接字?

我有一个代码,我正在处理套接字,我需要确保我不在两个线程之间共享相同的套接字 。 在我的下面的代码中,我有一个后台线程,每60秒运行一次并调用updateLiveSockets()方法。 在updateLiveSockets()方法中,我迭代我拥有的所有套接字,然后通过调用SendToQueue类的send方法SendToQueue ping它们,并根据响应我将它们标记为活动或死亡。

现在所有的读者线程将同时调用getNextSocket()方法来获取下一个可用的套接字,因此它必须是线程安全的,我需要确保所有的读者线程都应该看到相同的SocketHolderSocket状态。

下面是我的SocketManager类:

 public class SocketManager { private static final Random random = new Random(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Map<Datacenters, List> liveSocketsByDatacenter = new ConcurrentHashMap(); private final ZContext ctx = new ZContext(); // ... private SocketManager() { connectToZMQSockets(); scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS); } // during startup, making a connection and populate once private void connectToZMQSockets() { Map<Datacenters, List> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List> entry : socketsByDatacenter.entrySet()) { List addedColoSockets = connect(entry.getValue(), ZMQ.PUSH); liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); } } private List connect(List paddes, int socketType) { List socketList = new ArrayList(); // .... return socketList; } // this method will be called by multiple threads concurrently to get the next live socket // is there any concurrency or thread safety issue or race condition here? public Optional getNextSocket() { for (Datacenters dc : Datacenters.getOrderedDatacenters()) { Optional liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); if (liveSocket.isPresent()) { return liveSocket; } } return Optional.absent(); } private Optional getLiveSocket(final List listOfEndPoints) { if (!CollectionUtils.isEmpty(listOfEndPoints)) { // The list of live sockets List liveOnly = new ArrayList(listOfEndPoints.size()); for (SocketHolder obj : listOfEndPoints) { if (obj.isLive()) { liveOnly.add(obj); } } if (!liveOnly.isEmpty()) { // The list is not empty so we shuffle it an return the first element return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one } } return Optional.absent(); } // runs every 60 seconds to ping all the socket to make sure whether they are alive or not private void updateLiveSockets() { Map<Datacenters, List> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List> entry : socketsByDatacenter.entrySet()) { List liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List liveUpdatedSockets = new ArrayList(); for (SocketHolder liveSocket : liveSockets) { Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); // pinging to see whether a socket is live or not boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket); boolean isLive = (status) ? true : false; SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); } } } 

这是我的SendToQueue类:

  // this method will be called by multiple threads concurrently to send the data public boolean sendAsync(final long address, final byte[] encodedRecords) { Optional liveSockets = SocketManager.getInstance().getNextSocket(); PendingMessage m = new PendingMessage(address, encodedRecords, liveSockets.get().getSocket(), true); cache.put(address, m); return doSendAsync(m, socket); } private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) { ZMsg msg = new ZMsg(); msg.add(pendingMessage.getEncodedRecords()); try { // send data on a socket LINE A return msg.send(socket); } finally { msg.destroy(); } } public boolean send(final long address, final byte[] encodedRecords, final Socket socket) { PendingMessage m = new PendingMessage(address, encodedRecords, socket, false); cache.put(address, m); try { if (doSendAsync(m, socket)) { return m.waitForAck(); } return false; } finally { // Alternatively (checks that address points to m): // cache.asMap().remove(address, m); cache.invalidate(address); } } 

问题陈述

现在你可以看到我在两个线程之间共享相同的套接字。 似乎getNextSocket()可以向thread A返回一个0MQ socket 。 同时, timer thread可以访问相同的0MQ socket来ping它。 在这种情况下, thread Atimer thread正在改变相同的0MQ socket ,这可能导致问题。 所以我试图找到一种方法,以便我可以防止不同的线程同时向同一个套接字发送数据并破坏我的数据。

所以我决定同步套接字,这样两个线程就不能同时访问同一个套接字。 以下是我在updateLiveSockets方法中所做的更改。 我通过以下方法在socket上同步:

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not private void updateLiveSockets() { Map<Datacenters, List> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List> entry : socketsByDatacenter.entrySet()) { List liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List liveUpdatedSockets = new ArrayList(); for (SocketHolder liveSocket : liveSockets) { Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); // using the socket as its own lock synchronized (socket) { // pinging to see whether a socket is live or not boolean status = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket); boolean isLive = (status) ? true : false; SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); } } 

以下是我在doSendAsync方法中所做的更改。 在此我也在发送它之前在socket上同步。

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) { ZMsg msg = new ZMsg(); msg.add(pendingMessage.getEncodedRecords()); try { // send data on a socket LINE A by synchronizing on it synchronized (socket) { return msg.send(socket); } } finally { msg.destroy(); } } 

有什么方法可以确保我不在两个线程之间共享相同的套接字? 一般来说,我有大约60个套接字和20个线程访问这些套接字。

如果许multithreading使用相同的套接字,则资源利用率不高。 而且如果是msg.send(socket); 被阻止(技术上不应该)所有等待此套接字的线程都被阻止。 所以我想可能有更好的方法来确保每个线程同时使用不同的单个实时套接字而不是特定套接字上的同步。 还有我错过的任何角落案例或边缘案例都可能导致一些错误吗?

首先,您需要一种方法让客户通知您他们已经使用Socket完成了。 您可以添加一个允许它发出信号的方法。 这是合法的,它会起作用,但你必须依靠你的客户表现得很好。 或者更确切地说,使用套接字的程序员不会忘记返回它。 有一种模式可以帮助解决这个问题: 执行模式。 你创建了一个接受Consumer ,然后执行消费者,然后返回Socket本身,而不是发出一个Socket

 public void useSocket(Consumer socketUser) { Socket socket = getSocket(); try { socketUser.accept(socket); } finally { returnSocket(socket); } } 

现在让我们看看我们将如何实现getSocket()returnSocket() 。 显然,它涉及从某种集合中获取它们,并将它们返回到该集合。 Queue是一个很好的选择(正如其他人也注意到的)。 它允许从一侧获取它,并在另一侧返回,此外还有大量高效的线程安全实现,并且接收者和加法器通常不会相互争用。 既然你事先知道套接字的数量,我会选择一个ArrayBlockingQueue

此处另一个问题是您的实现返回Optional 。 如果没有可用的Socket ,我不确定你的客户会做什么,但是如果它正在等待并重试,我建议你只是让队列中的getSocket()阻塞。 实际上,我会尊重您的方法的这一方面,并​​考虑到可能没有可用的Socket 。 对于execute around方法,如果没有Socket可用,这将把它转换为useSocket()方法返回false

 private final BlockingQueue queue; public SocketPool(Set sockets) { queue = new ArrayBlockingQueue<>(sockets.size()); queue.addAll(sockets); } public boolean useSocket(Consumer socketUser) throws InterruptedException { Optional maybeSocket = getSocket(); try { maybeSocket.ifPresent(socketUser); return maybeSocket.isPresent(); } finally { maybeSocket.ifPresent(this::returnSocket); } } private void returnSocket(Socket socket) { queue.add(socket); } private Optional getSocket() throws InterruptedException { return Optional.ofNullable(queue.poll()); } 

那就是它,那就是你的SocketPool

啊,但接着是吝啬的一点:检查活着。 这很吝啬,因为你的活动检查实际上与你的常客有竞争。

为了解决这个问题,我建议如下:让您的客户报告他们获得的Socket是否存在。 由于检查实时性归结为使用Socket,这对您的客户来说应该是直截了当的。

因此,我们将使用Function而不是Consumer Function 。 如果函数返回false ,我们将认为Socket不再存在。 在这种情况下,我们不是将其添加回常规队列,而是将其添加到死套接字的集合中,我们将有一个计划任务,间歇性地重新检查死套接字。 由于这发生在单独的集合中,因此计划的检查不再与常规客户竞争。

现在,您可以使用将数据中心映射到SocketPool实例的Map创建SocketManager 。 此映射不需要更改,因此您可以将其设置为final并在SocketManager的构造函数中初始化它。

这是我对SocketPool初步代码(未经测试):

 class SocketPool implements AutoCloseable { private final BlockingQueue queue; private final Queue deadSockets = new ConcurrentLinkedQueue<>(); private final ScheduledFuture scheduledFuture; public SocketPool(Set sockets, ScheduledExecutorService scheduledExecutorService) { queue = new ArrayBlockingQueue<>(sockets.size()); queue.addAll(sockets); scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::recheckDeadSockets, 60, 60, TimeUnit.SECONDS); } public boolean useSocket(Function socketUser) throws InterruptedException { Optional maybeSocket = getSocket(); boolean wasLive = true; try { wasLive = maybeSocket.map(socketUser).orElse(false); return wasLive && maybeSocket.isPresent(); } finally { boolean isLive = wasLive; maybeSocket.ifPresent(socket -> { if (isLive) { returnSocket(socket); } else { reportDead(socket); } }); } } private void reportDead(Socket socket) { deadSockets.add(socket); } private void returnSocket(Socket socket) { queue.add(socket); } private Optional getSocket() throws InterruptedException { return Optional.ofNullable(queue.poll()); } private void recheckDeadSockets() { for (int i = 0; i < deadSockets.size(); i++) { Socket socket = deadSockets.poll(); if (checkAlive(socket)) { queue.add(socket); } else { deadSockets.add(socket); } } } private boolean checkAlive(Socket socket) { // do actual live check with SendSocket class, or implement directly in this one return true; } @Override public void close() throws Exception { scheduledFuture.cancel(true); } } 

我会说这段代码有几个问题:

  1. getLiveSocket()可以为多个线程返回相同的套接字。
  2. java.util.Random不适用于多个线程。
  3. getNextSocket()中活动套接字的快照可能是陈旧的,因为并发调用updateLiveSockets()方法修改了该快照。
  4. 如果connectToZMQSockets()不检查套接字的活跃性,则由于scheduleAtFixedRate方法的延迟,60秒内没有活动套接字。

此外,没有用于检查套接字是否正在使用的标志,并且在线程完成其工作后,套接字是否返回池是不明确的。

考虑以下列方式简化代码:

  1. 你的类有彼此的循环引用,对我而言,它是一个信号,应该只有单个类。
  2. 我不认为定期检查所有套接字是否都处于活动状态是不正确的,因为它不能保证套接字状态在检查后和实际发送之前不会改变,更好的策略是validation特定套接字是否发送给它失败。
  3. 最好将套接字管理限制在线程安全数据结构中,而不是使用显式锁,例如在阻塞队列中。 这种策略可以很好地利用所有可用的套接字。

这是一个代码示例:

 public class SendToSocket { private final BlockingQueue queue; public SendToSocket() { this.queue = new LinkedBlockingQueue<>(); // collect all available sockets List sockets = new ArrayList<>(); for (Socket socket : sockets) { queue.add(socket); } } public boolean send(final byte[] reco) throws InterruptedException { // can be replaced with poll() method Socket socket = queue.take(); // handle exceptions if needed boolean status = sendInternal(socket, reco); if (!status) { // check whether socket is live boolean live = ping(socket); if (!live) { // log error return status; } } // return socket back to pool queue.add(socket); return status; } private boolean sendInternal(Socket socket, byte[] reco) { return true; } private boolean ping(Socket socket) { return true; } } 

正如我在您的另一个问题中解释的那样,您的问题的最佳解决方案是使用ConcurrentQueue

例如,这是如何删除不活动的套接字并保留活动的套接字。

  private final Map> liveSocketsByDatacenter = new ConcurrentHashMap<>(); 

//填满队列和地图

 // runs every 60 seconds to ping 70 sockets the socket to make sure whether they are alive or not (it does not matter if you ping more sockets than there are in the list because you are rotating the que) private void updateLiveSockets() { Map> socketsByDatacenter = Utils.SERVERS; for (Map.Entry> entry : socketsByDatacenter.entrySet()) { Queue liveSockets = liveSocketsByDatacenter.get(entry.getKey()); for (int i = 0; i<70; i++) { SocketHolder s = liveSockets.poll(); Socket socket = s.getSocket(); String endpoint = s.getEndpoint(); Map holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); // pinging to see whether a socket is live or not boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); boolean isLive = (status) ? true : false; SocketHolder zmq = new SocketHolder(socket, s.getContext(), endpoint, isLive); liveSockets.add(zmq); } } } 

您不需要锁定活动状态的刷新,因为更新布尔值是primefaces操作。 在将其从池中检出后,只需在后台线程中定期刷新活动。 您可能希望在Socket实例级别添加时间戳,以便在发送消息时添加,因此您可以再等待30秒来ping。 未使用的套接字需要比使用过的套接字更快地ping。

我想这个想法是synchronized块读取2个布尔值并设置一个布尔值,所以它应该立即返回。 您不希望在同步时发送,因为它将在很长时间内阻止其他线程。

我见过的任何类型的同步实际上只需要两个或三个primefaces操作的同步

 boolean got = false; synchronized(obj) { if(alive && available) { available = false; got = true; } } if(got) { ... // all thread safe because available must be false available = true; // atomic, no need to synchronize when done } 

通常,您可以通过小心进行primefaces更新的顺序来找出完全避免同步的方法。

例如,您可以通过使用映射而不是列表来存储套接字来完成没有同步的工作。 我不得不考虑它,但你可以使这个线程安全,绝对没有同步,然后它会快得多。

我永远不会考虑使用像Hashtable这样的线程安全的集合来代替HashMap。

 class Socket { Socket() { alive = true; available = true; last = System.currentTimeMillis(); } private synchronized boolean tryToGet() { // should return pretty fast - only 3 atomic operations if(alive && available) { available = false; return true; } return false; } public boolean send() { if(tryToGet()) { // do it last = System.currentTimeMillis(); available = true; // no need to lock atomic operation return true; } return false; } private boolean ping() { // ... } public void pingIfNecessary() { // long update may not be atomic, cast to int if not if(alive && (System.currentTimeMillis() - last) > 30000) { if(tryToGet()) { // other pingIfNecessary() calls have to wait if(ping()) { last = System.currentTimeMillis(); } else { alive = false; } available = true; } } } private boolean alive; private boolean available; private long last; }; void sendUsingPool(String s) { boolean sent = false; while(!sent) { for(Socket socket : sockets) { if(socket.send(s)) { sent = true; break; } } if(!sent) { // increase this number if you want to be nicer try { Thread.sleep(1); } catch (Exception e) { } } } } public void run() { while(true) { for(Socket socket : sockets) { socket.pingIfNecessary(); } try { Thread.sleep(100); } catch (Exception e) { } } }