0
点赞
收藏
分享

微信扫一扫

多线程使用

weednoah 2022-04-13 阅读 75
java

文章目录

1.多线程基础

CPU执行代码都是一条一条顺序执行的,单核cpu也可以同时运行多个任务,因为操作系统执行多任务实际上就是让CPU对多个任务轮流交替执行。

对于多核CPU,通常任务的数量远远多于CPU的核数,所以任务也是交替执行的。

1.1 进程

在计算机中,我们把一个任务称为一个进程,某些进程内部还需要同时执行多个子任务,我们把子任务称为线程。

进程和线程的关系就是:一个进程可以包含一个或多个线程,但至少会有一个线程。

操作系统调度的最小任务单位其实不是进程,而是线程。常用的Windows、Linux等操作系统都采用抢占式多任务,如何调度线程完全由操作系统决定,程序自己不能决定什么时候执行,以及执行多长时间。

因为同一个应用程序,既可以有多个进程,也可以有多个线程,因此,实现多任务的方法,有以下几种:
(1)多进程模式(每个进程只有一个线程):
(2)多线程模式(一个进程有多个线程):
(3)多进程+多线程模式(复杂度最高):

1.2 进程 vs 线程

进程和线程是包含关系,但是多任务既可以由多进程实现,也可以由单进程内的多线程实现,还可以混合多进程+多线程。

具体采用哪种方式,要考虑到进程和线程的特点。

和多线程相比,多进程的缺点在于:

创建进程比创建线程开销大,尤其是在Windows系统上;
进程间通信比线程间通信要慢,因为线程间通信就是读写同一个变量,速度很快。
而多进程的优点在于:

多进程稳定性比多线程高,因为在多进程的情况下,一个进程崩溃不会影响其他进程,而在多线程的情况下,任何一个线程崩溃会直接导致整个进程崩溃。

1.3 多线程

Java语言内置了多线程支持:一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()方法,在main()方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等。

因此,对于大多数Java程序来说,我们说多任务,实际上是说如何使用多线程实现多任务。

和单线程相比,多线程编程的特点在于:多线程经常需要读写共享数据,并且需要同步。例如,播放电影时,就必须由一个线程播放视频,另一个线程播放音频,两个线程需要协调运行,否则画面和声音就不同步。因此,多线程编程的复杂度高,调试更困难。

Java多线程编程的特点又在于:

多线程模型是Java程序最基本的并发模型;
后续读写网络、数据库、Web开发等都依赖Java多线程模型。

2.创建新线程

Java语言内置了多线程支持。当Java程序启动的时候,实际上是启动了一个JVM进程,然后,JVM启动主线程来执行main()方法。在main()方法中,我们又可以启动其他线程。

package com.fei.hanhanhanhan;

public class T1 {
    public static void main(String[] args) {
        System.out.println("main start...");

        // Thread t = new Thread();
        // Thread t = new MyThread();
        // Thread t = new Thread(new MyRunnable());
        Thread t = new Thread() {
            public void run() {
                System.out.println("thread run...");
                try {
                    Thread.sleep(10);  // 传入的参数是毫秒
                } catch (InterruptedException e) {}
                System.out.println("thread end.");
            }
        };
        // 内部调用private native void start0()方法,native修饰符表示这个方法由JVM虚拟机内部的C代码实现的,不是由Java代码实现的
        // 一个线程对象只能调用一次start()方法
        t.setPriority(10) // 对线程设定优先级, 1~10, 默认值5
        Thread.currentThread().setPriority(1);  // 对线程设定优先级, 1~10, 默认值5
        t.start();

        System.out.println("main end...");
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("start new thread!");
    }
}

3.线程的状态

3.1 Java线程的状态

New:新创建的线程,尚未执行;
Runnable:运行中的线程,正在执行run()方法的Java代码;
Blocked:运行中的线程,因为某些操作被阻塞而挂起;
Waiting:运行中的线程,因为某些操作在等待中;
Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;
Terminated:线程已终止,因为run()方法执行完毕。

当线程启动后,它可以在Runnable、Blocked、Waiting和Timed Waiting这几个状态之间切换,直到最后变成Terminated状态,线程终止。

3.2 线程终止的原因

线程正常终止:run()方法执行到return语句返回;
线程意外终止:run()方法因为未捕获的异常导致线程终止;
对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。

3.3 join() 等待另一个线程运行结束

package com.fei.hanhanhanhan;

public class T2 {
    public static void main(String[] args) throws Exception {
        System.out.println("start.......");
        Thread t1 = new Thread() {
            public void run() {
                System.out.println("t1........");
            }
        };
        t1.start();
        // 等待该线程结束,然后才继续往下执行自身线程
        // 也可以指定一个等待时间,超过等待时间后就不再继续等待
        t1.join();
        System.out.println("end........");
    }
}

4.中断线程

4.1 中断线程

package com.fei.hanhanhanhan;

public class T3 {
    public static void main(String[] args) throws Exception {
        Thread t1 = new Thread() {
            public void run() {
                int n = 0;
                while (! isInterrupted()) {
                    n ++;
                    System.out.println(n + " hello!");
                }
            }
        };
        t1.start();
        Thread.sleep(1); // 暂停1毫秒
        t1.interrupt(); // 发出了“中断请求”
        t1.join();  // 等待结束再运行
        System.out.println("end");
    }

}

4.2 中断处于等待中的线程

t1线程对象再等待hello线程结束,此时收到了主线程的中断请求,因此hello.join()会立刻抛出异常。

package com.fei.hanhanhanhan;

public class T3 {
    public static void main(String[] args) throws Exception {
        Thread t1 = new Thread() {
            public void run() {
                Thread hello = new HelloThread();
                hello.start(); // 启动hello线程
                try {
                    hello.join(); // 等待hello线程结束
                } catch (InterruptedException e) {
                    System.out.println("interrupted!");
                }
                hello.interrupt();
            }
        };
        t1.start();
        Thread.sleep(1000); // 暂停1毫秒
        t1.interrupt(); // 发出了“中断请求”
        t1.join();  // 等待结束再运行
        System.out.println("end");
    }

}

class HelloThread extends Thread {
    public void run() {
        int n = 0;
        while (!isInterrupted()) {
            n++;
            System.out.println(n + " hello!");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

4.3 设置标志位

package com.fei.hanhanhanhan;

public class T3 {
    public static void main(String[] args) throws Exception {
        HelloThread t = new HelloThread();
        t.start();
        Thread.sleep(1); // 暂停1毫秒
        t.running = false;  // 标志位置为false
    }

}

class HelloThread extends Thread {
    public volatile boolean running = true;  // 线程间共享变量
    public void run() {
        int n = 0;
        while (running) {
            n ++;
            System.out.println(n + " hello!");
        }
        System.out.println("end!");
    }
}

4.4 volatile 关键字:线程可见性

线程间共享变量需要使用volatile(ˈvɒlətaɪl‘)关键字标记,确保每个线程都能读取到更新后的变量值。

为什么要对线程间共享的变量用关键字volatile声明?这涉及到Java的内存模型。在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!

这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。例如,主内存的变量a = true,线程1执行a = false时,它在此刻仅仅是把变量a的副本变成了false,主内存的变量a还是true,在JVM把修改后的a回写到主内存之前,其他线程读取到的a的值仍然是true,这就造成了多线程之间共享的变量不一致。

因此,volatile关键字的目的是告诉虚拟机:

每次访问变量时,总是获取主内存的最新值;
每次修改变量后,立刻回写到主内存。
volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。

如果我们去掉volatile关键字,运行上述程序,发现效果和带volatile差不多,这是因为在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。

5.守护线程

守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。因此,JVM退出时,不必关心守护线程是否已结束。

守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。

如何创建守护线程呢?方法和普通线程一样,只是在调用start()方法前,调用setDaemon(true)把该线程标记为守护线程:

package com.fei.hanhanhanhan;

import java.time.LocalTime;

public class T4 {
    public static void main(String[] args) {
        Thread t1 = new Thread() {
            public void run() {
                while (true) {
                    System.out.println(LocalTime.now());
                }
            }
        };
        Thread t2 = new Thread() {
            public void run()  {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {}
            }
        };
        t1.setDaemon(true);  
        t1.start();    // 守护进程,当非守护进程在5s结束后也会结束
        t2.start();   // 非守护进程
    }
}

6.线程同步

6.1 synchronized关键字:原子性

如果多个线程同时读写共享变量,会出现数据不一致的问题。

多线程模型下,要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:某一个线程执行时,其他线程必须等待:

这种加锁和解锁之间的代码块我们称之为临界区(Critical Section),任何时候临界区最多只有一个线程能执行。
可见,保证一段代码的原子性就是通过加锁和解锁实现的。Java程序使用synchronized(sɪŋkrənaɪzd)关键字对一个对象进行加锁:

注意加锁对象必须是同一个实例;

package com.fei.hanhanhanhan;

import java.time.LocalTime;

public class T4 {
    public static void main(String[] args) throws Exception {
        Thread add = new AddThread();
        Thread dec = new DecThread();
        add.start();
        dec.start();
        add.join();
        dec.join();
        System.out.println(Counter.count);
    }
}

class Counter {
    public static final Object lock = new Object();   // 设置一个对象作为lock
    public static int count = 0;    
}

class AddThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count += 1;
            }
        }
    }
}

class DecThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count -= 1;
            }
        }
    }
}

使用synchronized解决了多线程同步访问共享变量的正确性问题。但是,它的缺点是带来了性能下降。因为synchronized代码块无法并发执行。此外,加锁和解锁需要消耗一定的时间,所以,synchronized会降低程序的执行效率。

6.1 不需要synchronized的操作

JVM规范定义了几种原子操作:

基本类型(long和double除外)赋值,例如:int n = m;
引用类型赋值,例如:List list = anotherList。

long和double是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把long和double的赋值作为原子操作实现的。

class Pair {
    int first;
    int last;
    public void set(int first, int last) {
        synchronized(this) {
            this.first = first;
            this.last = last;
        }
    }
}

但是,如果是多行赋值语句,就必须保证是同步操作

7.同步方法

7.1 synchronized逻辑封装

我们知道Java程序依靠synchronized对线程进行同步,使用synchronized的时候,锁住的是哪个对象非常重要。

让线程自己选择锁对象往往会使得代码逻辑混乱,也不利于封装。更好的方法是把synchronized逻辑封装起来。

package com.fei.han;

public class Counter {
    private int count = 0;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }

    public void dec(int n) {
        synchronized(this) {
            count -= n;
        }
    }

    public int get() {
        return count;
    }
}

package com.fei.han;

public class T1 {
    public static void main(String[] args) throws Exception {
        Counter counter = new Counter();
        Thread m1 = new MyThread1(counter);
        Thread m2 = new MyThread2(counter);
        m1.start();
        m2.start();
        m1.join();
        m2.join();
        System.out.println(counter.get());
    }
}

class MyThread1 extends Thread {
    Counter counter;

    MyThread1(Counter counter) {
        this.counter = counter;
    }

    public void run() {
        counter.add(100);
    }
}

class MyThread2 extends Thread {
    Counter counter;

    MyThread2(Counter counter) {
        this.counter = counter;
    }

    public void run() {
        counter.dec(1);
    }
}

7.2 线程安全

如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe),一个类没有特殊说明,默认不是thread-safe;

上面的Counter类就是线程安全的。Java标准库的java.lang.StringBuffer也是线程安全的。

还有一些不变类,例如String,Integer,LocalDate,它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的。

最后,类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的。

除了上述几种少数情况,大部分类,例如ArrayList,都是非线程安全的类,我们不能在多线程中修改它们。但是,如果所有线程都只读取,不写入,那么ArrayList是可以安全地在线程间共享的。

7.3 synchronized修饰普通方法

当我们锁住的是this实例时,实际上可以用synchronized修饰这个方法。下面两种写法是等价的:

public void add(int n) {
    synchronized(this) { // 锁住this
        count += n;
    } // 解锁
}
public synchronized void add(int n) { // 锁住this
    count += n;
} // 解锁

因此,用synchronized修饰的方法就是同步方法,它表示整个方法都必须用this实例加锁。

7.4 synchronized修饰static方法

对于static方法,是没有this实例的,因为static方法是针对类而不是实例。但是我们注意到任何一个类都有一个由JVM自动创建的Class实例,因此,对static方法添加synchronized,锁住的是该类的Class实例。上述synchronized static方法实际上相当于:

public class Counter {
    public static void test(int n) {
        synchronized(Counter.class) {
            ...
        }
    }
}

8.死锁

8.1 可重入锁

public class Counter {
    private int count = 0;

    public synchronized void add(int n) {  // 获取对象锁
        if (n < 0) {
            dec(-n);  // 再次获取对象锁
        } else {
            count += n;
        }
    }

    public synchronized void dec(int n) {
        count += n;
    }
}

JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。

由于Java的线程锁是可重入锁,所以,获取锁的时候,不但要判断是否是第一次获取,还要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized块,记录-1,减到0的时候,才会真正释放锁。

8.2 死锁

一个线程可以获取一个锁后,再继续获取另一个锁。例如:

public void add(int m) {
    synchronized(lockA) { // 获得lockA的锁
        this.value += m;
        synchronized(lockB) { // 获得lockB的锁
            this.another += m;
        } // 释放lockB的锁
    } // 释放lockA的锁
}

public void dec(int m) {
    synchronized(lockB) { // 获得lockB的锁
        this.another -= m;
        synchronized(lockA) { // 获得lockA的锁
            this.value -= m;
        } // 释放lockA的锁
    } // 释放lockB的锁
}

两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。

那么我们应该如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:

public void dec(int m) {
    synchronized(lockA) { // 获得lockA的锁
        this.value -= m;
        synchronized(lockB) { // 获得lockB的锁
            this.another -= m;
        } // 释放lockB的锁
    } // 释放lockA的锁
}

9.wait和notify

synchronized解决了多线程竞争的问题,但是并没有解决多线程协调的问题。

在synchronized内部可以调用wait()使线程进入等待状态;

在synchronized内部可以调用notify()随机唤醒一个或notifyAll()唤醒所有等待线程,被唤醒的线程需要重新获得锁后才能继续执行。

package com.fei.hanh;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notifyAll();  // 唤醒其他线程
    }

    public synchronized String getTask() throws InterruptedException {
        while (queue.isEmpty()) {
            this.wait();  // 释放this锁,并进入等待   当被唤醒后重新获取this锁
        }
        return queue.remove();
    }
}

public class T1 {
    public static void main(String[] args) throws InterruptedException {
        TaskQueue q = new TaskQueue();
        List<Thread> ts = new ArrayList<Thread>();
        for (int i=0; i<5; i++) {
            Thread t = new Thread() {
                public void run() {
                    // 执行task:
                    while (true) {
                        try {
                            String s = q.getTask();
                            System.out.println("execute task: " + s);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            };
            t.start();
            ts.add(t);
        }
        Thread add = new Thread(() -> {
            for (int i=0; i<10; i++) {
                // 放入task:
                String s = "t-" + Math.random();
                System.out.println("add task: " + s);
                q.addTask(s);
                try { Thread.sleep(100); } catch(InterruptedException e) {}
            }
        });
        add.start();
        add.join();
        Thread.sleep(100);
        for (Thread t : ts) {
            t.interrupt();
        }
    }
}

10.使用ReentrantLock和Condition

从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。

我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。

java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁

package com.fei.hanh;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TaskQueue {
    private final Lock lock = new ReentrantLock();   // 代替 synchronized
    private final Condition condition = lock.newCondition();  // 从Lock对象获取Condition对象,代替wait()和notify()
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();  // 上锁
        try {
            queue.add(s);
            condition.signalAll();  // 唤醒
        } finally {
            lock.unlock();  // 释放锁
        }
    }

    public String getTask() throws Exception {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                if (condition.await(10, TimeUnit.SECONDS)) {  // 等待10s
                } else {
                    return "-1";  // 如果10s还没有被唤醒就直接返回-1
                }
                condition.await(10, TimeUnit.SECONDS);
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

11.使用ReadWriteLock

ReentrantLock使得在读和写时只能有一个线程占有资源。但是如果很多个线程只读取数据,不修改数据(如论坛加载评论)就不会导致读取的数据出现逻辑错误。这种读多写少的情况可以使用ReadWriteLock。

只允许一个线程写入(其他线程既不能写入也不能读取);
没有写入时,多个线程允许同时读(提高性能)。

public class Counter {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        wlock.lock(); // 加写锁
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock(); // 释放写锁
        }
    }

    public int[] get() {
        rlock.lock(); // 加读锁
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock(); // 释放读锁
        }
    }
}

12.使用StampedLock

如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock。

StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 获取写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 释放写锁
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
        // 注意下面两行代码不是原子操作
        // 假设x,y = (100,200)
        double currentX = x;
        // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
        double currentY = y;
        // 此处已读取到y,如果没有写入,读取是正确的(100,200)
        // 如果有写入,读取是错误的(100,400)
        if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            stamp = stampedLock.readLock(); // 获取一个悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 释放悲观读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。

可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁

StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

13.使用java.util.concurrent包提供的线程安全的集合

我们在前面已经通过ReentrantLock和Condition实现了一个BlockingQueue

public class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

BlockingQueue的意思就是说,当一个线程调用这个TaskQueue的getTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。

因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue。

除了BlockingQueue外,针对List、Map、Set、Deque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:
interface、non-thread-safe、thread-safe
List、ArrayList、CopyOnWriteArrayList
Map、HashMap、ConcurrentHashMap
Set、HashSet / TreeSet、CopyOnWriteArraySet
Queue、ArrayDeque / LinkedList、ArrayBlockingQueue / LinkedBlockingQueue
Deque、ArrayDeque / LinkedList、LinkedBlockingDeque
使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:

Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");

因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:

Map<String, String> map = new HashMap<>();
改为:

Map<String, String> map = new ConcurrentHashMap<>();
就可以了。

java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:

Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);

但是它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。

14.使用Atomic

使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:

原子操作实现了无锁的线程安全;

适用于计数器,累加器等。

Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。

我们以AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。

如果我们自己通过CAS编写incrementAndGet(),它大概长这样:

public int incrementAndGet(AtomicInteger var) {
    int prev, next;
    do {
        prev = var.get();
        next = prev + 1;
    } while ( ! var.compareAndSet(prev, next));
    return next;
}

CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do … while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。

我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:

class IdGenerator {
    AtomicLong var = new AtomicLong(0);

    public long getNextId() {
        return var.incrementAndGet();
    }
}

通常情况下,我们并不需要直接用do … while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。

在高度竞争的情况下,还可以使用Java 8提供的LongAdder和LongAccumulator。

15.使用线程池

Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。

如果可以复用一组线程:

┌─────┐ execute ┌──────────────────┐
│Task1 │──────> │ ThreadPool │
├─────┤ │┌───────┐┌───────┐│
│Task2 │ ││ Thread1 ││ Thread2 │ │
├─────┤ │└───────┘└───────┘│
│Task3 │ │┌───────┐┌───────┐│
├─────┤ ││ Thread3 ││ Thread4 ││
│Task4 │ │└───────┘└───────┘│
├─────┤ └──────────────────┘
│Task5 │
├─────┤
│Task6 │
└─────┘

那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。

简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);

因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

FixedThreadPool:线程数固定的线程池;
CachedThreadPool:线程数根据任务动态调整的线程池;
SingleThreadExecutor:仅单线程执行的线程池。

创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:

// thread-pool
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池:
        ExecutorService es = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 6; i++) {
            es.submit(new Task("" + i));
        }
        // 关闭线程池:
        es.shutdown();
    }
}

class Task implements Runnable {
    private final String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("start task " + name);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("end task " + name);
    }
}

我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。

线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。

如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。

如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

因此,想创建指定动态范围的线程池,可以这么写:

int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,
        60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。

创建一个ScheduledThreadPool仍然是通过Executors类:

ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我们可以提交一次性任务,它会在指定延迟后只执行一次:

// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任务以固定的每3秒执行,我们可以这样写:

// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任务以固定的3秒为间隔执行,我们可以这样写:

// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:

而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。

细心的童鞋还可以思考下面的问题:

在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?

如果任务抛出了异常,后续任务是否继续执行?

Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。

小结
JDK提供了ExecutorService实现了线程池功能:

线程池内部维护一组线程,可以高效执行大量小任务;

Executors提供了静态方法创建不同类型的ExecutorService;

必须调用shutdown()关闭ExecutorService;

ScheduledThreadPool可以定期调度多个任务。

16.Callable接口和Future接口

Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:

class Task implements Callable<String> {
    public String call() throws Exception {
        return longTimeCalculation(); 
    }
}

并且Callable接口是一个泛型接口,可以返回指定类型的结果。

现在的问题是,如何获得异步执行的结果?

如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。

一个Future接口表示一个未来可能会返回的结果,它定义的方法有:

get():获取结果(可能会等待)
get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
cancel(boolean mayInterruptIfRunning):取消当前任务;
isDone():判断任务是否已完成。

17.使用CompletableFuture

18.使用Fork/Join线程池

19.使用ThreadLocal

对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:

public void process(User user) {
    checkPermission();
    doWork();
    saveStatus();
    sendResponse();
}

然后,通过线程池去执行这些任务。

观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?

process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:

public void process(User user) {
    checkPermission(user);
    doWork(user);
    saveStatus(user);
    sendResponse(user);
}

但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:

void doWork(User user) {
    queryStatus(user);
    checkStatus();
    setNewStatus(user);
    log();
}

这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。

给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。

Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。

ThreadLocal实例通常总是以静态字段初始化如下:

static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
它的典型使用方式如下:

void processUser(user) {
    try {
        threadLocalUser.set(user);
        step1();
        step2();
    } finally {
        threadLocalUser.remove();
    }
}

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

void step1() {
    User u = threadLocalUser.get();
    log();
    printUser();
}

void log() {
    User u = threadLocalUser.get();
    println(u.name);
}

void step2() {
    User u = threadLocalUser.get();
    checkUser(u.id);
}

注意到普通的方法调用一定是同一个线程执行的,所以,step1()、step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。

实际上,可以把ThreadLocal看成一个全局Map<Thread, Object>:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:

Object threadLocalValue = threadLocalMap.get(Thread.currentThread());

因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。

最后,特别注意ThreadLocal一定要在finally中清除:

try {
    threadLocalUser.set(user);
    ...
} finally {
    threadLocalUser.remove();
}

这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。

为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {…}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:

public class UserContext implements AutoCloseable {

    static final ThreadLocal<String> ctx = new ThreadLocal<>();

    public UserContext(String user) {
        ctx.set(user);
    }

    public static String currentUser() {
        return ctx.get();
    }

    @Override
    public void close() {
        ctx.remove();
    }
}

使用的时候,我们借助try (resource) {…}结构,可以这么写:

try (var ctx = new UserContext("Bob")) {
    // 可任意调用UserContext.currentUser():
    String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象

这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {…}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名。

小结
ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;

ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);

使用ThreadLocal要用try … finally结构,并在finally中清除。

举报

相关推荐

0 条评论