Passing thread context to Spring async threads

程序员知识圈, 2022-03-11

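We use the @Async annotation here. You can also use the thread pool directly; the effect is the same.
First, create ContextDecorator, which implements the TaskDecorator interface.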

package com.qimo.omsa.demo.thread;

import java.util.Map;
import org.springframework.core.task.TaskDecorator;

/**
 * @Description TODO
 * @Author 姚仲杰#80998699
 * @Date 2022/1/30 13:55
 */
public class ContextDecorator implements TaskDecorator {
    
    @Override
    public Runnable decorate(Runnable runnable) {
        return new ContextDecorator.CopyContextToSubThread(runnable);
    }
    
    public static class CopyContextToSubThread implements Runnable{
        private Runnable runnable;
        private Map<String, String> contextMap;
        
        // Runs on the submitting (parent) thread, so this reads the parent's context.
        CopyContextToSubThread(Runnable runnable){
            this.runnable=runnable;
            this.contextMap=SystemContext.getContextMap();
        }
        @Override
        public void run() {
            // Runs on the pooled (child) thread.
            try{
                if (contextMap != null) {
                    SystemContext.setContextMap(contextMap);
                }
                this.runnable.run();
            }finally {
                // Always clear the child thread's context before the thread is reused.
                SystemContext.clean();
            }
        }
    }
}
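
To try the capture-then-restore pattern without starting a Spring application, here is a minimal plain-JDK sketch. The names ContextCaptureSketch, CONTEXT and Capturing are hypothetical stand-ins (CONTEXT plays the role of SystemContext, Capturing the role of CopyContextToSubThread); it illustrates the idea and is not the article's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextCaptureSketch {
    // Hypothetical stand-in for SystemContext: one thread-bound value.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // The constructor runs on the submitting thread, run() on the worker thread.
    static final class Capturing implements Runnable {
        private final Runnable delegate;
        private final String captured;

        Capturing(Runnable delegate) {
            this.delegate = delegate;
            this.captured = CONTEXT.get(); // read on the submitting thread
        }

        @Override
        public void run() {
            try {
                CONTEXT.set(captured);     // restore on the worker thread
                delegate.run();
            } finally {
                CONTEXT.remove();          // do not leak into the next task
            }
        }
    }

    // Returns what a pooled thread reads from CONTEXT, with or without the wrapper.
    static String readInWorker(boolean propagate) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set("request-42");
            String[] seen = new String[1];
            Runnable task = () -> seen[0] = CONTEXT.get();
            Runnable toRun = propagate ? new Capturing(task) : task;
            // join() waits for the worker and makes its write to seen[0] visible.
            CompletableFuture.runAsync(toRun, pool).join();
            return seen[0];
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without wrapper: " + readInWorker(false));
        System.out.println("with wrapper:    " + readInWorker(true));
    }
}
```

Without the wrapper the worker reads null, because a ThreadLocal value belongs to the thread that set it. With the wrapper it reads the value set by the submitting thread.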

Next, configure the thread pool.

package com.qimo.omsa.demo.thread;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @Description TODO
 * @Author 姚仲杰#80998699
 * @Date 2022/3/7 17:59
 */
@Configuration
@EnableAsync()
public class DecoratorThreadPool {
    @Bean
    ThreadPoolTaskExecutor threadPoolTaskExecutor(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
        // Core pool size
        threadPoolTaskExecutor.setCorePoolSize(10);
        // Maximum pool size
        threadPoolTaskExecutor.setMaxPoolSize(10);
        // Set the task decorator
        threadPoolTaskExecutor.setTaskDecorator(new ContextDecorator());
        // Capacity of the blocking queue
        threadPoolTaskExecutor.setQueueCapacity(500);
        // Initialize the executor
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
    
}

Now write the ThreadLocal-based context holder. It is generally a good idea to put some limit on the size of a thread context.

package com.qimo.omsa.demo.thread;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description TODO
 * @Author 姚仲杰#80998699
 * @Date 2022/3/7 17:37
 */
public class SystemContext {
    // One context map per thread.
    private static final ThreadLocal<Map<String, String>> contextMap = new ThreadLocal<>();
    // Maximum number of entries in the context map.
    private static final int MAX_CAPACITY = 200;
    // Maximum length of a key or value; Integer.MAX_VALUE effectively disables this check.
    private static final int MAX_SIZE = Integer.MAX_VALUE;
    
    public static String get(String key) {
        Map<String, String> contextMap = getContextMap();
        return contextMap == null ? null : contextMap.get(key);
    }
    
    public static String put(String key, String value) {
        if (key == null || value == null) {
            throw new RuntimeException("key:" + key + " or value:" + value + " is null,i can't put it into the context map");
        }
        if (key.length() > MAX_SIZE) {
            throw new RuntimeException("key is more than " + MAX_SIZE + ", i can't put it into the context map");
        }
        if (value.length() > MAX_SIZE) {
            throw new RuntimeException("value is more than " + MAX_SIZE + ", i can't put it into the context map");
        }
        
        // Create the map lazily on first use in this thread.
        Map<String, String> contextMap = getContextMap();
        if (contextMap == null) {
            contextMap = new HashMap<>();
            setContextMap(contextMap);
        }
        
        // Refuse to grow beyond MAX_CAPACITY entries.
        if (contextMap.size() >= MAX_CAPACITY) {
            throw new RuntimeException("the context map is full, can't put anything");
        }
        return contextMap.put(key, value);
    }
    
    
    public static Map<String, String> getContextMap() {
        return contextMap.get();
    }
    
    public static void setContextMap(Map<String, String> contextMap) {
        SystemContext.contextMap.set(contextMap);
    }
    
    // Removes the current thread's context map.
    public static void clean() {
        contextMap.remove();
    }
}
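
One thing to keep in mind: getContextMap() returns the live map, so the decorator hands the same HashMap instance to the child thread, and HashMap is not safe for concurrent modification. If the parent keeps writing after submitting a task, copying the map at capture time is one possible alternative. The sketch below is an assumption-laden illustration, not part of the article's code; ContextSnapshotSketch and snapshot are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class ContextSnapshotSketch {
    // Copies the caller's map so later changes on either thread stay local.
    static Map<String, String> snapshot(Map<String, String> source) {
        return source == null ? null : new HashMap<>(source);
    }

    public static void main(String[] args) {
        Map<String, String> parent = new HashMap<>();
        parent.put("traceId", "t-1");

        Map<String, String> shared = parent;           // what passing the reference does
        Map<String, String> copied = snapshot(parent); // what a defensive copy does

        parent.put("traceId", "t-2"); // the parent keeps working after submitting

        System.out.println("shared sees: " + shared.get("traceId")); // t-2
        System.out.println("copy sees:   " + copied.get("traceId")); // t-1
    }
}
```

In the decorator, this would mean capturing snapshot(SystemContext.getContextMap()) in the constructor instead of the map itself. Whether that is desirable depends on whether the child is supposed to see later changes made by the parent.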

Next, write the test classes: a controller and a service.

controller

package com.qimo.omsa.demo.thread;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description TODO
 * @Author 姚仲杰#80998699
 * @Date 2022/3/7 18:14
 */
@RestController
public class ThreadControllerTest {
    Logger logger= LoggerFactory.getLogger(ThreadControllerTest.class);
    
    @Autowired
    ThreadServiceTest threadServiceTest;
    
    @GetMapping("/thread/test")
    public String testThread(){
        SystemContext.put("xxxxx","xxxxxxx");
        logger.info("aaaaaaaa");
        threadServiceTest.subThread();
        return "success";
    }
}

service

package com.qimo.omsa.demo.thread;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @Description TODO
 * @Author 姚仲杰#80998699
 * @Date 2022/3/7 18:24
 */
@Service
public class ThreadServiceTest {
    Logger logger= LoggerFactory.getLogger(ThreadControllerTest.class);
    @Async
    public void subThread(){
        logger.info(SystemContext.get("xxxxx"));
    }
}

Then start the project and call the endpoint. You will see log output like this:

# This is the request thread's log line; the thread name is http-nio-8080-exec-1
2022-03-07 19:08:30.768 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  com.qimo.omsa.demo.thread.ThreadControllerTest:26 aaaaaaaa
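# This is the async thread's log line, printed by subThread. The logger name is still ThreadControllerTest because ThreadServiceTest creates its logger with ThreadControllerTest.class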
2022-03-07 19:08:39.015 +0800 [TID: N/A] [threadPoolTaskExecutor-1] INFO  com.qimo.omsa.demo.thread.ThreadControllerTest:19 xxxxxxx

As you can see, the value xxxxxxx that was put into the context on http-nio-8080-exec-1 can now be read on the child thread threadPoolTaskExecutor-1.

The principle is simple: the task handed to the thread is wrapped using the decorator pattern.
Let's look at the source of ThreadPoolTaskExecutor.

@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}

		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}

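This method overrides ThreadPoolExecutor's execute method. In effect it decorates the Runnable and leaves the decoration logic as an extension point for the developer. Once we implement the TaskDecorator interface and set our implementation on the ThreadPoolTaskExecutor, every task is decorated with our custom logic.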
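
The mechanism can be reproduced in miniature with the JDK alone. The sketch below is a simplified illustration, not Spring's implementation: DecoratingExecutorSketch, newPool and observeThreads are hypothetical names, and a UnaryOperator<Runnable> stands in for TaskDecorator. It records which thread performs the decoration and which thread runs the decorated task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

class DecoratingExecutorSketch {
    // Every task passes through the decorator before it is queued.
    static ThreadPoolExecutor newPool(UnaryOperator<Runnable> decorator) {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            public void execute(Runnable command) {
                // Runs on the thread that submits the task.
                super.execute(decorator.apply(command));
            }
        };
    }

    // Returns {thread that decorated the task, thread that ran the decorated task}.
    static String[] observeThreads() {
        String[] names = new String[2];
        ThreadPoolExecutor pool = newPool(task -> {
            names[0] = Thread.currentThread().getName();
            return () -> {
                names[1] = Thread.currentThread().getName();
                task.run();
            };
        });
        try {
            // runAsync hands the task to pool.execute(...); join() waits for completion.
            CompletableFuture.runAsync(() -> { }, pool).join();
            return names;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = observeThreads();
        System.out.println("decorated on: " + names[0]);
        System.out.println("executed on:  " + names[1]);
    }
}
```

The decoration happens on the submitting thread and the decorated run() happens on a pool thread. That split is what lets a decorator read the parent's ThreadLocal in one place and write the child's in the other.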

public class ContextDecorator implements TaskDecorator {
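    // The decorator receives a Runnable and does some work around it: the wrapper's constructor captures the parent thread's SystemContext, run() hands that contextMap to the current thread, and then the actual this.runnable.run() is executed.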
    @Override
    public Runnable decorate(Runnable runnable) {
        return new ContextDecorator.CopyContextToSubThread(runnable);
    }
    
    public static class CopyContextToSubThread implements Runnable{
        private Runnable runnable;
        private Map<String, String> contextMap;
        
        CopyContextToSubThread(Runnable runnable){
            this.runnable=runnable;
            this.contextMap=SystemContext.getContextMap();
        }
        @Override
        public void run() {
            try{
                if (contextMap != null) {
                    SystemContext.setContextMap(contextMap);
                }
                this.runnable.run();
            }finally {
                SystemContext.clean();
            }
        }
    }
}

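Runnable is just an interface. A Runnable has to be handed to a Thread, and only when thread.start() is called does the native start0() method arrange for run() to be invoked on the new thread. The decorator takes advantage of this: the original run() is now called as an ordinary method, and the run() that the pool's thread actually executes is the decorator's. The constructor of CopyContextToSubThread runs earlier, on the thread that submits the task, which is why it can read the parent's context. Spring makes clever use of exactly this. Finally, remember to clear the ThreadLocal after use to avoid data pollution and memory leaks.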
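
The data pollution mentioned above is easy to reproduce on a pooled thread. The following is a minimal sketch using only the JDK; StaleThreadLocalSketch, CONTEXT and secondTaskSees are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StaleThreadLocalSketch {
    // Hypothetical stand-in for SystemContext.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Runs two tasks back to back on one pooled thread and returns what the
    // second task finds in CONTEXT.
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // First task: sets a value, and removes it only if cleanUp is true.
            CompletableFuture.runAsync(() -> {
                try {
                    CONTEXT.set("user-A");
                } finally {
                    if (cleanUp) {
                        CONTEXT.remove();
                    }
                }
            }, pool).join();
            // Second task: runs on the same reused thread and just reads.
            return CompletableFuture.supplyAsync(CONTEXT::get, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + secondTaskSees(false));
        System.out.println("with cleanup:    " + secondTaskSees(true));
    }
}
```

Without the cleanup, the second task sees the value left behind by the first. The same reasoning applies to the request thread in the controller above, which is also a pooled thread (http-nio-8080-exec-1); wrapping the handler body in try/finally and calling SystemContext.clean() in the finally block would be one way to cover it.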
