不要同时在两个线程之间共享相同的套接字

我有大约60个套接字和20个线程,我想确保每个线程每次都在不同的套接字上工作,所以我不想在两个线程之间共享相同的套接字。

在我的SocketManager类中,我有一个后台线程,每60秒运行一次并调用updateLiveSockets()方法。 在updateLiveSockets()方法中,我迭代我拥有的所有套接字,然后通过调用SendToQueue类的send方法SendToQueue ping它们,并根据响应我将它们标记为活动或死亡。 在updateLiveSockets()方法中,我总是需要迭代所有套接字并ping它们以检查它们是活的还是死的。

现在,所有读者线程将同时调用SocketManager类的getNextSocket()方法,以获得下一个可用的套接字以在该套接字上发送业务消息。 所以我有两种类型的消息,我在套接字上发送:

  • 一个是套接字上的ping消息。 这只是从调用SocketManager类中的updateLiveSockets()方法的计时器线程发送的。
  • 其他是套接字上的business消息。 这是在SendToQueue类中完成的。

因此,如果pinger线程正在ping一个套接字以检查它们是否存在,那么没有其他业务线程应该使用该套接字。 类似地,如果业务线程使用套接字在其上发送数据,那么pinger线程不应该ping该套接字。 这适用于所有套接字。 但我需要确保在updateLiveSockets方法中,每当我的后台线程启动时,我们都会ping所有可用的套接字,以便我们可以确定哪个套接字是活的还是死的。

下面是我的SocketManager类:

 public class SocketManager { private static final Random random = new Random(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Map<Datacenters, List> liveSocketsByDatacenter = new ConcurrentHashMap(); private final ZContext ctx = new ZContext(); // ... private SocketManager() { connectToZMQSockets(); scheduler.scheduleAtFixedRate(new Runnable() { public void run() { updateLiveSockets(); } }, 60, 60, TimeUnit.SECONDS); } // during startup, making a connection and populate once private void connectToZMQSockets() { Map<Datacenters, List> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List> entry : socketsByDatacenter.entrySet()) { List addedColoSockets = connect(entry.getValue(), ZMQ.PUSH); liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); } } private List connect(List paddes, int socketType) { List socketList = new ArrayList(); // .... return socketList; } // this method will be called by multiple threads concurrently to get the next live socket // is there any concurrency or thread safety issue or race condition here? public Optional getNextSocket() { for (Datacenters dc : Datacenters.getOrderedDatacenters()) { Optional liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); if (liveSocket.isPresent()) { return liveSocket; } } return Optional.absent(); } private Optional getLiveSocket(final List listOfEndPoints) { if (!listOfEndPoints.isEmpty()) { // The list of live sockets List liveOnly = new ArrayList(listOfEndPoints.size()); for (SocketHolder obj : listOfEndPoints) { if (obj.isLive()) { liveOnly.add(obj); } } if (!liveOnly.isEmpty()) { // The list is not empty so we shuffle it an return the first element return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one } } return Optional.absent(); } // runs every 60 seconds to ping all the available socket to make sure whether they are alive or not private void updateLiveSockets() { Map<Datacenters, List> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List> entry : socketsByDatacenter.entrySet()) { List liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List liveUpdatedSockets = new ArrayList(); for (SocketHolder liveSocket : liveSockets) { Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); // pinging to see whether a socket is live or not boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket); SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); } } } 

这是我的SendToQueue类:

  // this method will be called by multiple reader threads (around 20) concurrently to send the data public boolean sendAsync(final long address, final byte[] encodedRecords) { PendingMessage m = new PendingMessage(address, encodedRecords, true); cache.put(address, m); return doSendAsync(m); } private boolean doSendAsync(final PendingMessage pendingMessage) { Optional liveSocket = SocketManager.getInstance().getNextSocket(); if (!liveSocket.isPresent()) { // log error return false; } ZMsg msg = new ZMsg(); msg.add(pendingMessage.getEncodedRecords()); try { // send data on a socket LINE A return msg.send(liveSocket.get().getSocket()); } finally { msg.destroy(); } } public boolean send(final long address, final byte[] encodedRecords, final Socket socket) { PendingMessage m = new PendingMessage(address, encodedRecords, socket, false); cache.put(address, m); try { if (doSendAsync(m, socket)) { return m.waitForAck(); } return false; } finally { cache.invalidate(address); } } 

问题陈述

现在您可以看到我在两个线程之间共享相同的套接字。 似乎SocketManager类中的getNextSocket()可以向Thread A返回一个0MQ socket 。 同时, timer thread可以访问相同的0MQ socket来ping它。 在这种情况下, Thread Atimer thread正在改变相同的0MQ socket ,这可能导致问题。 所以我试图找到一种方法,以便我可以防止不同的线程同时向同一个套接字发送数据并破坏我的数据。

我能想到的一个解决方案是在发送数据时在套接字上使用synchronization ,但如果许multithreading使用相同的套接字,则资源利用率不高。 而且如果是msg.send(socket); 被阻止(技术上不应该)所有等待此套接字的线程都被阻止。 所以我想可能有更好的方法来确保每个线程同时使用不同的单个实时套接字而不是特定套接字上的同步。

所以我试图找到一种方法,以便我可以防止不同的线程同时向同一个套接字发送数据并破坏我的数据。

肯定有很多不同的方法可以做到这一点。 对我来说,这似乎是一个正确的使用BlockingQueue 。 业务线程将从队列中获取一个套接字,并保证没有其他人使用该套接字。

 private final BlockingQueue socketHolderQueue = new LinkedBlockingQueue<>(); ... public Optional getNextSocket() { SocketHolder holder = socketHolderQueue.poll(); return holder; } ... public void finishedWithSocket(SocketHolder holder) { socketHolderQueue.put(holder); } 

我认为在你提到的原因上同步套接字不是一个好主意 – ping线程将阻塞业务线程。

有许多方法可以处理ping线程逻辑。 我会在最后一次使用时存储你的Socket ,然后你的ping线程可以经常从同一个BlockingQueue取出每个套接字,测试它,并在测试后将每个套接字放回到队列的末尾。

 public void testSockets() { // one run this for as many sockets as are in the queue int numTests = socketHolderQueue.size(); for (int i = 0; i < numTests; i++) { SocketHolder holder = socketHolderQueue.poll(); if (holder == null) { break; } if (socketIsOk(socketHolder)) { socketHolderQueue.put(socketHolder); } else { // close it here or something } } } 

您还可以使用getNextSocket()代码使队列中的线程出列,然后检查计时器并将它们放在测试队列上以供ping线程使用,然后从队列中取出下一个。 业务线程永远不会与ping线程同时使用相同的套接字。

根据您要测试套接字的时间,您还可以在业务线程将其返回到队列后重置计时器,以便ping线程在X秒无用后测试套接字。

看起来你应该考虑在这里使用try-with-resourcefunction。 您有SocketHolder或Option类实现AutoCloseable接口。 例如,让我们假设Option实现了这个接口。 然后,Option close方法将实例添加回容器。 我创建了一个简单的例子来说明我的意思。 它并不完整,但它可以让您了解如何在代码中实现它。

  public class ObjectManager implements AutoCloseable { private static class ObjectManagerFactory { private static ObjectManager objMgr = new ObjectManager(); } private ObjectManager() {} public static ObjectManager getInstance() { return ObjectManagerFactory.objMgr; } private static final int SIZE = 10; private static BlockingQueue objects = new LinkedBlockingQueue(); private static ScheduledExecutorService sch; static { for(int cnt = 0 ; cnt < SIZE ; cnt++) { objects.add(new AutoCloseable() { @Override public void close() throws Exception { System.out.println(Thread.currentThread() + " - Adding object back to pool:" + this + " size: " + objects.size()); objects.put(this); System.out.println(Thread.currentThread() + " - Added object back to pool:" + this); } }); } sch = Executors.newSingleThreadScheduledExecutor(); sch.scheduleAtFixedRate(new Runnable() { @Override public void run() { // TODO Auto-generated method stub updateObjects(); } }, 10, 10, TimeUnit.MICROSECONDS); } static void updateObjects() { for(int cnt = 0 ; ! objects.isEmpty() && cnt < SIZE ; cnt++ ) { try(AutoCloseable object = objects.take()) { System.out.println(Thread.currentThread() + " - updateObjects - updated object: " + object + " size: " + objects.size()); } catch (Throwable t) { // TODO Auto-generated catch block t.printStackTrace(); } } } public AutoCloseable getNext() { try { return objects.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } public static void main(String[] args) { try (ObjectManager mgr = ObjectManager.getInstance()) { for (int cnt = 0; cnt < 5; cnt++) { try (AutoCloseable o = mgr.getNext()) { System.out.println(Thread.currentThread() + " - Working with " + o); Thread.sleep(1000); } catch (Throwable t) { t.printStackTrace(); } } } catch (Throwable tt) { tt.printStackTrace(); } } @Override public void close() throws Exception { // TODO Auto-generated method stub ObjectManager.sch.shutdownNow(); } } 

我会在这里提出一些观点。 在getNextSocket方法中, getOrderedDatacenters方法将始终返回相同的有序列表,因此您始终从头到尾从相同的数据中心中选择(这不是问题)。

你如何保证两个线程不会从getNextSocket获得相同的liveSocket

你在这里说的是真的:

同时,计时器线程可以访问相同的0MQ套接字来ping它。

我认为这里的主要问题是你不区分免费套接字和保留套接字。

您所说的一个选项是在每个套接字上同步。 另一个选择是保留一个保留套接字列表,当你想要获得下一个套接字或更新套接字时,只能从免费套接字中选择。 您不想更新已保留的套接字。

如果符合您的需求,您也可以在这里查看。

操作系统软件工程中有一个概念称为关键部分 。 当两个或多个进程共享数据并且它们被并发执行时,会出现一个关键部分,在这种情况下,如果有另一个进程访问这些数据,则任何进程都不应修改甚至读取此共享数据。 因此,当一个进程进入临界区时,它应该通知所有其他并发执行的进程它当前正在修改临界区,因此所有其他进程应该被阻塞 – 等待 – 进入这个关键部分。 你会问谁组织了什么进程,这是另一个叫做进程调度的问题,它控制进入这个关键部分的进程和操作系统为你做的事情。

所以你最好的解决方案是使用一个信号量,其中信号量的套接字数量 ,在你的情况下,我认为你有一个套接字,所以你将使用一个信号量 – 二进制信号量 – 用信号量值= 1初始化,那么你的代码应分为四个主要部分: 临界区条目临界区临界区退出

  • 关键部分条目:流程进入关键部分并阻止所有其他流程。 信号量将允许一个Process-Thread-进入临界区 – 使用套接字 – 信号量的值将递减 – 等于零。
  • 关键部分:流程应该执行的关键部分代码。
  • 关键部分退出:释放关键部分以进入另一个流程的流程。 信号量值将增加 – 等于1 – 允许另一个进程进入
  • 剩余部分:除了之前的3个部分之外的所有代码的其余部分。

现在您只需要打开任何有关信号量的Java教程来了解如何在Java中应用信号量,这非常简单。

Mouhammed Elshaaer是对的,但另外你也可以使用任何并发集合,例如ConcurrentHashMap,你可以在其中跟踪每个线程在不同的套接字上工作(例如ConcurrentHashMap key:socket hash code,value:thread hash code或smth else)。 至于我这是一个有点愚蠢的解决方案,但它可以用来。

对于访问同一个套接字的线程(线程A和定时器线程)的问题,我会为每个数据中心保留2个套接字列表:

  • 列表A:未使用的套接字
  • list B:正在使用的套接字

  • 调用synchronisation getNextSocket()从列表A中找到一个未使用的套接字,将其从列表A中删除并将其添加到列表B中;
  • 在收到已发送消息(业务或ping)的响应/ ACK时调用synchronisation returnSocket(Socket),将套接字从列表B移回列表A. 在“发送消息”周围放置一个try {} finally {}块即使存在exception,也要确保套接字将被放回列表A.

我有一个简单的解决方案可能会帮助你。 我不知道在Java中是否可以为每个socket添加自定义属性。 在Socket.io中你可以。 所以我想考虑一下(如果没有,我会删除这个答案)

您将向每个套接字添加一个名为locked布尔属性。 因此,当您的线程检查第一个套接字时, locked属性将为True 。 ping THIS套接字时,任何其他线程都会检查locked属性是否为False 。 如果没有,请使用getNextSocket

所以,在这下面的……

 ... for (SocketHolder liveSocket : liveSockets) { Socket socket = liveSocket.getSocket(); ... 

您将检查套接字是否已锁定。 如果 ,请终止此线程或中断此线程或转到下一个套接字。 (我不知道你是怎么称它为Java)。

所以这个过程是……

  • 线程获得一个解锁的套接字
  • 线程将此socket.locked设置为True
  • 线程ping socket并做你想要的任何东西
  • 线程将此socket.locked设置为False
  • 线程转到下一个。

对不起我的英文不好:)