在单个后台线程定期修改它的同时读取Map

我有一个类,我在updateLiveSockets()方法中每隔30秒从一个后台线程填充一个地图liveSocketsByDatacenter然后我有一个方法getNextSocket() ,它将由多个读者线程调用以获得一个可用的实时套接字获取此信息的相同地图。

 public class SocketManager { private static final Random random = new Random(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicReference<Map<Datacenters, List>> liveSocketsByDatacenter = new AtomicReference(Collections.unmodifiableMap(new HashMap())); private final ZContext ctx = new ZContext(); // Lazy Loaded Singleton Pattern private static class Holder { private static final SocketManager instance = new SocketManager(); } public static SocketManager getInstance() { return Holder.instance; } private SocketManager() { connectToZMQSockets(); scheduler.scheduleAtFixedRate(new Runnable() { public void run() { updateLiveSockets(); } }, 30, 30, TimeUnit.SECONDS); } // during startup, making a connection and populate once private void connectToZMQSockets() { Map<Datacenters, ImmutableList> socketsByDatacenter = Utils.SERVERS; // The map in which I put all the live sockets Map<Datacenters, List> updatedLiveSocketsByDatacenter = new HashMap(); for (Map.Entry<Datacenters, ImmutableList> entry : socketsByDatacenter.entrySet()) { List addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); updatedLiveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets)); } // Update the map content this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter)); } private List connect(Datacenters colo, List addresses, int socketType) { List socketList = new ArrayList(); for (String address : addresses) { try { Socket client = ctx.createSocket(socketType); // Set random identity to make tracing easier String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt()); client.setIdentity(identity.getBytes(ZMQ.CHARSET)); client.setTCPKeepAlive(1); client.setSendTimeOut(7); client.setLinger(0); client.connect(address); SocketHolder zmq = new SocketHolder(client, ctx, address, true); socketList.add(zmq); } catch (Exception ex) { // log error } } return socketList; } // this method will be called by multiple threads to get the next live socket // is there any concurrency or thread safety issue or race condition here? public Optional getNextSocket() { // For the sake of consistency make sure to use the same map instance // in the whole implementation of my method by getting my entries // from the local variable instead of the member variable Map<Datacenters, List> liveSocketsByDatacenter = this.liveSocketsByDatacenter.get(); Optional liveSocket = Optional.absent(); List dcs = Datacenters.getOrderedDatacenters(); for (Datacenters dc : dcs) { liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); if (liveSocket.isPresent()) { break; } } return liveSocket; } // is there any concurrency or thread safety issue or race condition here? private Optional getLiveSocketX(final List endpoints) { if (!CollectionUtils.isEmpty(endpoints)) { // The list of live sockets List liveOnly = new ArrayList(endpoints.size()); for (SocketHolder obj : endpoints) { if (obj.isLive()) { liveOnly.add(obj); } } if (!liveOnly.isEmpty()) { // The list is not empty so we shuffle it an return the first element Collections.shuffle(liveOnly); return Optional.of(liveOnly.get(0)); } } return Optional.absent(); } // Added the modifier synchronized to prevent concurrent modification // it is needed because to build the new map we first need to get the // old one so both must be done atomically to prevent concistency issues private synchronized void updateLiveSockets() { Map<Datacenters, ImmutableList> socketsByDatacenter = Utils.SERVERS; // Initialize my new map with the current map content Map<Datacenters, List> liveSocketsByDatacenter = new HashMap(this.liveSocketsByDatacenter.get()); for (Entry<Datacenters, ImmutableList> entry : socketsByDatacenter.entrySet()) { List liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List liveUpdatedSockets = new ArrayList(); for (SocketHolder liveSocket : liveSockets) { // LINE A Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); boolean isLive = (status) ? true : false; // is there any problem the way I am using `SocketHolder` class? SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); } this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter)); } } 

正如你在我class上看到的那样:

  • 从每30秒运行一个后台线程,我使用updateLiveSockets()方法中的所有实时套接字填充liveSocketsByDatacenter映射。
  • 然后从多个线程,我调用getNextSocket()方法给我一个可用的实时套接字,它使用liveSocketsByDatacenter映射来获取所需的信息。

我的代码工作正常,没有任何问题,并希望看看是否有更好或更有效的方法来编写它。 我还希望得到关于线程安全问题或任何竞争条件的意见,如果有的话,但到目前为止我还没有看到任何,但我可能是错的。

我主要担心的是updateLiveSockets()方法和getLiveSocketX()方法。 我正在迭代liveSockets ,这是一个在LINE A的SocketHolder List ,然后创建一个新的SocketHolder对象并添加到另一个新列表。 这可以吗?

注意: SocketHolder是一个不可变类。 你可以忽略ZeroMQ东西。

您使用以下同步技术。

  1. 带有实时套接字数据的映射位于primefaces引用之后,这样可以安全地切换映射。
  2. updateLiveSockets()方法是同步的(隐式地),这将防止同时通过两个线程切换映射。
  3. 如果在getNextSocket()方法期间发生切换,则在使用时对地图进行本地引用以避免混淆。

它现在是线程安全吗?

线程安全始终取决于共享可变数据是否存在正确的同步。 在这种情况下,共享可变数据是数据中心到其SocketHolders列表的映射。

地图位于AtomicReference中并制作本地副本以供使用的事实是地图上的足够同步。 您的方法采用地图版本并使用它,由于AtomicReference的性质,切换版本是线程安全的。 这也可以通过使map的成员字段为volatile ,因为您所做的只是更新引用(您不对其执行任何check-then-act操作)。

由于scheduleAtFixedRate()保证传递的Runnable不会与它自己同时运行,因此不需要在updateLiveSockets()synchronized ,但它也不会造成任何真正的伤害。

所以是的,这个类是线程安全的,因为它是。

但是,并不完全清楚SocketHolder可以同时被多个线程使用。 实际上,这个类只是尝试通过选择一个随机的实时来最小化SocketHolder的并发使用(不需要随机抽取整个数组来选择一个随机索引)。 它实际上没有阻止并发使用。

可以提高效率吗?

我相信它可以。 在查看updateLiveSockets()方法时,似乎它构建了完全相同的映射,除了SocketHolder可能具有不同的isLive标志值。 这使我得出结论,我只想切换地图中的每个列表,而不是切换整个地图。 并且为了以线程安全的方式更改映射中的条目,我可以使用ConcurrentHashMap

如果我使用ConcurrentHashMap ,并且不切换地图,而是切换地图中的值,我可以摆脱AtomicReference

要更改映射,我可以构建新列表并将其直接放入映射中。 这更有效率,因为我更快地发布数据,并且我创建了更少的对象,而我的同步只是建立在现成的组件上,这有利于可读性。

这是我的构建(为简洁起见,省略了一些不太相关的部分)

 public class SocketManager { private static final Random random = new Random(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Map> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap private final ZContext ctx = new ZContext(); // ... private SocketManager() { connectToZMQSockets(); scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS); } // during startup, making a connection and populate once private void connectToZMQSockets() { Map> socketsByDatacenter = Utils.SERVERS; for (Map.Entry> entry : socketsByDatacenter.entrySet()) { List addedColoSockets = connect(entry.getValue(), ZMQ.PUSH); liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map } } // ... // this method will be called by multiple threads to get the next live socket // is there any concurrency or thread safety issue or race condition here? public Optional getNextSocket() { for (Datacenters dc : Datacenters.getOrderedDatacenters()) { Optional liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List if (liveSocket.isPresent()) { return liveSocket; } } return Optional.absent(); } // is there any concurrency or thread safety issue or race condition here? private Optional getLiveSocket(final List listOfEndPoints) { if (!CollectionUtils.isEmpty(listOfEndPoints)) { // The list of live sockets List liveOnly = new ArrayList<>(listOfEndPoints.size()); for (SocketHolder obj : listOfEndPoints) { if (obj.isLive()) { liveOnly.add(obj); } } if (!liveOnly.isEmpty()) { // The list is not empty so we shuffle it an return the first element return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one } } return Optional.absent(); } // no need to make this synchronized private void updateLiveSockets() { Map> socketsByDatacenter = Utils.SERVERS; for (Map.Entry> entry : socketsByDatacenter.entrySet()) { List liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List liveUpdatedSockets = new ArrayList<>(); for (SocketHolder liveSocket : liveSockets) { // LINE A Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); boolean isLive = (status) ? true : false; SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner. } } } 

如果SocketHolderDatacenters,是不可变的,那么你的程序看起来很好。 不过,这里有一些小的反馈。

1. AtomicReference的用法

AtomicReference>> liveSocketsByDatacenter

此成员变量不需要包含在AtomicReference中。 你没有用它进行任何primefacesCAS操作。 您可以简单地声明一个volative Map> ,并在阅读它时,只需创建一个本地引用即可。 这足以保证对新Map的引用的primefaces交换。

2.同步方法

private synchronized void updateLiveSockets()

从单个线程执行器调用此方法,因此不需要同步它。

3.一些简化

  • 根据您当前对此类的使用情况,您似乎可以过滤掉updateLiveSockets中不存在的套接字,避免每次客户端调用getNextSocket时进行过滤

  • 您可以通过Set datacenters = Utils.SERVERS.keySet()替换Map> socketsByDatacenter = Utils.SERVERS并使用这些键。

    4. Java 8

如果可能的话,切换到Java 8. Streams和Java8的Optional会删除大量的样板代码并使代码更容易阅读。