狂神视频地址:https://www.bilibili.com/video/BV1B7411L7tE
准备工作
- 新建一个Maven项目,引入一个lombok依赖.
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
</dependencies>
- 项目配置
JDK8
- 下载JDK帮助文档
1. 什么是JUC
Java.util.concurrent
2. 线程和进程
进程:一个程序,QQ.exe Music.exe 程序的集合;
一个进程往往可以包含多个线程,至少包含一个!
Java默认有几个线程? 2 个 :mian、GC。
线程:开了一个进程 Typora,写字,自动保存(线程负责的)。
2.1 Java 真的可以开启线程吗?
java是开不了线程的
new Thread().start();
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//调用本地方法,底层的C++,JAVA无法直接操作硬件
private native void start0();
首先。start()是一个synchronized方法,同步方法,安全,这个方法会把当前线程加入一个线程组,调用了start0()方法,这个start0()是用native修饰的,也就是本地方法。所以最后是调用了本地方法,JAVA是没有权限开启线程的。start()调用了本地的C++方法,因为java是运行在虚拟机之上的,无法直接操作硬件。
2.2 并发、并行
并发编程:并发,并行
并发:多个线程操作同一个资源
- 并发:cpu只有一个核:多线程操作同一个资源(cpu通过线程间的快速交替,模拟出来多条线程,看似并行,实际串行)
- 并行:cpu有多个核,多个线程可以同时执行,可以通过线程池完成
public class Demo01 {
public static void main(String[] args) {
//获取CPU核数
//CPU 密集型,IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
并发编程的本质:充分利用CPU的资源
2.3 线程状态
public enum State {
//新生
NEW,
//运行
RUNNABLE,
//阻塞
BLOCKED,
//等待(死死的等)
WAITING,
//超时等待(到期就不等了)
TIMED_WAITING,
//终止
TERMINATED;
}
2.4 wait/sleep区别
- 来自不同的类
wait()来自Object类,sleep()来自Thread类 - 关于锁的释放
wait()会释放锁,sleep()不会释放锁,可以理解为抱着锁睡觉 - 使用范围不同
wait()只能在同步代码块中使用,sleep()可以在任何地方使用 - 是否需要捕获异常
wait()不需要捕获异常,sleep()必须要捕获异常
3. LOCL锁(重点)
3.1 传统的synchronized锁
在公司真正的多线程开发中,线程就是一个单独的资源类,没有任何附属的操作(类中只有属性和方法),为了降低耦合性,不会用类去实现接口,因为实现类接口就不是OOP编程了,而且实现了接口的话耦合性变高,如果让类实现了Runnable,这个类就只是一个线程类了
如下代码,只是把Ticket作为了资源类,并没有让它实现Runnable接口
/**
* 真正的多线程开发,公司中的开发,降低耦合性
* 线程就是一个单独的资源类,没有认合附属操作
* 1. 属性,方法
*/
//基本的卖票例子
public class SaleTicket01 {
public static void main(String[] args) {
//多线程操作
//并发:多个线程操作同一个资源类,把资源类丢入线程
Ticket ticket = new Ticket();
// @FunctionalInterface 函数式接口,JDK1.8 Lambda表达式
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"C").start();
}
}
//资源类 OOP编程
class Ticket {
//属性,方法
//票数
private int num = 30;
//卖票方式
// synchronized 本质:队列,锁
public synchronized void sale() {
if (num > 0) {
System.out.println(Thread.currentThread().getName()+"卖出了第"+(num--)+"张票,剩余:"+num);
}
}
}
3.2 Lock锁
公平锁:十分公平,先来后到,排队
非公平锁:不公平,可以插队
(默认是非公平锁,是为了公平,比如一个线程要3s,另一个线程要3h,难道一定要让3h的锁先来就先执行吗)
public class SaleTicket02 {
public static void main(String[] args) {
//并发:多个线程操作同一个资源类,把资源类丢入线程
Ticket2 ticket = new Ticket2();
new Thread(()->{ for (int i = 0; i < 40; i++) ticket.sale();},"A").start();
new Thread(()->{ for (int i = 0; i < 40; i++) ticket.sale();},"B").start();
new Thread(()->{ for (int i = 0; i < 40; i++) ticket.sale();},"C").start();
}
}
//lock三部曲
// 1. new ReentrantLock()
// 2. lock.lock(); //加锁
// 3. finally => lock.unlock(); //解锁
class Ticket2 {
//属性,方法
private int num = 30;
Lock lock = new ReentrantLock();
public void sale() {
lock.lock(); //加锁
try {
// 业务代码
if (num > 0) {
System.out.println(Thread.currentThread().getName()+"卖出了第"+(num--)+"张票,剩余:"+num);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解锁
}
}
}
3.3 Synchronized和Lock锁的区别
- Synchronized:内置的Java关键字,Lock:是一个java类
- Synchronized:无法判断获取锁的状态,Lock:可以判断是否获取了锁
- Synchronized:会自动释放锁,Lock:必须要手动释放锁,如果不释放锁,会死锁
- Synchronized:线程1(获得锁,阻塞),线程2(等待,傻傻的等),Lock:Lock锁不一定会等待,Lock有一个方法Lock.tryLock();会去尝试获取锁
- Synchronized:可重入锁,不可以在中断,非公平, Lock:可重入,可以判断锁,可以设置公平或者非公平
- Synchronized:适合锁少量的代码同步问题,Lock:适合锁大量的同步代码
4. 生产者消费者问题
4.1 synchronized版
/**
* 线程之间的通信问题:生产者消费者问题
* 线程交替执行 A B 操作同一个变量 num=0
* A num+1
* B num-1
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
//判断等待,业务,通知
class Data{ //数字,资源类
private int num = 0;
// +1
public synchronized void increment() throws InterruptedException {
if (num != 0) {
//等待
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//通知其他线程,我+1完了
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
if (num == 0) {
//等待
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//通知其他线程,我-1完了
this.notifyAll();
}
}
synchronized版本存在的问题
如果增加两个线程,即两个线程加,两个线程减,得到如下结果,并不能1,0交替,而且出现了2,3
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
问题原因分析
看jdk1.8的官方文档,找到Object类的wait()方法
while (<condition does not hold>)
obj.wait(timeout);
... // Perform action appropriate to condition
} ```
结论
就是用if判断的话,唤醒后线程会从wait之后的代码开始运行,不会重新判断if条件,直接继续运行if代码块之后的代码,而如果使用while的话,也会从wait之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行while代码块之后的代码块,成立的话继续wait。
解决方法
把if判断改成while判断等待,因为if判断进if之后不会停,用while判断的话,变量一旦被修改,另外一个线程拿到锁之后,就会等待,防止虚假唤醒。
public synchronized void increment() throws InterruptedException {
while (num != 0) {
//等待
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//通知其他线程,我+1完了
this.notifyAll();
}
4.2 JUC版(lock锁)
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Data2{
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
lock.lock();
try {
while (num != 0) {
//等待
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//通知其他线程,我+1完了
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (num == 0) {
//等待
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//通知其他线程,我-1完了
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
Condition 精准的通知和唤醒线程
//A执行完调用B,B执行完调用C,C执行完调用A
public class C {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(()->{ for (int i = 0; i < 10; i++) data.printA(); },"A").start();
new Thread(()->{ for (int i = 0; i < 10; i++) data.printB(); },"B").start();
new Thread(()->{ for (int i = 0; i < 10; i++) data.printC(); },"C").start();
}
}
class Data3{
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int num = 1; //1A 2B 3C
public void printA() {
lock.lock();
try {
//业务:判断-执行-通知
while (num != 1){
//等待
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>AAAAA");
//唤醒,唤醒指定的人,B
num = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
//业务:判断-执行-通知
while (num != 2){
//等待
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"=>BBBBB");
//唤醒,唤醒指定的人,B
num = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
//业务:判断-执行-通知
while (num != 3){
//等待
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"=>CCCCC");
//唤醒,唤醒指定的人,B
num = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
5. 八锁
锁是什么,如何判断锁的是谁?
8锁,就是关于锁的8个问题
- 问题1:标准情况下,先发短信,还是先打电话?
/*
问题1:标准情况下,先打印发短信,还是先打印打电话?结果是:先打印发短信,后打印打电话
原因:不能回答先调用A线程,这是错误的,不是先调用先执行,这是锁的问题,因为被Synchronized修饰的
方法,锁的对象是方法的调用者,所以调用两个方法的对象都是phone,但是现在phone只有一个,也就是说着两个方
法现在用的是同一把锁,谁先拿到,谁就先执行
*/
public class Test1 {
public static void main(String[] args) {
phone phone = new phone();
new Thread(()->{ phone.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone.call(); },"B").start();
}
}
class phone{
//synchronized 锁的对象是方法的调用者
public synchronized void sendSms() {
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果:先打印发短信,后打印打电话
- 问题2:给发短信方法加延时4s,程序的执行情况
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
结果:经过4s之后打印发短信,然后立马打印打电话。
如果把主函数中延时改为5s,那么程序运行情况是经过4s打印发送短信,然后经过1s打印打电话(主程序的延时是同时进行的)
public class Test1 {
public static void main(String[] args) {
phone phone = new phone();
new Thread(()->{ phone.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone.call(); },"B").start();
}
}
- 问题3:当调用普通方法,而不是synchronized方法时,程序的执行情况
public class Test1 {
public static void main(String[] args) {
phone phone = new phone();
new Thread(()->{ phone.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone.sayHello(); },"B").start();
}
}
class phone{
//synchronized 锁的对象是方法的调用者
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
public void sayHello()
{
System.out.println("hello");
}
}
结果:1秒先打印Hello,4秒后打印发短信
先执行普通方法,因为普通方法没有锁,不受锁的影响
- 问题4:两个对象分别调用synchronized方法时,程序的执行情况
public static void main(String[] args) {
//两个对象,两个调用者,两把锁
phone phone1 = new phone();
phone phone2 = new phone();
new Thread(()->{ phone1.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone2.call(); },"B").start();
}
结果:先打电话,后发短信。
两个对象,两个调用者,两把锁,因为锁不一样,所以耗时短的先输出。
- 问题5:增加两个静态的同步方法,只有一个对象,程序的执行情况
public class Test1 {
public static void main(String[] args) {
phone phone = new phone();
new Thread(()->{ phone.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone.call(); },"B").start();
}
}
class phone{
//static 静态方法,类一加载就有了,锁的是Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
结果:先发短信,后打电话。
- 问题6:两个静态的同步方法,两个对象,程序的执行情况
public class Test1 {
public static void main(String[] args) {
phone phone1 = new phone();
phone phone2 = new phone();
new Thread(()->{ phone1.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone2.call(); },"B").start();
}
}
结果:先发短信,后打电话。
两个对象的Class模板只有一个,static锁的是Class
- 问题7:一个静态同步方法,一个普通同步方法,只有一个对象,程序的执行情况
public class Test1 {
public static void main(String[] args) {
phone phone = new phone();
new Thread(()->{ phone.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone.call(); },"B").start();
}
}
class phone{
//静态同步方法,锁的是Class模板
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
//普通同步方法,锁的是调用者,不需要去等待锁
public synchronized void call() {
System.out.println("打电话");
}
}
结果:先打电话,后发短信。
不是同一个锁,锁调用者的不需要等待,锁Class的要等待
- 问题8:一个静态同步方法,一个普通同步方法,两个对象,程序的执行情况
public static void main(String[] args) {
phone phone1 = new phone();
phone phone2 = new phone();
new Thread(()->{ phone1.sendSms(); },"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
new Thread(()->{ phone2.call(); },"B").start();
}
结果:先打电话,后发短信。
两个锁,还是锁的对象不一样
小结
普通同步方法:(new this) 锁的是调用者,是一个具体的对象
静态同步方法:(static Class) 锁的是Class模板,是唯一的
6. 集合类不安全
6.1 List不安全
CopyOnWriteArrayList
- Arrarlist测试(单线程)
public class ListTest01 {
public static void main(String[] args) {
List<String> list = Arrays.asList("1", "2", "3");
list.forEach(System.out::println);
}
}
- 多线程下ArrayList还安全吗?
现在我们创建10个线程来向List添加元素
public class ListTest01 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
//UUID(Universally Unique Identifier):通用唯一识别码,是一种软件建构的标准.UUID是指在一台机器上生成的数字,它保证对在同一时空中的所有机器都是唯一的
//String.valueOf(i),给10个线程分别取名字
}
}
}
-
并发下 ArrayList 不安全的解决方案
- List list = new Vector<>(); //vector默认是安全的(是同步方法)
- List list = Collections.synchronizedList(new ArrayList<>); //工具类
- List list = new CopyOnWriteArrayList<>();//CopyOnWrite,写入时复制,COW,计算机程序设计领域的一种优化策略
多个线程调用的时候,list,读取的时候 固定的,写入(覆盖)
在写入的时候避免覆盖,造成数据问题!
读写分离(写入的时候复制一个数组出来,写入完之后再插入进去,保证线程安全)
-
CopyOnWriteArrayList 比 Vector 好在哪里?
Vector的add方法有Synchronized修饰(看源码),有Synchronized修饰的方法,效率都比较低
CopyOnWriteArrayList—— add源码
6.2 Set不安全
public class SetTest {
public static void main(String[] args) {
// Set<String> set = new HashSet<>(); //ConcurrentModificationException
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
HashSet的底层是什么?
public HashSet() {
map = new HashMap<>();
}
//add set本质就是map,key是无法重复的
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object(); //PRESENT 不变的值
6.2 Map不安全
HashMap<Object, Object> map = new HashMap<>();
问题1:Map是这样用的吗?
答:工作中不这样用
问题2:map默认等价于什么
其中16是初始容量,0.75是加载因子
HashMap<Object, Object> map1 = new HashMap<>(16,0.75);
public class MapTest {
public static void main(String[] args) {
// HashMap<Object, Object> map = new HashMap<>(); //ConcurrentModificationException
// HashMap<Object, Object> map = (HashMap<Object, Object>) Collections.synchronizedMap(new HashMap<>());
Map<Object, Object> map = new ConcurrentHashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
7. Callable
官方文档:Callable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而, Runnable不返回结果,也不能抛出被检查的异常。
- 可以有返回值
- 可以抛出异常
- 方法不同,run() / call()
根据底层源码
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}
Thread 只能接受Runnable类型的参数,不能接受Callable类型的参数
但是,Runnable有一个实现类:Class FutureTask
FutureTask可以接受Callable类型的参数
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//new Thread(new Runnable()).start();
//new Thread(new FutureTask<V>()).start();
//new Thread(new FutureTask<V>(Callable)).start();
new Thread().start(); //怎么启动callable
MyThread myThread = new MyThread();
FutureTask futureTask = new FutureTask<>(myThread); //适配类
new Thread(futureTask,"A").start();
Object o = futureTask.get(); //获取Callable的返回结果,get方法可能会产生阻塞,一般放到最后,或者使用异步通信
System.out.println(o);
}
}
class MyThread implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("call()");
return "aaa";
}
}
问题:如果new两个线程会打印几个call()?
答案:一个
分析:结果会被缓存,效率高
8. 常用的辅助类(必会)
8.1 CountDownLatch(减法计数器)
//计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总数是6,必须要在执行任务的时候,再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" go out");
countDownLatch.countDown(); //数量-1
},String.valueOf(i)).start();
}
countDownLatch.await(); //等待计数器归零,再向下执行
System.out.println("Close door");
}
}
原理:
countDownLatch.countDown(); // 数量-1
countDownLatch.await(); // 等待计数器归零,然后再向下执行
每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续执行!
如果 countDown()没有减到0,后面的程序是不会执行的
8.2 CylicBarrier(加法计数器)
public class CylicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
//集齐7颗龙珠召唤神龙
//召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙成功");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(()->{
//lambda表达式获取不到i
System.out.println(Thread.currentThread().getName()+"收集了"+temp+"颗龙珠");
try {
cyclicBarrier.await(); //等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
8.3 Semaphore(信号量)
//模拟停车,假设现在有6辆车,但是只有3个停车位
//在有限的情况下使其有秩序,限流的时候可以使用
public class SemaphoreDemo {
public static void main(String[] args) throws InterruptedException {
//线程数量:停车位
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release() 释放
}
},String.valueOf(i)).start();
}
}
}
原理:
semaphore.acquire():获得,假设如果已经满了,等待,等待被释放为止!
semaphore.release():释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数!
9. 读写锁ReadWriteLock
读的时候可以被多线程同时读,写的时候只能有一个线程去写。
独占锁(写锁):一次只能被一个线程占有
共享锁(读锁):可以被多个线程同时占有
读-读:可以共存
读-写:不能共存
写-写:不能共存
public class ReadWriteLockTest {
public static void main(String[] args) {
//MyCache myCache = new MyCache();
MyCacheLock myCache = new MyCacheLock();
//写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
//加锁的
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
//读写锁(更加细粒度的控制)
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存 写入的时候,只希望同时只有一个线程在写
public void put(String key, Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//读 取,所有人可以读
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
//自定义缓存
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
//存 写
public void put(String key, Object value){
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入成功");
}
//读 取
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取成功");
}
}
10. 阻塞队列
写入:如果队列满了,就必须阻塞等待
读取:如果队列是空的,必须阻塞等待生产
BlockingQueue
什么情况会使用阻塞队列?
多线程(A调用B,必须等B先执行,B没有执行完,A就会挂起或者等待)
线程池(出了弹性大小之外,一般会用一个队列去维护里面的大小)
学会使用队列
添加,移除
四组API
- 抛出异常
public class BqTest {
public static void main(String[] args) {
test1();
}
// 1. 抛出异常
public static void test1(){
//队列大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.add("a"));
System.out.println(arrayBlockingQueue.add("b"));
System.out.println(arrayBlockingQueue.add("c"));
//java.lang.IllegalStateException: Queue full 抛出异常
//System.out.println(arrayBlockingQueue.add("d"));
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
//java.util.NoSuchElementException
//System.out.println(arrayBlockingQueue.remove());
}
}
- 不会抛出异常
// 2. 有返回值,不抛出异常
public static void test2(){
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println(arrayBlockingQueue.offer("d")); //false 不抛出异常
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll()); //null 不抛出异常
}
- 阻塞等待
// 3. 等待,阻塞(一直阻塞)
public static void test3() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");
//arrayBlockingQueue.put("d"); //队列没有位置,一直阻塞
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
//System.out.println(arrayBlockingQueue.take()); //没有这个元素,一直阻塞
}
- 超时等待
// 4. 等待,阻塞(等待超时)
public static void test4() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println(arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS)); //等待超过2秒就退出
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS)); //等待超过2秒就退出
}
- 同步队列
- 没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素。
- 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
- put了一个元素,必须从里面先take取出来,否则不能在put进去值!
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();//同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
11. 线程池
- 池化技术
- 程序运行的本质:占用系统资源,为了优化资源的使用,引入了池化技术
- 比如线程池、连接池。内存池、对象池
- 池化技术:事先准备好一些资源,有人要用,就来拿,用完就归还。
线程池的好处:
- 降低资源消耗
- 提高响应速度
- 方便管理线程
线程可以复用,可以控制最大并发数,管理线程
线程池重点核心:三大方法, 7大参数, 4种拒绝策略
11. 1 三大方法
//Executors 工具类 3大方法
public class Demo01 {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);// 创建一个固定的线程池的大小
ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的
try {
for (int i = 0; i < 100; i++) {
// 使用线程池创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池用完,程序结束,线程池要关闭
threadPool.shutdown();
}
}
}
11.2 7大参数
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
本质:new ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大核心线程池大小
long keepAliveTime, //超时了没有人调用就会释放
TimeUnit unit, //超时单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂,创建线程的,一般不用动
RejectedExecutionHandler handler) { //拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
11.3 4种拒绝策略
四种拒绝策略:
- new ThreadPoolExecutor.AbortPolicy() //银行满了,但是还有人进来,不处理这个人的,抛出异常
- new ThreadPoolExecutor.CallerRunsPolicy() //哪来的去哪里 (main线程执行)
- new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常
- new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,不会抛出异常
public class Demo01 {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,不会抛出异常
);
try {
// 最大承载 8个:Deque + max
// 超过:RejectedExecutionException
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池用完,程序结束,线程池要关闭
threadPool.shutdown();
}
}
}
- 最大线程到底该如何定义
- CPU密集型:几核,就是几,可以保持CPU的效率最高!
你的电脑是几个核心的(用程序获取),你的最大线程数就设置成几,可以保持CPU的效率最高)
- IO密集型:最大线程数 > 判断你程序中十分耗IO的线程
例如:你的程序里面有15个任务很占用IO资源,就用15个线程去执行,所以最大
线程数量大于这个15就好了,一般是大型IO任务数量的2倍
12. 四大函数式接口
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
超级多的FunctionalInterface
简化编程模型,在新版本的框架底层大量应用
foreach(消费者类型的函数式接口)
12.1 Function 函数型接口
有一个输入参数,有一个输出
只要是函数式接口,就可以用lambda表达式简化
public class FunctionDemo {
public static void main(String[] args) {
/*Function function = new Function<String,String>() {
//工具类:输出输入的值
@Override
public String apply(String str) {
return str;
};
};*/
Function function = (str)->{return str;};
System.out.println(function.apply("aaa"));
}
}
12.2 Predicate 断定型接口
有一个输入参数,返回值只能是布尔值
public class PredicateDemo {
public static void main(String[] args) {
//判断字符串是否为空
/*Predicate<String> predicate = new Predicate<String>() {
@Override
public boolean test(String str) {
return str.isEmpty();
}
};*/
Predicate<String> predicate = (str)->{return str.isEmpty();};
System.out.println(predicate.test(""));
}
}
12.3 Supplier 供给型接口
public class SuppierDemo {
public static void main(String[] args) {
//打印字符串
/*Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "aaa";
}
};*/
Supplier<String> supplier = ()->{return "aaa";};
System.out.println(supplier.get());
}
}
12.4 Consumer 消费型接口
public class ConsummerDemo {
public static void main(String[] args) {
//打印字符串
/* Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};*/
Consumer<String> consumer = (s)->{System.out.println(s);};
consumer.accept("sss");
}
}
13. Stream 流式计算
- 什么是Stream流式计算
- 大数据:存储+计算
- 集合、Mysql本来就是存储数据的,计算应该交给流来操作。
/**
* 题目要求: 用一行代码实现
* 1. Id 必须是偶数
* 2.年龄必须大于23
* 3. 用户名转为大写
* 4. 用户名倒序
* 5. 只能输出一个用户
**/
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(1, "a", 23);
User u2 = new User(2, "b", 23);
User u3 = new User(3, "c", 23);
User u4 = new User(6, "d", 24);
User u5 = new User(4, "e", 25);
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
// lambda、链式编程、函数式接口、流式计算
list.stream()
.filter(user -> {return user.getId()%2 == 0;})
.filter(user -> {return user.getAge() > 23;})
.map(user -> {return user.getName().toUpperCase();})
.sorted((user1, user2) -> {return user2.compareTo(user1);})
.limit(1)
.forEach(System.out::println);
}
}