Transmittable-Thread-Local:阿里开源的线程间上下文传递解决方案

2021/03/18

以下文章来源于TopCoder

正文

TTL(transmittable-thread-local)是一个线程间传递ThreadLocal,异步执行时上下文传递的解决方案。整个库的核心是构建在TransmittableThreadLocal类(继承并加强InheritableThreadLocal类)之上,同时包含线程池修饰(ExecutorService/ForkJoinPool/TimerTask)以及Java Agent支持,代码小于1k行,短小精悍。

在往下看之前,最好大致看下 https://github.com/alibaba/transmittable-thread-local 文档,效果会更好。JDK的InheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递任务执行时。原理是使用TtlRunnable/Ttlcallable包装了Runnable/Callable类:

  1. 在TtlRunnable/Ttlcallable初始化时capture TransmittableThreadLocal变量

  2. 在run方法调用runnable.run()前进行replay,设置到当前线程ThreadLocal

  3. 在run方法调用runnable.run()后进行restore,上下文还原,也就是replay的反向操作

注意,步骤1和步骤2/3不是在同一个线程中执行的。

既然TTL的TransmittableThreadLocal是继承并加强InheritableThreadLocal类的,那么首先需要分析下InheritableThreadLocal是什么东东,源码如下:

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    /**
    * 新建线程时,如果当前inheritableThreadLocals非空,则会获取当前inheritableThreadLocals传递给新线程
    */
    protected T childValue(T parentValue) {
        return parentValue;
    }
    /**
    * InheritableThreadLocal变量的set/get/remove操作都是在inheritableThreadLocals上
    */
    ThreadLocalMap getMap(Thread t) {
        return t.inheritableThreadLocals;
    }
    /**
    * 创建inheritableThreadLocals
    */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

Thread类中有两个ThreadLocal相关的ThreadLocalMap属性,如下:

ThreadLocal.ThreadLocalMap threadLocals:ThreadLocal变量使用
ThreadLocal.ThreadLocalMap inheritableThreadLocals:InheritableThreadLocal变量使用

新建线程时,将当前线程的inheritableThreadLocals传递给新线程,这里的传递是对InheritableThreadLocal变量的数据做浅拷贝(引用复制),这样新线程可以使用同一个InheritableThreadLocal变量查看上一个线程的数据。

下面分析下使用InheritableThreadLocal的一个demo:

void testInheritableThreadLocal_线程池() throws InterruptedException {
final InheritableThreadLocal<String> parent = new InheritableThreadLocal<>();
parent.set("value-set-in-parent");

    ExecutorService executor = Executors.newFixedThreadPool(1);
    executor.submit(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get()));
}
// 输出结果:pool-1-thread-1: value-set-in-parent

上面代码在submit任务时会伴随着(线程池工作)线程的创建,会继承当前线程的InheritableThreadLocal,所以会有上述输出结果。如果将代码改成下面的样子,会有什么不同呢?

void testInheritableThreadLocal_线程池() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {}); // 先进行工作线程创建

    final InheritableThreadLocal<String> parent = new InheritableThreadLocal<>();
    parent.set("value-set-in-parent");
    executor.submit(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get()));
}
// 输出结果:pool-1-thread-1: null

因为创建线程时当前线程并没有inheritableThreadLocals,所以线程池中线程打印结果为null。这种场景下如何才能获取到parent变量的数据呢?这时就该TTL出场了,将上述代码改成TTL方式如下:

void testTtlInheritableThreadLocal_线程池() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {}); // 先进行工作线程创建

    // 使用TTL
    final TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<>();
    parent.set("value-set-in-parent");
    // 将Runnable通过TtlRunnable包装下
    executor.submit(TtlRunnable.get(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get())));
}
// 输出结果:pool-1-thread-1: value-set-in-parent

下面以TtlRunnable.get()为起点分析TTL的设计实现,TtlRunnable.get源码如下(TtlRunnable.get流程对应的初始化时capture操作,保存快照。TtlCallable和TtlRunnable流程类似):

public static TtlRunnable get(@Nullable Runnable runnable) {
    return get(runnable, false, false);
}
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
    if (runnable instanceof TtlEnhanced) {
    // 幂等时直接返回,否则执行会产生问题,直接抛异常
        if (idempotent) return (TtlRunnable) runnable;
        else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
    this.capturedRef = new AtomicReference<Object>(capture());
    this.runnable = runnable;
    this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
// 从holder获取所有threadLocal,存到captured,这里相当于对当前线程holder做一个快照保存
// 到TtlRunnable实例属性中,在执行TtlRunnable时进行回放
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
    captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}

在新建TtlRunnable过程中,会保存下TransmittableThreadLocal.holder到captured,记录到TtlRunnable实例中的capturedRef字段,TransmittableThreadLocal.holder类型是:

// Note about holder:
// 1. The value of holder is type Map<TransmittableThreadLocal<?>, ?> (WeakHashMap implementation),
//    but it is used as *set*. 因为没有WeakSet的原因
// 2. WeakHashMap support null value.
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
    new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
        @Override
        protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
            return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                    }
        @Override
        protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
            return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
        }
};

从上面代码我们知道初始化TtlRunnable时已经将TransmittableThreadLocal保存下来了,那么什么时候应用到当前线程ThreadLocal中呢,这是就需要看下TtlRunnable.run方法:

public void run() {
    Object captured = capturedRef.get();
    // captured不应该为空,releaseTtlValueReferenceAfterRun为true时设置capturedRef为null,防止当前Runnable重复执行
    if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after run!");
    }
    
        // captured进行回放,应用到当前线程中
        Object backup = replay(captured);
        try {
            runnable.run();
        } finally {
            restore(backup);
        }
}

注意,TTL中的replay操作是以captured为当前inheritableThreadLocals的(处理逻辑是在TtlRunable run时,会以TtlRunnable.get时间点获取的captured(类似TTL快照)为准,holder中不在captured的先移除,在的会被替换)。关于replay中的clean extra TTL value讨论可以参考:https://github.com/alibaba/transmittable-thread-local/issues/134。回放captured和执行完runnable.run之后,再restore恢复到原来inheritableThreadLocals的状态。

说完了TTL中的capture、replay和restore流程,再看下官方提供的这个时序图,是不是感觉清晰很多。

除了通过TtlRunable.get()修饰用户自定义的task之外,还可以修饰线程池和使用Java Agent修饰JDK线程池实现类的方式实现TTL功能。

修饰线程池省去每次Runnable和Callable传入线程池时的修饰,这个逻辑可以在线程池中完成,其实就是在提交task时调用TtlRunable.get()修饰下。通过工具类com.alibaba.ttl.threadpool.TtlExecutors完成,有下面的方法:

  • getTtlExecutor:修饰接口Executor

  • getTtlExecutorService:修饰接口ExecutorService

  • getTtlScheduledExecutorService:修饰接口ScheduledExecutorService

使用Java Agent来修饰JDK线程池实现类,这种方式,实现线程池的传递是透明的,代码中没有修饰Runnable或是线程池的代码。即可以做到应用代码无侵入。关于 无侵入 的更多说明参见文档 Java Agent方式对应用代码无侵入。

使用Java Agent 实例如下:

// 启动参数中需加上 -javaagent:path/transmittable-thread-local-x.x.x.jar
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {});

final TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<>();
parent.set("value-set-in-parent");

executor.submit(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get()));
executor.shutdown();

TTL代码实现来看,确实短小精悍,值得花几个小时看下源码。通过看源码,我发现了,可以通过new ThreadLocal对象时,直接重写其initialValue方法,可以在threadLocal.get为空时初始化一个值,使用示例如下:

ThreadLocal<String> local = new ThreadLocal<String>() {
@Override
protected String initialValue() {
        return "init";
    }
};

System.out.println(local.get()); // init

local.set("hello world");
System.out.println(local.get()); // hello world

local.remove();
System.out.println(local.get()); // init

** 推荐阅读 **

Post Directory