0
点赞
收藏
分享

微信扫一扫

多线程案例

目录

一、单例模式

1,什么是单例模式

2,单例模式分类

二,生产者消费者模型

1,阻塞队列是什么

2,标准库中的阻塞队列

3,生产者消费者模型

三、定时器

1,定时器是什么

2,标准库中的定时器

3,实现定时器

四、线程池

1,线程池是什么

2,标准库中的线程池

3,实现线程池


一、单例模式

1,什么是单例模式

2,单例模式分类

饿汉模式

类加载的同时, 创建实例.

//先创建一个表示单例的类
    //我们要求Singletion这个类只能有一个实例
    //饿汉模式,“饿”是指类被加载,实例就会被创建
    static class Singletion{
        //把构造方法变为私有,此时该类外部就无法new这个类的实例
        private Singletion(){}

        //创建一个static的成员,表示该类的唯一实例
        private static Singletion instance = new Singletion();

        public static Singletion getInstance() {
            return instance;
        }
    }

    public static void main(String[] args) {
        Singletion s1 = Singletion.getInstance();
        Singletion s2 = Singletion.getInstance();
        System.out.println(s1 == s2);
    }

懒汉模式

class Singleton {
    private static Singleton instance = null;
    private Singleton() {}
    public static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
       }
        return instance;
   }
}

 当一个类被new两次,就不是单例模式了,如果线程更多,new的次数可能更多,不符合单例模式

static class Singletion{
        private Singletion() { }
        private static Singletion instance = null;

        public static Singletion getInstance(){
            if (instance == null){
                instance = new Singletion();
            }
            return instance;
        }

    }

保证线程安全,我们选择加锁

static class Singletion{
        private Singletion() { }
        private static Singletion instance = null;

        public static Singletion getInstance(){
            synchronized (Singletion.class){
                if (instance == null){
                    instance = new Singletion();
                }
                return instance;
            }
        }
    }

 

当多线程首次调用 getInstance, 大家可能都发现 instance 为 null, 于是又继续往下执行来竞争锁, 其中竞争成功的线程, 再完成创建实例的操作

当这个实例创建完了之后, 其他竞争到锁的线程就被里层 if 挡住了. 也就不会继续创建其他实例

 static class Singletion{
        private Singletion() { }
        private static Singletion instance = null;

        public static Singletion getInstance(){
            if (instance == null){
                synchronized (Singletion.class){
                    if (instance == null){
                        instance = new Singletion();
                    }
                }
            }
            return instance;
        }
    }
}

 涉及多个读操作,可能会被编译器优化,为了避免 "内存可见性" 导致读取的 instance 出现偏差, 于是补充上 volatile。

static class Singletion{
        private Singletion() { }
        private volatile static Singletion instance = null;

        public static Singletion getInstance(){
            if (instance == null){
                synchronized (Singletion.class){
                    if (instance == null){
                        instance = new Singletion();
                    }
                }
            }
            return instance;
        }

    }

二,生产者消费者模型

1,阻塞队列是什么

2,标准库中的阻塞队列

BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞. 
String elem = queue.take();

3,生产者消费者模型

static class BlockingQueue{
        //基于链表
        //基于数组
        private int[] array = new int[100];
        private volatile int head = 0;
        private volatile int tail = 0;
        //head和tail构成的是一个前闭后开的区间
        //当两者重合的时候,可能表示队列空,也可能是表示队列满
        //为了区分空还是满,需要额外引入一个size来表示

        private volatile int size = 0;
        //入队列 出队列 取队首元素
        public void put(int value) throws InterruptedException {
            synchronized (this){
                while (size == array.length){
                    wait();
                }

                //把value放在队尾即可
                array[tail] = value;
                tail++;
                if (tail == array.length){
                    tail = 0;
                }
                size++;
                notify();
            }
        }

        public int take() throws InterruptedException {
            int ret = -1;
            synchronized (this) {
                while (size == 0) {
                    this.wait();
                }
                ret = array[head];
                head++;
                if (head == array.length) {
                    head = 0;
                }
                size--;
                notify();
            }
            return ret;
        }
    }

    public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue();
        //第一次,让消费者消费的快一些,生产者生产的慢一些
        //就会看到消费者阻塞等待,每次有新的生产者生产元素的时候,消费者才能消费
        //第二次,让消费者消费的慢一些,生产者生产的快一些
        //就会看到生产者往队列中插入元素,插入元素满了之后就会阻塞等待
        //随后消费者每次消费一个元素,生产才能产生新的元素
        Thread producer = new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++){
                    try {
                        blockingQueue.put(i);
                        System.out.println("生产元素:" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        Thread consumer = new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        int ret = blockingQueue.take();
                        System.out.println("消费元素:" + ret);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
    }
public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue();
        //第一次,让消费者消费的快一些,生产者生产的慢一些
        //就会看到消费者阻塞等待,每次有新的生产者生产元素的时候,消费者才能消费
        //第二次,让消费者消费的慢一些,生产者生产的快一些
        //就会看到生产者往队列中插入元素,插入元素满了之后就会阻塞等待
        //随后消费者每次消费一个元素,生产才能产生新的元素
        Thread producer = new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++){
                    try {
                        blockingQueue.put(i);
                        System.out.println("生产元素:" + i);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        Thread consumer = new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        int ret = blockingQueue.take();
                        System.out.println("消费元素:" + ret);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
    }

or (int i = 0; i < 10000; i++){
                    try {
                        blockingQueue.put(i);
                        System.out.println("生产元素:" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        Thread consumer = new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        int ret = blockingQueue.take();
                        System.out.println("消费元素:" + ret);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
    }

三、定时器

1,定时器是什么

定时器也是软件开发中的一个重要组件. 类似于一个 "闹钟". 达到一个设定的时间之后, 就执行某个指定好的代码.

2,标准库中的定时器

        标准库中提供了一个 Timer 类. Timer 类的核心方法为 schedule .

        schedule 包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后 执行 (单位为毫秒).

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        System.out.println("hello");
   }
}, 3000);

3,实现定时器

import java.util.concurrent.PriorityBlockingQueue;

public class Thread4 {

    //优先级队列的元素必须是可比较的
    //比较规则有两种:
    //1,让Task实现Comparable接口
    //2,让优先级队列构造的时候,传入一个对象比较器
    static class Task implements Comparable<Task>{
        //Runnable中有一个run方法,借助run方法来执行具体的任务是啥
        private Runnable command;
        //time表示啥时候来执行command,是一个绝对时间
        private long time;
        //构造方法的参数
        public Task(Runnable command,long after){
            this.command = command;
            this.time = System.currentTimeMillis() + after;
        }

        //具体任务的逻辑
        public void run(){
            command.run();
        }

        @Override
        public int compareTo(Task o) {
            return (int) (this.time - o.time);
        }
    }

    static class Worker extends Thread{
        private PriorityBlockingQueue<Task> queue = null;
        private Object mailBox = null;

        public Worker(PriorityBlockingQueue<Task> queue, Object mailBox){
            this.queue = queue;
            this.mailBox = mailBox;
        }

        @Override
        public void run() {
            while (true){
                try{
                    //1,取出队首元素
                    Task task = queue.take();
                    //2,检查当前任务时间是否到了
                    long curTime = System.currentTimeMillis();
                    if (task.time > curTime){
                        //时间还没到,把任务塞回队列中
                        queue.put(task);
                        synchronized (mailBox){
                            mailBox.wait(task.time - curTime);
                        }
                    }else {
                        task.run();
                    }
                }catch (InterruptedException e){
                    e.printStackTrace();
                    break;
                }
            }
        }
    }

    static class Timer{
        //为了避免忙等,需要使用wait方法
        //使用一个单独的对象来辅助wait
        //使用this也行
        private Object mailBox = new Object();
        //定时器的基本组成
        //1,用一个类来描述队伍
        //2.用一个阻塞优先级队列来组织若干任务,让队首元素就是时间最早的任务,如果队首元素时间未到,其他元素也不能执行
        //3.用一个线程来循环扫描当前阻塞队列的队首元素,如果时间到了,就让执行指定任务
        private PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();

        public Timer(){
            //创建线程
            Worker worker = new Worker(queue,mailBox);
            worker.start();

        }
        //4.还需要提供一个方法,让调用者把任务给“安排“进来
        public void schedule(Runnable command,long after){
            Task task = new Task(command,after);
            queue.put(task);
            synchronized (mailBox){
                mailBox.notify();
            }
        }
    }

    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("he he");
                timer.schedule(this,2000);
            }
        },2000);
    }


}

四、线程池

1,线程池是什么

2,标准库中的线程池

Executors 创建线程池的几种方式

3,实现线程池

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Thread5 {

    //使用一个类来描述这个工作线程
    static class Worker extends Thread{
        private int id = 0;
        //每个Worker线程都需要从任务队列中取任务
        //需要获取任务队列的实例
        private BlockingQueue<Runnable> queue = null;

        public Worker(BlockingQueue<Runnable> queue,int id){
            this.queue = queue;
            this.id = id;
        }

        @Override
        public void run() {
           //此处的try把while包裹了,目的是只要线程收到异常,就立刻结束run方法(也是结束线程)
            try {
                while (!Thread.currentThread().isInterrupted()){
                    Runnable command = queue.take();
                    System.out.println("thread: " + id + "running...");
                    command.run();
                }
            }catch (InterruptedException e){
                System.out.println("线程被终止");
            }
        }
    }

    //本质就是生产者消费模型
    //调用execute的代码就是生产者,生产了任务
    //worker就是消费者,消费队列中的任务
    //交易产所就是BlockingQueue
    static class MyThreadPool{
        //阻塞队列组织若干任务
        private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        //List组织若干线程
        private List<Worker> workers = new ArrayList<>();
        //先假设有10个线程容量
        private static final int maxWorkerCount = 10;

        //实现execute方法和shutdown方法
        public void execute(Runnable command) throws InterruptedException {
            //线程数目较少的时候,创建新的线程为工作线程
            //线程足够多,达到阈值,不用创建新线程
            if (workers.size() < maxWorkerCount){
                Worker worker = new Worker(queue,workers.size());
                worker.start();
                workers.add(worker);
            }
            queue.put(command);
        }

        //当shutdown结束,所有线程结束
        public void shutdown() throws InterruptedException {
            //终止所有线程
            for (Worker worker : workers){
                worker.interrupt();
            }
            //还需要等待每个线程执行结束
            for (Worker worker : workers){
                worker.join();
            }
        }

        static class Command implements Runnable{
            private int num;
            public Command(int num){
                this.num = num;
            }

            @Override
            public void run() {
                System.out.println("正在执行任务:" + num);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            MyThreadPool myThreadPool = new MyThreadPool();
            for (int i = 0; i < 1000; i++){
                myThreadPool.execute(new Command(i));
            }
            Thread.sleep(2000);
            myThreadPool.shutdown();
            System.out.println("线程已经被销毁");
        }

    }

}
举报

相关推荐

0 条评论