扩展java的ThreadLocal以允许在所有线程中重置值

看了这个问题之后 ,我想我想要包装ThreadLocal来添加重置行为。

我想要一个类似于ThreadLocal的东西,我可以从任何线程调用一个方法将所有值设置回相同的值。 到目前为止我有这个:

public class ThreadLocalFlag { private ThreadLocal flag; private List allValues = new ArrayList(); public ThreadLocalFlag() { flag = new ThreadLocal() { @Override protected Boolean initialValue() { Boolean value = false; allValues.add(value); return value; } }; } public boolean get() { return flag.get(); } public void set(Boolean value) { flag.set(value); } public void setAll(Boolean value) { for (Boolean tlValue : allValues) { tlValue = value; } } } 

我担心原语的自动装箱可能意味着我存储在列表中的副本将不会引用ThreadLocal引用的相同变量(如果我尝试设置它们)。 我还没有测试过这段代码,并且在我继续沿着这条路走下去之前,我正在寻找一些专家建议。

有人会问“你为什么这样做?”。 我正在一个框架中工作,其他线程回调到我的代码中,我没有对它们的引用。 我想定期更新他们使用的ThreadLocal变量中的值,因此执行该更新需要使用该变量的线程进行更新。 我只需要一种方法来通知所有这些线程他们的ThreadLocal变量是陈旧的。


我很高兴最近对这个三年前的问题有了新的批评,尽管我觉得它的语气略低于专业。 我提供的解决方案在此期间没有发生任何事故。 然而,必须有更好的方法来实现提出这个问题的目标,我邀请评论家提供明显更好的答案。 为此,我将尝试更清楚地解决我试图解决的问题。

正如我之前提到的,我正在使用一个框架,其中多个线程正在使用我的代码,在我的控制之外。 该框架是QuickFIX / J,我正在实现Application接口 。 该接口定义了用于处理FIX消息的钩子,在我的使用中,框架被配置为multithreading,因此可以同时处理与应用程序的每个FIX连接。

但是,QuickFIX / J框架仅为所有线程使用该接口的单个​​实例实例。 我无法控制线程如何启动,并且每个都在为不同的连接提供不同的配置细节和其他状态。 很自然地让一些经常被访问但很少更新的状态存在于各种ThreadLocal ,一旦框架启动了线程,它们就会加载它们的初始值。

在组织的其他地方,我们有库代码,允许我们注册回调,以通知在运行时更改的配置详细信息。 我想注册该回调,当我收到它时,我想让所有线程知道是时候重新加载那些ThreadLocal的值,因为它们可能已经改变了。 该回调来自我无法控制的线程,就像QuickFIX / J线程一样。

我的解决方案使用ThreadLocalFlag (一个包装的ThreadLocal )仅用于通知其他线程可能是时候更新它们的值。 回调调用setAll(true) ,QuickFIX / J线程在开始更新时调用set(false) 。 我已经淡化了ArrayList的并发问题,因为添加列表的唯一时间是在启动期间,并且我的用例小于列表的默认大小。

我想可以用其他的线程通信技术完成同样的任务,但是对于它正在做的事情,这看起来更实用。 我欢迎其他解决方案。

跨线程与ThreadLocal中的对象进行交互

我会事先说这是一个坏主意。 ThreadLocal是一个特殊的类 , 如果使用正确 ,它可以提供速度和线程安全的好处。 尝试使用ThreadLocal跨线程进行通信会破坏首先使用该类的目的。

如果需要跨多个线程访问对象,则有为此目的设计的工具,特别是java.util.collect.concurrent的线程安全集合,例如ConcurrentHashMap ,您可以使用Thread对象作为键来复制ThreadLocal ,像这样:

 ConcurrentHashMap map = new ConcurrentHashMap<>(); // pass map to threads, let them do work, using Thread.currentThread() as the key // Update all known thread's flags for(AtomicBoolean b : map.values()) { b.set(true); } 

更清晰,更简洁,并避免使用ThreadLocal ,而这种方式根本不是为其设计的。

通知线程他们的数据是陈旧的

我只需要一种方法来通知所有这些线程他们的ThreadLocal变量是陈旧的。

如果您的目标只是通知其他线程某些内容已发生变化,则根本不需要ThreadLocal 。 只需使用一个AtomicBoolean并与所有任务共享,就像你的ThreadLocal 。 顾名思义,对AtomicBoolean更新是primefaces和可见的交叉线程。 更好的方法是使用一个真正的同步辅助工具,如CyclicBarrierPhaser ,但对于简单的用例,使用AtomicBoolean没有任何害处。

创建可更新的“ ThreadLocal

所有这些都说,如果你真的想要实现一个全局可更新的ThreadLocal你的实现就会被打破。 事实上,你没有遇到问题只是一个巧合,未来的重构可能会引入难以诊断的错误或崩溃。 它“ 无故障地工作 ”只意味着你的测试不完整。

  • 首先, ArrayList不是线程安全的。 当多个线程可能与它交互时,你根本无法使用它(没有外部同步),即使它们会在不同时间这样做。 你现在没有看到任何问题只是巧合。
  • 将对象存储为List可防止我们删除过时的值。 如果你调用ThreadLocal.set()它将附加到你的列表而不删除以前的值,这会引入内存泄漏和潜在的意外副作用,如果你预计这些对象一旦线程终止就无法访问,通常是ThreadLocal实例的情况。 您的用例巧合地避免了这个问题,但仍然没有必要使用List

以下是IterableThreadLocal的实现,它可以安全地存储和更新ThreadLocal值的所有现有实例,并适用于您选择使用的任何类型:

 import java.util.Iterator; import java.util.concurrent.ConcurrentMap; import com.google.common.collect.MapMaker; /** * Class extends ThreadLocal to enable user to iterate over all objects * held by the ThreadLocal instance. Note that this is inherently not * thread-safe, and violates both the contract of ThreadLocal and much * of the benefit of using a ThreadLocal object. This class incurs all * the overhead of a ConcurrentHashMap, perhaps you would prefer to * simply use a ConcurrentHashMap directly instead? * * If you do really want to use this class, be wary of its iterator. * While it is as threadsafe as ConcurrentHashMap's iterator, it cannot * guarantee that all existing objects in the ThreadLocal are available * to the iterator, and it cannot prevent you from doing dangerous * things with the returned values. If the returned values are not * properly thread-safe, you will introduce issues. */ public class IterableThreadLocal extends ThreadLocal implements Iterable { private final ConcurrentMap map; public IterableThreadLocal() { map = new MapMaker().weakKeys().makeMap(); } @Override public T get() { T val = super.get(); map.putIfAbsent(Thread.currentThread(), val); return val; } @Override public void set(T value) { map.put(Thread.currentThread(), value); super.set(value); } /** * Note that this method fundamentally violates the contract of * ThreadLocal, and exposes all objects to the calling thread. * Use with extreme caution, and preferably only when you know * no other threads will be modifying / using their ThreadLocal * references anymore. */ @Override public Iterator iterator() { return map.values().iterator(); } } 

正如你可以看到的那样,它只不过是一个ConcurrentHashMap的包装器,并且会产生与直接使用一个相同的开销,但是隐藏在ThreadLocal的实现中,用户通常希望它是快速且线程安全的。 我实现它是为了演示目的,但我真的不建议在任何设置中使用它。

这样做不是一个好主意,因为线程本地存储的整个点是它包含的值的线程局部性 – 即你可以确定除了你自己的线程之外没有其他线程可以触及该值。 如果其他线程可以触及您的线程本地值,它将不再是“线程本地”,这将破坏线程本地存储的内存模型契约。

您必须使用除ThreadLocal之外的其他内容(例如ConcurrentHashMap )来存储值,或者您需要找到一种方法来安排有关线程的更新。

您可以使用google guava的地图制作工具创建一个静态的最终ConcurrentWeakReferenceIdentityHashmap ,其类型如下: Map>其中第二个地图是ConcurrentHashMap 。 这样你就可以非常接近ThreadLocal除了你可以迭代地图。

我对这个问题的答案质量感到失望; 我找到了自己的解决方案。

我今天写了我的测试用例,发现我问题中代码的唯一问题是Boolean 。 布尔值是不可变的,所以我的引用列表对我没有任何帮助。 我查看了这个问题 ,并将我的代码更改为使用AtomicBoolean ,现在一切都按预期工作。

 public class ThreadLocalFlag { private ThreadLocal flag; private List allValues = new ArrayList(); public ThreadLocalFlag() { flag = new ThreadLocal() { @Override protected AtomicBoolean initialValue() { AtomicBoolean value = new AtomicBoolean(); allValues.add(value); return value; } }; } public boolean get() { return flag.get().get(); } public void set(boolean value) { flag.get().set(value); } public void setAll(boolean value) { for (AtomicBoolean tlValue : allValues) { tlValue.set(value); } } } 

测试用例:

 public class ThreadLocalFlagTest { private static ThreadLocalFlag flag = new ThreadLocalFlag(); private static boolean runThread = true; @AfterClass public static void tearDownOnce() throws Exception { runThread = false; flag = null; } /** * @throws Exception if there is any issue with the test */ @Test public void testSetAll() throws Exception { startThread("ThreadLocalFlagTest-1", false); try { Thread.sleep(1000L); } catch (InterruptedException e) { //ignore } startThread("ThreadLocalFlagTest-2", true); try { Thread.sleep(1000L); } catch (InterruptedException e) { //ignore } startThread("ThreadLocalFlagTest-3", false); try { Thread.sleep(1000L); } catch (InterruptedException e) { //ignore } startThread("ThreadLocalFlagTest-4", true); try { Thread.sleep(8000L); //watch the alternating values } catch (InterruptedException e) { //ignore } flag.setAll(true); try { Thread.sleep(8000L); //watch the true values } catch (InterruptedException e) { //ignore } flag.setAll(false); try { Thread.sleep(8000L); //watch the false values } catch (InterruptedException e) { //ignore } } private void startThread(String name, boolean value) { Thread t = new Thread(new RunnableCode(value)); t.setName(name); t.start(); } class RunnableCode implements Runnable { private boolean initialValue; RunnableCode(boolean value) { initialValue = value; } @Override public void run() { flag.set(initialValue); while (runThread) { System.out.println(Thread.currentThread().getName() + ": " + flag.get()); try { Thread.sleep(4000L); } catch (InterruptedException e) { //ignore } } } } }