0
点赞
收藏
分享

微信扫一扫

多线程案例及常用模式

一.单例模式——经典的设计模式

什么是单例模式:就是规定一个类只能创建一个对象,也就是保证某个类在程序中只存在唯一一个实例,而不会创建出多个实例

根据对象创建的时机不同,可以分为饿汉模式和懒汉模式

1.饿汉模式

在类加载时,就创建好那唯一一个对象:

这样可以获得instance这个实例吗?

显然:如果代码这样写,要想获得这个实例,就得先创建一个对象,然后调用getInstance方法,这样的话就不是单例模式了,所以,应该将此方法设定为static方法,直接用类来调用:

可是这样的话,我要是想再创建一个对象,还是能够创建出来,那该怎么办?把构造方法设为private:

这就是完整的饿汉模式了

线程安全问题考虑

饿汉模式是否会涉及到线程安全问题呢?考虑一下:如果多个线程调用getInstance,就会又多个cpu读取内存的过程,这个过程中,没人修改的了该instance的指向,所以就不会涉及到线程安全问题

2.懒汉模式

懒汉模式就是类加载时不急着创建对象,而是在随时要用时才去创建对象

注意,instance还得是static修饰的,只不过不分配内存,省下了创建实例的开销。

线程安全问题考虑

懒汉模式会涉及到线程安全问题吗?答案是会涉及到,当多个线程同时调用getInstance时,起初都判断instance为空,所以都去修改instance指向了,这样,每个线程得到的地址就会不一样,但最终instance只有一个正确的指向。那么如何修改呢?

懒汉模式的多线程版本

保证代码原子性:

上面谈到了多个线程调用时,可能出现误判的情况,这是因为if的判断和new操作不是原子的,所以我们应该给该操作加锁:

这样可以吗?可以的,这里的locker是static的,也就是说,一个类只有一份,可以保证不同的线程是对同一个对象加锁。但是,这样写的话就违背了懒汉模式的初衷了,这样写就又是在类加载时就创建了一个对象,又不能省下开销了,所以该怎么弄锁对象呢?用反射,通过反射来创建该类的对象,对该类的实例进行加锁:

注意反射创建的对象可不是SingleTonLazy类的,而是Class类的!!所以没有违背单例模式。

也可以写为:

降低锁竞争的频率:

如上这样写代码,就会出现一个问题:只要每次一调用getInstance,就必须得先上锁再去判断是否要new,然后还会有锁竞争,开销特别大。但其实,只有第一个创建instance时才会涉及到线程安全问题,以后都回不去修改instance的指向,就不会有线程安全问题,就没必要加锁了。所以,我们可以先判断一下是否为空,如果为空,就可能需要new,就可能有线程安全问题,就得加锁;如果不为空,就不涉及到线程安全问题,就没必要加锁。所以代码修改如下:

解决编译器优化导致的指令重排序问题:

这里,一谈到多线程,就不得不考虑编译器为例提高效率而做出的优化操作(在单线程模式下,编译器锁做出的逻辑判断不会出错,这时为了提高效率而对代码的执行顺序做出的修改也不会出错;但是多线程下,由于看不到另一个现成的工作,所以优化可能会出错)。

我们知道new操作是分为三步进行的,这就可能会触发指令重排序的优化。new操作的三步为:

1.申请内存空间

2.在内存上构造对象

3.把内存地址赋给instance引用

这里,可能会有指令重排序,从123变成132.在单线程模式下没问题,但在多线程模式下就有问题了,132的话,第一个线程的对象还没new出来,就把地址给了instance,这导致第二个线程一位已经有了对象,直接返回了instance,就有可能在线程1执行第三步之前将instance返回,导致拿到了未初始化的非法对象,并去访问它的属性,导致出错

所以要想办法不进行指令重排序,所以给instance加上volatile

最终代码如下:

3.总结

1.instance应该是static修饰的

2.构造方法设为private

3.getInstance设为static保证可以用类调用该方法

4.饿汉模式不涉及到线程安全问题

5.懒汉模式涉及到线程安全问题,解决方案:加锁;在加锁之前判断,避免反复加锁;使用volatile避免指令重排序

二.阻塞队列

阻塞队列是一种特殊的队列,也遵循先进先出的原则。与普通队列不同的是,它是一种线程安全的数据结构,有以下阻塞特性:

1.当队列为满时,继续入队列就会触发阻塞,直到有其他线程从队列中出去

2.当队列为空时,继续出队列就会触发阻塞,知道有其他线程进入队列

该阻塞队列的意义就是实现生产者消费者模型

1.什么是生产者消费者模型

生产者将生产的产品放到仓库中,消费者就从仓库中取产品。当产品放满仓库时,就会阻塞等待停止生产,直到有顾客开始取产品;当消费者把产品取完时,消费者就开始等,直到生产者生产出产品

生产者消费者模型的优点一:解耦合

耦合描述的是俩个模块之间的联系程度,解耦合就是说俩个模块之间的想和影响变小了

例如现在有服务器A和服务器B配合进行工作,A接收到了客户端发出的请求,A把请求发给了B,B再把响应返回给A,此时,若B出现了问题,A就也会受影响,若要再天哥服务器C代替B的工作,A这边的代码就得改动,还是会收到影响,所以说,此时AB的耦合程度就高

但要是A和B 不直接交互,而是通过一个队列来交互,A把任务放到队列中,B从队列中拿取任务。当队列中任务满时,A就不再接任务,直到B把任务处理;当队列中空时,B就不再处理任务,直到A接到了任务;而当B出现问题时,A也不会受影响,此时再添加一个服务器C就可以,也不用去修改A的代码,因为ABC都是在对队列进行操作,三者不会互相影响,这就达到了解耦合的效果

生产者消费者模型的优点二:削峰填谷

还是上面服务器AB的例子

如果没有阻塞队列,那么每个A收到的请求都会立即反映给B,也就是说,A这边扛多少请求量,B就得快速处理多少请求量。但是,不同的服务器做的工作不同,A比较轻松,但B的工作量较大,AB虽然访问量相同,但消耗的硬件资源不同,可能A可以承受得了这些并发量,但B就可能会挂了,就比如B要操作数据库,而数据库就是一个分布式系统中相对脆弱的环节。

但要是有了阻塞队列,B就不需要跟着A的节奏去处理数据了,它可以依然按着自己的节奏慢慢来,虽然速度慢了,但不会把B搞挂了。

与其把B搞挂,不如让它慢慢来。A得到的响应慢,但总好过没有响应。

上述峰值情况不会已知存在,当过了峰值后,A的请求量就会主键回复正常,B就有时间取慢慢处理纪颜的数据了,这就是填谷

削峰填谷就可以保证突发情况下,整个服务系统可以正常运行

2.自己实现阻塞队列

阻塞队列的关键是在普通队列的前提下加上线程安全以及阻塞。我们就用环形数组实现阻塞队列:其实就是之前讲的循环队列,head指向对头,tail指向队尾,从对头删除元素,从队尾添加元素head和tail(包含head不包含tail)之间就是存放元素的空间

put:向队尾添加元素

take:从队首拿取元素

现在,我们已经把put和take的雏形写好了,但是,没有人唤醒线程。谁去唤醒呢?对于put来说,应该是take来唤醒,而对于take来说,应该是put来唤醒

所以在每个方法的最后都应该加一个notifyAll方法

注意,这里最好用notifyAll方法,要是用notify方法,就只能唤醒一个生产者或一个消费者,其他人就永远无法被唤醒了

还有一个问题对于put方法来说,一个线程被唤醒后,队列一定不满吗?不一定,可能在唤醒到blocked的过程中又有其他线程把队列放满了,所以应该在唤醒后再添加一个if判断,如果还满着,就得再锁上:

但是每次出了wait就得再判断一次,那这代码不就是无数个判断吗?所以直接用while循环即可:

同理,take方法可以改写为:

如何使用阻塞队列呢?

3.标准库中的阻塞队列——BlockingQueue

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,它继承自Queue这个接口,所以它也有poll,offer等方法,但不建议使用,因为这些方法无阻塞特性。而建议用put,take方法,这些方法有阻塞特性(阻塞式入队列,阻塞式出队列,但没有阻塞式获取队首元素等方法)。

使用方法和上面我们自己实现阻塞队列是一样的

三.定时器

就是达到了某个设定的时间后,就执行某个代码。防止出现死等现象(比如客户端发出请求,但服务器迟迟不响应。是请求没发出去还是服务器出现问题了?不知道,不可以死等)

1.标准库中的定时器

标准克重有一个Timer类,其中有一个核心方法schedule

shedule包含俩个参数,第一个参数指定列即将执行的代码,第二个参数指定多长时间后执行

使用如下:

实际上,主线程执行schedule的时候,就是把该任务给放到了timer对象中,timer对象里面也有一个线程,叫做扫描线程,时间一到,扫描线程就会执行。同时,一个timer可以安排多个任务

2.自己实现定时器

首先timer中要有一个扫描线程,用于扫描轮到了哪个线程执行;其次要有一个数据结构,来存放即将执行的任务;最后,还需要有一个类来描述要执行的任务

首先来想一想应该用什么数据结构呢?如果用ArrayList,就得不停地遍历数组,来判断到哪个任务执行到时间了,会很麻烦,所以应该用一个有顺序的,也就是优先级队列!!!把时间最小的放在队首,这就可用O(1)的时间来回去到时间最小的任务了

首先是一个描述任务的类:

该类描述了这个任务的具体内容以及它的延迟时间。注意,它最终要放到优先级队列中,所以,就应该是可以比较的,也就是要实现comparable接口,注意compareTo方法要进行强制类型转换

然后来看Timer类

首先有一个优先级队列,然后提供一个schedule方法负责将任务入队列,然后我们要搞一个扫描线程,而且一个类只需要一个线程进行扫描,这可以在构造方法中进行

如图,在线程中进行循环扫描队首元素的操作。这里为什么要加锁,主要有以下俩个原因:首先,在该线程中会用到wait操作,其次,这个扫描线程操作的是优先级队列,而刚刚写到schedule方法也是在操作优先级队列,所以不同线程操作同一个变量就会有线程安全问题,所以也要加锁

然后一上来就是判断是否为空,如果队列为空,我们应该咋办呢?那就等到队列不空为止。这里就和阻塞队列一样,用while比反复用if更好更保险

然后看一下队首元素是否达到了执行时间,如果达到了就调用它的run方法,并且将它从队列中删除;如果没达到就什么也不执行,进入下一轮的whlie

这里的else还可以继续优化一下:如果什么也不干,就会反复进行以上判断,直到到了规定的执行时间,在这段时间中,虽然任务没有执行,但cpu也没有停下工作,还是有很大的消耗。所以应该改一下else,让它wait一段时间之后再进行下一轮判断,这段时间就是将要执行任务的时间减去系统时间,代码如下:

这里就可以看出来为什么不用PriorityBlockingQueue而使用普通的优先级队列了,就是因为我们要处理两个wait,而阻塞队列很难实现唤醒两处等待

所以最终的代码如下:

class MyTimerTask implements Comparable<MyTimerTask>{
    //任务执行的时间点
    private long time;
    //要执行的任务
    private Runnable task;
    public MyTimerTask(Runnable run,long delay){
        this.task=run;
        this.time=delay+System.currentTimeMillis();
    }
    @Override
    public int compareTo(MyTimerTask o) {
        return (int)(this.time-o.time);
    }
    public void run(){
        this.task.run();
    }

    public long getTime() {
        return time;
    }

    public Runnable getTask() {
        return task;
    }
}
class MyTimer{
    //使用一个数据结构来安排所有的对象
    private PriorityQueue<MyTimerTask> q=new PriorityQueue<>();
    //把要完成的任务构造成一个对象添加到优先级队列中
    public void schedule(Runnable run,long delay){
        MyTimerTask task=new MyTimerTask(run,delay);
        q.offer(task);
        locker.notify();//唤醒扫描线程
    }
    //需要一个锁对象进行加锁操作
    private Object locker=new Object();
    public MyTimer(){
        Thread t=new Thread(()->{
            //循环进行多轮对队首元素的判定
            while(true){
                synchronized (locker){
                    //循环等待,直到队列不空为止
                    while(q.isEmpty()){
                        try {
                            locker.wait();//这里的等待是因为队列为空,所以它需要用schedule来唤醒
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    MyTimerTask task=q.peek();
                    if(task.getTime()<=System.currentTimeMillis()){
                        task.run();
                        //任务执行完了就应该从队列中删除
                        q.poll();
                    }
                    else{
                        //等待一段时间,避免出现忙等消耗cpu资源的现象
                        try {
                            locker.wait(task.getTime()-System.currentTimeMillis());
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }

            }
        });
        t.start();
    }

}

四.线程池

为什么要有线程池?首先,有线程是因为进程太重量了;但是要是线程太多,不断地创建销毁线程,也会导致开销加大,也就从轻量变成重量了。那么该如何解决?

一个是使用协程(它是把系统调度过程省略了),但java中协程并不流行;另一个办法就是使用线程池(它可以使线程不至于很慢)

线程池的原理就是在使用线程1的时候就提前把线程2345……都创建好。优化了频繁创建销毁线程的场景

1.标准库中的线程池

标准库中实例化线程池对象用到不是构造方法,而是其他类的一个普通成员方法。这种构造对象的模式叫做工厂模式

工厂模式:

一般情况下,通过new来创建对象,new就会触发类的构造方法,但是构造方法会有局限性,就比如要创建一个点坐标,有俩中创建模式:

public Point(double x,double y);这是通过直角坐标系来创建

public Point(double r,double a);这是通过极坐标的形式来创建

但是,这俩种方法没有构成重载,因为参数类型都一样。这就是构造方法的局限性。但要是使用了普通方法,就可以根据方法名称来区分构造形式,这就是工厂模式。事件中就可以撞门搞一个PointFactory类,里面编写makePointXY方法和makePointRA方法来进行区分

Executors创建线程池的几种形式:

第一种是创建一个固定线程数量的线程池

创建一个只包含单个线程的线程池

创建一个线程数目动态增长的线程池。它会随着往池中加入任务,会根据需要自动创建线程,并且不会立即销毁

创建一个线程池,可以将任务的调度放在给定的延迟时间之后。

由上面的源码可以看出,Executors实际上是ThreadPoolExcutor的工厂,它是对ThreadPoolExecutor的封装。executor表示执行者。

ThreadPoolExecutor的相关方法

构造方法:

先来说一说corePoolSize和maximumPoolSize,说这个之前,我们来举个例子:在一个大厂中,包含了正式员工和实习生,尬场为啥要招实习生呢?就是因为任务太多了,做不过来,所以要招实习生。但是,正式员工可以摸鱼(也就是可以没事干闲着),而实习生不可以摸鱼,也就是说,当任务量逐渐减少时,实习生们就没事干了,就会进行裁员,裁的就是那些摸鱼实习生。线程池也是这样corePoolSize就是正式员工的数量,它们永远不会被裁员,maximumPoolSize就是正式员工加实习生的数量,也就是线程池最大的容量。

keepAliveTime就是允许实习生摸鱼的时间,当达到上限时,这些线程就会被销毁

unit是时间的单位

workQueue这是一个阻塞队列,用来存放线程池中的任务。这里的阻塞队列可以根据需要进行选择,如果是需要优先级,就选择PriorityBlockingQueue;如果不需优先级,且是顺序执行,就可用ArrayBlockingQueue;还有就是如果得跳跃执行,就是用LinkedBlockingQueue

threadFactory:这是一个线程工厂,它是什么时候被用呢?就是当Executors在创建线程时要用到这个工厂,如下是它的源码:

最后一个参数是RejectedExecutionHandler叫做线程池的拒绝策略:线程池能够容纳的线程数是有限的,当达到上限时,就要进行拒绝

线程池的拒绝策略:

有以上四种:

AbortPolicy:直接抛异常

CallerRunsPolicy:新加的任务让添加任务的线程执行

DiscardOidestPolicy:抛弃线程池中最早添加到任务,然后执行新任务

DiscardPolicy:直接抛弃新任务,不执行

线程池中设置存放多少个线程更加合适

假设有N个cpu逻辑核心,放N个?N+1个?2N个?无法确定

当所有代码为cpu密集型时(就是需要进行各种算术运算逻辑判断)线程数不应超过N,超过了也无法提高效率

当所有代码为IO密集型时(就是总是进行读硬盘,IO操作)就用大于N,因为这些操作不吃cpu,一个核心可通过调度来并发执行。

所以正确的做法是进行试验,对程序进行性能测试,验证需要多少个线程数

2.自己实现线程池

首先要有一个阻塞队列,用来存放提交的任务,然后就是任务的提交操作

假设我们是在创建一个有固定线程数目的线程池,那么这个指定数目应该在构造方法中体现,同时还得有一个类,专门描述这些线程要干的任务(就是不停的从队列中拿取任务并进行运行)。先来看看如何用一个类来描述每一个线程的任务:

本来,我是想着给run一个参数,到时候调用run时将线程池中存放任务的队列传给工作线程,然后让工作线程从里面取任务。但突然想到,这个run方法是重写自Thread的,不能随便修改参数,所以我们要另辟蹊径,办法如下:

我们可以像上面一样,在调用worker构造方法时,将线程池里面的阻塞队列当作参数传给worker即可。

那么最后就是线程池的构造方法啦:

创建出n个工作线程,并让它们开始工作

这里可以优化一下:就是用一个数据结构将这些工作线程也组织起来,如下:

那么线程池如何使用呢?如下:

哎?发现打印操作报错啦,为什么?这就是变量捕获,所以不能直接使用i,而应如下操作:

最终,自定义线程池的代码就如下:

class MyThreadPool{
    //用阻塞队列将添加的任务组织起来
    private BlockingQueue<Runnable> queue=new LinkedBlockingQueue<>();
    //添加任务
    public void submit(Runnable runnable) throws InterruptedException {
        queue.put(runnable);
    }
    //将工作线程组织起来
    private List<WorkerThread> workers=new LinkedList<>();
    //创建出指定数目的工作线程
    public MyThreadPool(int n){
        for(int i=0;i<n;i++){
            WorkerThread workerThread=new WorkerThread(this.queue);
            workerThread.start();
            workers.add(workerThread);
        }
    }
    //一个类,用来描述每个工作线程要做什么
    static class WorkerThread extends Thread{
        private BlockingQueue<Runnable> queue=null;
        public WorkerThread(BlockingQueue<Runnable> queue){
            this.queue=queue;
        }
        //反复从队列中拿取元素,当队列为空时,就阻塞等待
        @Override
        public void run() {
            while(true){
                try {
                    Runnable runnable=queue.take();
                    runnable.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class Test1 {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool pool=new MyThreadPool(10);
        for(int i=0;i<1000;i++){
            int id=i;
            pool.submit(()->{
                System.out.println("执行任务"+id);
            });
        }
    }
}

举报

相关推荐

0 条评论