java并发:很多作家,一个读者

我需要在我的软件中收集一些统计信息,我正在努力使其快速正确,这对我来说并不容易(对我而言!)

到目前为止,我的代码首先是两个类,一个StatsService和一个StatsHarvester

public class StatsService { private Map stats = new HashMap(1000); public void notify ( String key ) { Long value = 1l; synchronized (stats) { if (stats.containsKey(key)) { value = stats.get(key) + 1; } stats.put(key, value); } } public Map getStats ( ) { Map copy; synchronized (stats) { copy = new HashMap(stats); stats.clear(); } return copy; } } 

这是我的第二个类,一个不时收集统计数据并将它们写入数据库的收集器。

 public class StatsHarvester implements Runnable { private StatsService statsService; private Thread t; public void init ( ) { t = new Thread(this); t.start(); } public synchronized void run ( ) { while (true) { try { wait(5 * 60 * 1000); // 5 minutes collectAndSave(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void collectAndSave ( ) { Map stats = statsService.getStats(); // do something like: // saveRecords(stats); } } 

在运行时,它将有大约30个并发运行的线程,每个线程调用notify(key)大约100次。 只有一个StatsHarvester正在调用statsService.getStats()

所以我有很多作家,只有一位读者。 拥有准确的统计数据会很好,但我不关心是否有一些记录在高并发性上丢失了。

读者应每5分钟或任何合理的时间运行。

写作应该尽可能快。 阅读应该很快,但如果它每5分钟锁定约300毫秒,那就好了。

我已经阅读了很多文档(实践中的Java并发,有效的Java等),但我有强烈的感觉,我需要你的建议才能做到正确。

我希望我说明我的问题清楚,足够短,以获得有价值的帮助。


编辑

感谢大家的详细和有用的答案。 正如我所料,有不止一种方法可以做到这一点。

我测试了大部分提案(我理解的那些)并将测试项目上传到谷歌代码以供进一步参考(maven项目)

http://code.google.com/p/javastats/

我测试过我的StatsService的不同实现

  • HashMapStatsService(HMSS)
  • ConcurrentHashMapStatsService(CHMSS)
  • LinkedQueueStatsService(LQSS)
  • GoogleStatsService(GSS)
  • ExecutorConcurrentHashMapStatsService(ECHMSS)
  • ExecutorHashMapStatsService(EHMSS)

我用x个线程测试它们,每次调用通知y次,结果以ms为单位

  10,100 10,1000 10,5000 50,100 50,1000 50,5000 100,100 100,1000 100,5000 GSS 1 5 17 7 21 117 7 37 254 Summe: 466 ECHMSS 1 6 21 5 32 132 8 54 249 Summe: 508 HMSS 1 8 45 8 52 233 11 103 449 Summe: 910 EHMSS 1 5 24 7 31 113 8 67 235 Summe: 491 CHMSS 1 2 9 3 11 40 7 26 72 Summe: 171 LQSS 0 3 11 3 16 56 6 27 144 Summe: 266 

此时我想我将使用ConcurrentHashMap,因为它提供了良好的性能,同时它很容易理解。

感谢您的输入! Janning

正如杰克所说,你可以使用包含ConcurrentHashMap和AtomicLong的java.util.concurrent库。 你可以把AtomicLong放进去,如果没有,你可以增加值。 由于AtomicLong是线程安全的,因此您可以在不担心并发问题的情况下增加变量。

 public void notify(String key) { AtomicLong value = stats.get(key); if (value == null) { value = stats.putIfAbsent(key, new AtomicLong(1)); } if (value != null) { value.incrementAndGet(); } } 

这应该是快速和线程安全的

编辑:重新调整,因此最多只有两次查找。

为什么不使用java.util.concurrent.ConcurrentHashMap ? 它在内部处理所有内容,避免地图上无用的锁定并为您节省大量工作:您不必关心get和put上的同步。

从文档:

一个哈希表,支持检索的完全并发和可更新的预期并发性。 该类遵循与Hashtable相同的function规范,并包括与Hashtable的每个方法相对应的方法版本。 但是,即使所有操作都是线程安全的,检索操作也不需要锁定 ,并且不支持以阻止所有访问的方式锁定整个表。

您可以指定其并发级别

更新操作之间允许的并发性由可选的concurrencyLevel构造函数参数(缺省值16)引导,该参数用作内部大小调整的提示 。 该表在内部进行分区,以尝试允许指定数量的并发更新而不会发生争用。 因为散列表中的放置基本上是随机的,所以实际的并发性会有所不同。 理想情况下,您应该选择一个值来容纳与同时修改表一样多的线程 。 使用比您需要的更高的值会浪费空间和时间,而显着更低的值可能导致线程争用。 但是,在一个数量级内过高估计和低估通常不会产生明显的影响。 当知道只有一个线程会修改而其他所有线程只能读取时,值为1是合适的。 此外,调整此哈希表或任何其他类型的哈希表是一个相对较慢的操作,因此,在可能的情况下,最好在构造函数中提供预期表大小的估计值。

正如评论中所建议的那样仔细阅读ConcurrentHashMap的文档,特别是当它说明primefaces操作或非primefaces操作时。

为了保证primefaces性,你应该考虑哪些操作是primefaces的,从ConcurrentMap接口你会知道:

 V putIfAbsent(K key, V value) V replace(K key, V value) boolean replace(K key,V oldValue, V newValue) boolean remove(Object key, Object value) 

可以安全使用。

我建议看看Java的util.concurrent库。 我认为你可以更清洁地实现这个解决方案。 我认为你根本不需要地图。 我建议使用ConcurrentLinkedQueue实现它。 每个“生产者”都可以自由地写入此队列而不必担心其他人。 它可以将一个对象放在队列中,并提供其统计数据。

收割机可以消耗队列,不断地拉出数据并处理它。 然后它可以存储它需要它。

Chris Dail的回答看起来很好。

另一种选择是使用并发Multiset 。 Google Collections库中有一个。 你可以使用如下:

 private Multiset stats = ConcurrentHashMultiset.create(); public void notify ( String key ) { stats.add(key, 1); } 

查看源代码 ,这是使用ConcurrentHashMap实现的,并使用putIfAbsentputIfAbsent的三参数版本来检测并发修改和重试。

解决问题的另一种方法是通过线程限制来利用(普通的)线程安全性。 基本上创建一个后台线程,负责读取和写入。 它在可伸缩性和简单性方面具有非常好的特性。

我们的想法是,不是所有线程都试图直接更新数据,而是为后台线程处理产生“更新”任务。 假设处理更新中的一些滞后是可以容忍的,那么相同的线程也可以执行读取任务。

这个设计非常好用,因为线程将不再需要竞争锁来更新数据,并且由于地图仅限于单个线程,因此您可以简单地使用普通的HashMap来执行get / put等。在实现方面,这意味着创建一个单线程执行程序,并提交写入任务,这些任务也可以执行可选的“collectAndSave”操作。

代码草图可能如下所示:

 public class StatsService { private ExecutorService executor = Executors.newSingleThreadExecutor(); private final Map stats = new HashMap(); public void notify(final String key) { Runnable r = new Runnable() { public void run() { Long value = stats.get(key); if (value == null) { value = 1L; } else { value++; } stats.put(key, value); // do the optional collectAndSave periodically if (timeToDoCollectAndSave()) { collectAndSave(); } } }; executor.execute(r); } } 

有一个与执行程序关联的BlockingQueue,为StatsService生成任务的每个线程都使用BlockingQueue。 关键点在于: 此操作的锁定持续时间应比原始代码中的锁定持续时间得多,因此争用应该少得多。 总的来说,它应该会带来更好的吞吐量和延迟。

另一个好处是,由于只有一个线程可以读取和写入映射,因此可以使用普通的HashMap和原始long类型(不涉及ConcurrentHashMap或primefaces类型)。 这也简化了实际处理它的代码。

希望能帮助到你。

你看过ScheduledThreadPoolExecutor吗? 您可以使用它来安排您的编写者,这些编写者都可以写入并发集合,例如@Chris Dail提到的ConcurrentLinkedQueue 。 您可以根据需要从Queue中单独调度作业,Java SDK应该处理几乎所有的并发问题,不需要手动锁定。

如果我们忽略了收获部分并专注于写作,该程序的主要瓶颈是统计数据被锁定在非常粗略的粒度级别。 如果两个线程想要更​​新不同的密钥,它们必须等待。

如果您事先知道了该组键,并且可以预先初始化该映射,以便在更新线程到达时保证密钥存在,您将能够锁定累加器变量而不是整个映射,或者使用一个线程安全的累加器对象。

不是自己实现,而是专门为并发而设计的地图实现,并为您进行更精细的锁定。

但有一点需要注意,因为您需要在大致相同的时间内锁定所有累加器。 如果使用现有的并发友好映射,则可能存在用于获取快照的构造。

使用ReentranReadWriteLock实现这两种方法的另一种方法。 如果需要清除计数器,此实现可以防止getStats方法中的竞争条件。 它还从getStats中删除了可变的AtomicLong,并使用了一个不可变的Long。

 public class StatsService { private final Map stats = new HashMap(1000); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public void notify(final String key) { r.lock(); AtomicLong count = stats.get(key); if (count == null) { r.unlock(); w.lock(); count = stats.get(key); if(count == null) { count = new AtomicLong(); stats.put(key, count); } r.lock(); w.unlock(); } count.incrementAndGet(); r.unlock(); } public Map getStats() { w.lock(); Map copy = new HashMap(); for(Entry entry : stats.entrySet() ){ copy.put(entry.getKey(), entry.getValue().longValue()); } stats.clear(); w.unlock(); return copy; } } 

我希望这有帮助,欢迎任何评论!

以下是如何在对被测线程的性能影响最小的情况下执行此操作。 这是Java中可能的最快解决方案,无需借助特殊的硬件寄存器来进行性能计数。

让每个线程独立于其他线程输出其统计信息,即没有同步,对某些统计信息对象。 使包含计数的字段变为volatile,因此它是内存屏蔽的:

 class Stats { public volatile long count; } class SomeRunnable implements Runnable { public void run() { doStuff(); stats.count++; } } 

拥有另一个拥有对所有Stats对象的引用的线程,定期绕过它们并在所有线程中累加计数:

 public long accumulateStats() { long count = previousCount; for (Stats stat : allStats) { count += stat.count; } long resultDelta = count - previousCount; previousCount = count; return resultDelta; } 

此收集线程还需要添加sleep()(或其他一些节流)。 例如,它可以定期向控制台输出计数/秒,以便为您提供应用程序执行情况的“实时”视图。

这可以尽可能地避免同步开销。

要考虑的另一个技巧是将Stats对象填充到128(或SandyBridge或更高版本上的256个字节),以便将不同的线程计数保持在不同的缓存行上,或者在CPU上存在缓存争用。

当只有一个线程读取而一个写入时,您不需要锁或primefaces,挥发性就足够了。 当统计读取器线程与正在测量的线程的CPU缓存行交互时,仍会存在一些线程争用。 这是无法避免的,但这是对运行线程影响最小的方法; 可能每秒读一次或更少的统计数据。