将ThreadLocal传播到从ExecutorService获取的新线程

我使用ExecutorService和Future( 这里是示例代码)在一个具有超时的单独线程中运行进程(线程“生成”发生在AOP方面)。

现在,主线程是Resteasy请求。 Resteasy使用一个或多个ThreadLocal变量来存储我需要在Rest方法调用中的某个时刻检索的一些上下文信息。 问题是,由于Resteasy线程在新线程中运行,因此ThreadLocal变量将丢失。

将Resteasy使用的任何ThreadLocal变量“传播”到新线程的最佳方法是什么? 似乎Resteasy使用多个ThreadLocal变量来跟踪上下文信息,我想“盲目地”将所有信息传递给新线程。

我已经查看了子类化ThreadPoolExecutor并使用beforeExecute方法将当前线程传递给池,但我找不到将ThreadLocal变量传递给池的方法。

有什么建议吗?

谢谢

与线程关联的ThreadLocal实例集保存在每个Thread私有成员中。 你唯一枚举这些的机会就是对Thread做一些反思; 这样,您可以覆盖线程字段的访问限制。

一旦你可以获得ThreadLocal的集合,你就可以使用ThreadPoolExecutorbeforeExecute()afterExecute()钩子在后台线程中复制,或者为你的任务创建一个Runnable包装器来拦截run()调用以设置unset必要的ThreadLocal实例。 实际上,后一种技术可能会更好,因为它可以为您提供一个方便的位置来存储任务排队时的ThreadLocal值。


更新:这是第二种方法的更具体的说明。 与我原来的描述相反,存储在包装器中的所有内容都是调用线程,在执行任务时会对其进行查询。

 static Runnable wrap(Runnable task) { Thread caller = Thread.currentThread(); return () -> { Iterable> vars = copy(caller); try { task.run(); } finally { for (ThreadLocal var : vars) var.remove(); } }; } /** * For each {@code ThreadLocal} in the specified thread, copy the thread's * value to the current thread. * * @param caller the calling thread * @return all of the {@code ThreadLocal} instances that are set on current thread */ private static Collection> copy(Thread caller) { /* Use a nasty bunch of reflection to do this. */ throw new UnsupportedOperationException(); } 

正如我理解你的问题,你可以看一下InheritableThreadLocal ,它意味着将ThreadLocal变量从Parent Thread上下文传递给Child Thread Context。

根据@erickson的回答我写了这段代码。 它适用于inheritableThreadLocals。 它使用与Thread构造函数中使用的方法相同的方法构建inheritableThreadLocals的列表。 当然我用reflection来做到这一点。 我也覆盖了执行者类。

 public class MyThreadPoolExecutor extends ThreadPoolExecutor { @Override public void execute(Runnable command) { super.execute(new Wrapped(command, Thread.currentThread())); } } 

包装:

  private class Wrapped implements Runnable { private final Runnable task; private final Thread caller; public Wrapped(Runnable task, Thread caller) { this.task = task; this.caller = caller; } public void run() { Iterable> vars = null; try { vars = copy(caller); } catch (Exception e) { throw new RuntimeException("error when coping Threads", e); } try { task.run(); } finally { for (ThreadLocal var : vars) var.remove(); } } } 

复制方法:

 public static Iterable> copy(Thread caller) throws Exception { List> threadLocals = new ArrayList<>(); Field field = Thread.class.getDeclaredField("inheritableThreadLocals"); field.setAccessible(true); Object map = field.get(caller); Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table"); table.setAccessible(true); Method method = ThreadLocal.class .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap")); method.setAccessible(true); Object o = method.invoke(null, map); Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals"); field2.setAccessible(true); field2.set(Thread.currentThread(), o); Object tbl = table.get(o); int length = Array.getLength(tbl); for (int i = 0; i < length; i++) { Object entry = Array.get(tbl, i); Object value = null; if (entry != null) { Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod( "get"); referentField.setAccessible(true); value = referentField.invoke(entry); threadLocals.add((ThreadLocal) value); } } return threadLocals; } 

下面是一个示例,将父线程中的当前LocaleContext传递给CompletableFuture跨越的子线程[默认使用ForkJoinPool]。

只需在Runnable块中的子线程中定义您想要做的所有事情。 因此,当CompletableFuture执行Runnable块时,它的子线程处于控制状态,并且你在Child的ThreadLocal中设置了父亲的ThreadLocal东西。

这里的问题不是整个ThreadLocal被复制过来。 仅复制LocaleContext。 由于ThreadLocal只能私有访问它所属的Thread,因此使用Reflection并尝试在Child中获取和设置是太多古怪的东西,可能导致内存泄漏或性能损失。

因此,如果您从ThreadLocal中了解了您感兴趣的参数,那么此解决方案的工作方式更加清晰。

  public void parentClassMethod(Request request) { LocaleContext currentLocale = LocaleContextHolder.getLocaleContext(); executeInChildThread(() -> { LocaleContextHolder.setLocaleContext(currentLocale); //Do whatever else you wanna do })); //Continue stuff you want to do with parent thread } private void executeInChildThread(Runnable runnable) { try { CompletableFuture.runAsync(runnable) .get(); } catch (Exception e) { LOGGER.error("something is wrong"); } } 

我不喜欢reflection方法。 替代解决方案是实现执行器包装器并将对象直接作为ThreadLocal上下文传递给传播父上下文的所有子线程。

 public class PropagatedObject { private ThreadLocal> data = new ThreadLocal<>(); //put, set, merge methods, etc } 

==>

 public class ObjectAwareExecutor extends AbstractExecutorService { private final ExecutorService delegate; private final PropagatedObject objectAbsorber; public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){ this.delegate = delegate; this.objectAbsorber = objectAbsorber; } @Override public void execute(final Runnable command) { final ConcurrentHashMap parentContext = objectAbsorber.get(); delegate.execute(() -> { try{ objectAbsorber.set(parentContext); command.run(); }finally { parentContext.putAll(objectAbsorber.get()); objectAbsorber.clean(); } }); objectAbsorber.merge(parentContext); } 

如果查看ThreadLocal代码,您可以看到:

  public T get() { Thread t = Thread.currentThread(); ... } 

当前线程无法覆盖。

可能的解决方案:

  1. 看看java 7 fork / join机制(但我认为这是一个糟糕的方式)

  2. 查看背书机制来覆盖JVM中的ThreadLocal类。

  3. 尝试重写RESTEasy(您可以在IDE中使用Refactor工具替换所有ThreadLocal用法,它看起来很简单)