2011-08-31 129 views
19

我正在使用ExecutorService和Future运行具有超时的单独线程中的进程(示例代码here)(线程“产卵”发生在AOP方面)。将ThreadLocal传播到从ExecutorService获取的新线程

现在,主线程是Resteasy请求。 Resteasy使用一个或多个ThreadLocal变量来存储一些上下文信息,我需要在Rest方法调用中的某个位置检索这些上下文信息。问题是,由于Resteasy线程正在新线程中运行,因此ThreadLocal变量将失效。

什么是Resteasy使用ThreadLocal变量传播新线程的最佳方式? Resteasy似乎使用多个ThreadLocal变量来跟踪上下文信息,并且我想“盲目地”将所有信息传输到新线程。

我看过子类ThreadPoolExecutor并使用beforeExecute方法将当前线程传递给池,但我找不到将ThreadLocal变量传递给池的方法。

有什么建议吗?

感谢

+0

你可以重写第二段略有?它让我感到困惑。另外,beforeExecute有什么问题?你不能让它正常工作,或者你意识到它不适合你的需要? – toto2

回答

13

集合与线程相关联ThreadLocal实例都在私有成员举行每个Thread。列举这些的唯一机会是对Thread做一些思考;这样,您可以覆盖线程字段上的访问限制。

一旦你可以得到一套ThreadLocal,你可以在后台线程使用的ThreadPoolExecutorbeforeExecute()afterExecute()钩复制,或通过创建Runnable包装你的任务是拦截run()调用设置未设置必要的ThreadLocal实例。实际上,后一种技术可能会更好,因为它可以为您提供一个方便的位置,以便在任务排队时存储ThreadLocal值。


更新:下面是第二种方法的更具体的说明。与我的原始描述相反,存储在包装器中的所有内容都是调用线程,在执行任务时会被询问。

static Runnable wrap(Runnable task) 
{ 
    Thread caller = Thread.currentThread(); 
    return() -> { 
    Iterable<ThreadLocal<?>> vars = copy(caller); 
    try { 
     task.run(); 
    } 
    finally { 
     for (ThreadLocal<?> var : vars) 
     var.remove(); 
    } 
    }; 
} 

/** 
* For each {@code ThreadLocal} in the specified thread, copy the thread's 
* value to the current thread. 
* 
* @param caller the calling thread 
* @return all of the {@code ThreadLocal} instances that are set on current thread 
*/ 
private static Collection<ThreadLocal<?>> copy(Thread caller) 
{ 
    /* Use a nasty bunch of reflection to do this. */ 
    throw new UnsupportedOperationException(); 
} 
+0

你能举一个例子说明后一种技术是如何工作的吗? – Viraj

+0

@Viraj我添加了一些代码。这有帮助吗? – erickson

+0

是的。当然。 – Viraj

2

据我了解你的问题,你可以看看InheritableThreadLocal这是指通过从父线程上下文ThreadLocal变量子线程上下文

+15

不会工作,首先OP不能控制第三方库中的ThreadLocal创建。其次,'ExecutorService'重用线程,而'InheritableThreadLocal'只在直接产生新线程时才起作用。 –

-1

如果你看看ThreadLocal的代码,你可以看到:

public T get() { 
     Thread t = Thread.currentThread(); 
     ... 
    } 

当前线程不能被覆盖。

可能的解决方案:

  1. 了解Java 7叉/加入机制(但我认为这是一个坏的方式)

  2. endorsed机制覆盖在你的JVM ThreadLocal类。

  3. 尝试重写的RESTEasy(你可以使用重构工具,在你的IDE来代替所有的ThreadLocal的使用,它看起来像易)

1

基于@erickson的答案我写了这段代码。它正在为inheritableThreadLocals工作。它使用与Thread构造函数中使用的方法相同的方法构建inheritableThreadLocals的列表。当然,我使用反射来做到这一点。我也重写执行程序类。

public class MyThreadPoolExecutor extends ThreadPoolExecutor 
{ 
    @Override 
    public void execute(Runnable command) 
    { 
     super.execute(new Wrapped(command, Thread.currentThread())); 
    } 
} 

打包机:

private class Wrapped implements Runnable 
    { 
     private final Runnable task; 

     private final Thread caller; 

     public Wrapped(Runnable task, Thread caller) 
     { 
     this.task = task; 
     this.caller = caller; 
     } 

     public void run() 
     { 
     Iterable<ThreadLocal<?>> vars = null; 
     try 
     { 
      vars = copy(caller); 
     } 
     catch (Exception e) 
     { 
      throw new RuntimeException("error when coping Threads", e); 
     } 
     try { 
      task.run(); 
     } 
     finally { 
      for (ThreadLocal<?> var : vars) 
       var.remove(); 
     } 
     } 
    } 

复制方法:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception 
    { 
     List<ThreadLocal<?>> threadLocals = new ArrayList<>(); 
     Field field = Thread.class.getDeclaredField("inheritableThreadLocals"); 
     field.setAccessible(true); 
     Object map = field.get(caller); 
     Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table"); 
     table.setAccessible(true); 

     Method method = ThreadLocal.class 
       .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap")); 
     method.setAccessible(true); 
     Object o = method.invoke(null, map); 

     Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals"); 
     field2.setAccessible(true); 
     field2.set(Thread.currentThread(), o); 

     Object tbl = table.get(o); 
     int length = Array.getLength(tbl); 
     for (int i = 0; i < length; i++) 
     { 
     Object entry = Array.get(tbl, i); 
     Object value = null; 
     if (entry != null) 
     { 
      Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
        "get"); 
      referentField.setAccessible(true); 
      value = referentField.invoke(entry); 
      threadLocals.add((ThreadLocal<?>) value); 
     } 
     } 
     return threadLocals; 
    } 
0

这里是通过在父线程由CompletableFuture所跨越的子线程的当前LocaleContext为例[缺省情况下它用于ForkJoinPool。

只需定义您想要在Runnable块内的子线程中执行的所有操作。所以,当CompletableFuture执行Runnable块时,它的子线程在控制中,并且你有在Parent的ThreadLocal中设置的父母的ThreadLocal东西。

这里的问题不是整个ThreadLocal被复制过来。只复制LocaleContext。由于ThreadLocal只能使用反射来访问它所属的Thread,并且试图在Child中获取和设置都是太多古怪的东西,可能会导致内存泄漏或性能下降。

所以如果你知道你从ThreadLocal感兴趣的参数,那么这个解决方案的工作方式更清洁。

public void parentClassMethod(Request request) { 
     LocaleContext currentLocale = LocaleContextHolder.getLocaleContext(); 
     executeInChildThread(() -> { 
       LocaleContextHolder.setLocaleContext(currentLocale); 
       //Do whatever else you wanna do 
      })); 

     //Continue stuff you want to do with parent thread 
} 


private void executeInChildThread(Runnable runnable) { 
    try { 
     CompletableFuture.runAsync(runnable) 
      .get(); 
    } catch (Exception e) { 
     LOGGER.error("something is wrong"); 
    } 
} 
0

我不喜欢Reflection方法。另一种解决方案是实现执行程序包装并将对象直接作为ThreadLocal上下文传递给传播父上下文的所有子线程。

public class PropagatedObject { 

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>(); 

    //put, set, merge methods, etc 

} 

==>

public class ObjectAwareExecutor extends AbstractExecutorService { 

    private final ExecutorService delegate; 
    private final PropagatedObject objectAbsorber; 

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){ 
     this.delegate = delegate; 
     this.objectAbsorber = objectAbsorber; 
    } 
    @Override 
    public void execute(final Runnable command) { 

     final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get(); 
     delegate.execute(() -> { 
      try{ 
       objectAbsorber.set(parentContext); 
       command.run(); 
      }finally { 
       parentContext.putAll(objectAbsorber.get()); 
       objectAbsorber.clean(); 
      } 
     }); 
     objectAbsorber.merge(parentContext); 
    } 
相关问题