当前位置:首页 > Java技术 > 正文内容

ThreadLocal跨线程问题

canca1年前 (2024-07-06)Java技术366

1、问题

通常复杂的处理流程中,我们会使用一些异步处理的手段,那么这种场景下ThreadLocal即可能出现获取失败的问题。

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new ThreadLocal<>();
        threadLocal.set("A");
        System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
        }).start();
    }
}
/Library/Java/JavaVirtualMachines/jdk
Connected to the target VM,address:
main-getThreadLocal:A
Disconnected from the target VM,address
Thread-0-getThreadLocal:null

2、InheritableThreadLocal

直接使用ThreadLocal,在跨线程时时无法获取到ThreadLocal的。
在lang包还有一个继承自ThreadLocal的类InheritableThreadLocal

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    /**
     * Computes the child's initial value for this inheritable thread-local
     * variable as a function of the parent's value at the time the child
     * thread is created.  This method is called from within the parent
     * thread before the child is started.
     * <p>
     * This method merely returns its input argument, and should be overridden
     * if a different behavior is desired.
     *
     * @param parentValue the parent thread's value
     * @return the child thread's initial value
     */
    protected T childValue(T parentValue) {
        return parentValue;
    }

    /**
     * Get the map associated with a ThreadLocal.
     *
     * @param t the current thread
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    /**
     * Create the map associated with a ThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the table.
     */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

重写了getMap和createMap方法,让每次get、set操作都是对inheritableThreadLocals进行操作。
在Thread的init方法中有个判断,若是父线程的inheritableThreadLocals不为空,则将其复制到子线程。

if (parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */

也就是说我们使用InheritableThreadLocal,只要新建线程就可以让ThreadLocal在子父线程之间传递。

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
        threadLocal.set("A");
        System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());

            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
            }).start();
        }).start();
    }
}
Connected to the target VM,address:
main-getThreadLocal:A
Thread-0-getThreadLocal:A
Thread-1-getThreadLocal:A
Disconnected from the target VM,address

3、线程池复用Thread导致的问题

并不代表InheritableThreadLocal就可以保证子父线程之间正确传递ThreadLocal对象。inheritableThreadLocals的复制操作只有在新创建Thread对象的时候才会触发。而我们通常不会在项目中new Thread,而是使用线程池,线程池的Thread对象是复用的。
将上面的例子改成使用线程池
使用固定大小的线程池,当固定大小为2时没问题,两次使用的不是同一个Thread,每次都成功复制了正确的ThreadLocal。

public class ThreadLocalTest1 {

    public static void main(String[] args) throws InterruptedException {
        Executor threadPool = Executors.newFixedThreadPool(2);

        ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
        threadLocal.set("A");
        System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());


        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
            }
        });

        threadLocal.set("B");

        Thread.sleep(200);
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
            }
        });
    }
}
Connected to the target VM,address:'127.
main-getThreadLocal:A
pool-1-thread-1-getThreadLocal:A
pool-1-thread-2-getThreadLocal:B

而将线程池固定大小设为1时会发现,使用同一个Thread对象,第二次并不会触发重新复制ThreadLocal对象,还是以前的A。

public class ThreadLocalTest1 {

    public static void main(String[] args) throws InterruptedException {
        Executor threadPool = Executors.newFixedThreadPool(1);

        ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
        threadLocal.set("A");
        System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());


        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
            }
        });

        threadLocal.set("B");

        Thread.sleep(200);
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
            }
        });
    }
}
/Library/Java/JavaVirtualMachines/jdk1
Connected to the target VM,address:
main-getThreadLocal:A
pool-1-thread-1-getThreadLocal:A
pool-1-thread-1-getThreadLocal:A

4、解决办法

阿里的TransmittableThreadLocal
引入transmittable-thread-local依赖

<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>transmittable-thread-local</artifactId>
	<version>2.11.4</version>
</dependency>

使用TransmittableThreadLocal继续改造上面的例子

public class ThreadLocalTest2 {

    public static void main(String[] args) throws InterruptedException {
        Executor threadPool = TtlExecutors.getTtlExecutor(Executors.newFixedThreadPool(1));

        ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
        threadLocal.set("A");
        System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());


        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
            }
        });

        threadLocal.set("B");

        Thread.sleep(200);
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get());
            }
        });
    }
}
Connected to the target VM,address:
main-getThreadLocal:A
pool-1-thread-1-getThreadLocal:A
pool-1-thread-1-getThreadLocal:B

两个关键点:
(1)需要使用transmittable-thread-local包中的方法包装线程池,否则即使使用TransmittableThreadLocal也不会起作用
(2)使用TransmittableThreadLocal对象来保存线程变量。
简单分析一下这个东东:
TransmittableThreadLocal继承自InheritableThreadLocal

123.png

TransmittableThreadLocal中有个holder的静态对象,对ThreadLocal进行get、set、remove等操作时其实是在对这个holder进行操作。用到了WeakHashMap,这个其实和JDK1.8以前的老版hashmap很相似,不同的是Entry对象是使用WeakReference包装的,这应该个原生ThreadLocal中的ThreadLocalMap使用WeakReference是一样的道理。(ThreadLocal为什么使用WeakReference

private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
        protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
            return new WeakHashMap();
        }

        protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
            return new WeakHashMap(parentValue);
        }
    };

然后看包装线程池是在干什么

@Nullable
    public static Executor getTtlExecutor(@Nullable Executor executor) {
        return (Executor)(!TtlAgent.isTtlAgentLoaded() && null != executor && !(executor instanceof TtlEnhanced)?new ExecutorTtlWrapper(executor):executor);
    }

返回了一个ExecutorTtlWrapper对象,在调用execute时将Runnable对象换成了TtlRunnable。

class ExecutorTtlWrapper implements Executor, TtlWrapper<Executor>, TtlEnhanced {
    private final Executor executor;

    ExecutorTtlWrapper(@NonNull Executor executor) {
        this.executor = executor;
    }

    public void execute(@NonNull Runnable command) {
        this.executor.execute(TtlRunnable.get(command));
    }

    @NonNull
    public Executor unwrap() {
        return this.executor;
    }
}

TtlRunnable中改造了原生Runnable的run方法

public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
    private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture());
    private final Runnable runnable;
    private final boolean releaseTtlValueReferenceAfterRun;
    private final TtlAttachmentsDelegate ttlAttachment = new TtlAttachmentsDelegate();

    private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

    public void run() {
        Object captured = this.capturedRef.get();
        if(captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) {
            Object backup = Transmitter.replay(captured);

            try {
                this.runnable.run();
            } finally {
                Transmitter.restore(backup);
            }

        } else {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
    }
    ......
 }

Transmitter.capture()其实就是在获取TransmittableThreadLocal中holder的副本,对该副本使用AtomicReference进行包装,方便保证原子性。
调用run时,从capturedRef获取副本,使用cas将其更新为null,若cas失败则抛出异常IllegalStateException(“TTL value reference is released after run!”),使用cas来保证每次取到的都是最新的副本。
因此包装线程池的作用就是将以前每次新建Thread对象才拷贝inheritableThreadLocals的机制变成了每次新建Runnable的时候拷贝副本,从而保证线程池中子父线程之间ThreadLocal对象的传递。

这种在子父线程之间传递上下文的操作其实在很多框架中都有,如sleuth:
以前一直很好奇,为什么异步调用时调用链路依然是完整的,很神奇。sleuth中使用一个叫ExecutorBeanPostProcessor的后置处理器包装了所有的线程池,例如将ThreadPoolTaskExecutor包装为LazyTraceThreadPoolTaskExecutor。LazyTraceThreadPoolTaskExecutor重写了线程池的execute、submit等方法,当我们传入参数Runnable或者Callable时会被包装为TraceRunnable或TraceCallable。


扫描二维码推送至手机访问。

版权声明:本文由Ant.Master's Blog发布,如需转载请注明出处。

本文链接:https://iant.work/post/883.html

分享给朋友:

“ThreadLocal跨线程问题” 的相关文章

对象序列化与反序列化

    序列化,并不是JAVA独有的。因此,在这里我用比较通俗的话说了。序列化就是把一个对象转换成有规则的二进制流。而反序列化就是把有规则的二进制数据重整成一个对象。其好处不难看见:1.可以把一个对象保存在一个文件里。例如,下载软件。当您关闭了软件,下次再打开...

Socket与ServerSocket的问题

//服务器端:import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintStream;import java.net.ServerSock...

Java语言的反射机制

    由于项目的需要,在项目中要实现即插即用的方式,也就是说可以动态地加载包,不用设置CLASSPATH路径。当项目发布时,不可能要用户来设置环境变量吧!因此,就要用到JAVA的反射机制了。昨天,我是在研究JAVA的JNI技术。由于没有时间,所以过几天才写JNI。今天...

JSP与Servlet的对应关系

以前在QQzone写下的文章现在贴到这里来了... 最近比较忙啊!现在抽身写一篇文章。是关于JSP与Servlet的对应关系的。希望对大家有所帮助。其实我也是刚刚学的......-------Servlet--------------JSP----------1.ServletContext&nbs...

JAVA获得一个文件夹大小

在JAVA里没有现成的方法获取一个文件夹的大小,那么我们可以用递归的方法,获取文件夹的大小。    import  java.util.*;  import  java.io.*;  class  GetFileSi...

IM技术(1)

    做项目了,NetCL今天开工了,在这些日子里,我会将自己研究的内容写下来。做个记录,以下是我在网上搜到的。关于管理用户状态的解决方案,当然,我都有一个方案。不过对客户端的任务有点重吧,我方法是客户端从服务器端获到一个用户在线状态后,接着就与服务器无关了。好友离线...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。