0
点赞
收藏
分享

微信扫一扫

从开源框架学习设计模式之策略模式应用

1.

A 拨云见日,抓住问题本质

(本源问题)用户问题:“我想支付”

(经营视角)业务问题:支持一切可以稳定的、低成本的、高效率的第三方支付工具,

(体系结构)产品问题:支付需要逆向流程、异常流程、对账模块、清算模块等

(架构代码)技术问题:非功能性的需求——高并发、可用性(成本),实现第三方支付的链路

运维

B、明确目标,架构恰到好处

Keep it simpe and smile

架构的理念是大道至简:解决问题

如何让我们的系统有可扩展性(研发)和可维护性(运营、维护)、高可用性

如何让我们的系统能够恰到好处地解决问题,

如何让我们的系统能够运行3-5年不重构(用户量、技术架构趋势、重构成本)

C、洞悉原则,设计一针见血

一切重复的代码都可以抽象

重复代码的危害性:

不一致性

代码冗余

易出BUG 新价值-学习成本-维护成本<老组件价值

提升软件的可扩展性,可维护性需要抽象思维和归纳思维的集中发力

一级扩展性——流程的可扩展性 面向服务架构(SOA)、微服务架构、工作流程引擎

二级扩展性——业务规则的可扩展性 风控系统 机器人 规则引擎

三级可扩展性——实现细节的可扩展性 设计模式、注解与SPI机制、 OSGI JDK 9 模块化

一切的设计模式都是为了寻找变化点、封装隔离变化点

2.2、成就优秀框架的本质探讨

A、自我实现的框架全家福

B、使用过的优秀框架大阅兵C、自研的框架与优秀框架的差距

 

2.3、学习优秀框架变得更优秀

3、ThreadPoolExecutor中的策略模式应用分析

JDK中场景的策略模式如线程池的拒绝策略,就是个典型的策略模式,这个也是面试常问话题,比如有

哪几个参数,他们的策略是啥,优缺点是啥,为什么需要,如何自定义实现适合不同业务场景的线程池

拒绝策略,这也是本文的授课核心关注点。

从开源框架学习设计模式之策略模式应用_线程池

 

 

从开源框架学习设计模式之策略模式应用_线程池_02

 

 

通过Callable/Future 实现带返回值的线程;

所以,submit可以实现带返回值的任务(阻塞获取返回值),excute不支持返回值;

excute发生错误会抛出异常,submit不会抛出异常;

ThreadPoolExecutor与RejectedExecutionHandler的关系

从开源框架学习设计模式之策略模式应用_ide_03

 

 

从开源框架学习设计模式之策略模式应用_可扩展性_04

 

 

从开源框架学习设计模式之策略模式应用_ide_05

 

 

从开源框架学习设计模式之策略模式应用_线程池_06

 

 

从开源框架学习设计模式之策略模式应用_线程池_07

 

 

从开源框架学习设计模式之策略模式应用_可扩展性_08

 

 

从开源框架学习设计模式之策略模式应用_线程池_09

 

 

从开源框架学习设计模式之策略模式应用_线程池_10

 

 

从开源框架学习设计模式之策略模式应用_ide_11

 

3.4、自定义线程池的缓存策略——记录并告警提交被拒绝的任务
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d,
max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s,
isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(),
e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(),
e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
//省略实现
}
}
private static final class NewThreadRunsPolicy implements
RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
package com.kkb.dp.pattern.strategy.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description: 自定义非阻塞线程池
* @Date: 2008/2/14 13:50
* @Author Joel
**/
@Slf4j
public class CustomNoBlockThreadPoolExecutor {
private ThreadPoolExecutor pool = null;
/**
* 线程池初始化方法
*
* corePoolSize 核心线程池大小----10
* maximumPoolSize 最大线程池大小----30
* keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位
TimeUnit
* TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES
* workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队

* threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂
* rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用
sleep(100)),
* 任务会交给RejectedExecutionHandler来处理
*/
public void init() {
pool = new ThreadPoolExecutor(
10,
30,
30,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>());
/* pool = new ThreadPoolExecutor(
10,
30,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(10));*/
/* pool = new ThreadPoolExecutor(
10,
30,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(10),
new CustomThreadFactory(),
new CustomNoBlockRejectedExecutionHandler());*/
}
public void destory() {
if(pool != null) {
pool.shutdownNow();
}
}
private ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName =
CustomNoBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
/**
* 自定义线程池任务拒绝处理器
*/
public class CustomNoBlockRejectedExecutionHandler implements
RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录异常
// 报警处理等
log.error("error.............");
}
}
// 测试构造的线程池
public static void main(String[] args) {
CustomNoBlockThreadPoolExecutor exec = new
CustomNoBlockThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
for(int i=1; i<Integer.MAX_VALUE; i++) {
log.info("提交第" + i + "个任务!");
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
log.warn("InterruptedException :
{}",e.fillInStackTrace().getMessage());
}
log.info("id 为{},name 为{}的Thread is
running=====",Thread.currentThread().getId(),Thread.currentThread().getName());
}
});
}
/* for(int i=1; i<100; i++) {
log.info("提交第" + i + "个任务!");
pool.execute(new Runnable() {
@Overridepublic void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
log.warn("InterruptedException :
{}",e.fillInStackTrace().getMessage());
//log.warn("InterruptedException :
{}",e.getLocalizedMessage());
//log.warn("InterruptedException :
{}",e.getCause().getMessage());
}
log.info("id 为{},name 为{}的Thread is
running=====",Thread.currentThread().getId(),Thread.currentThread().getName());
}
});
}*/
// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
// exec.destory();
try {
Thread.sleep(10000);
log.info("测试任务已完成");
} catch (InterruptedException e) {
log.warn("InterruptedException :
{}",e.fillInStackTrace().getMessage());
}
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 



举报

相关推荐

0 条评论