本文为笔者对 JUC 的学习记录,主要参考了尚硅谷的 JUC 教程

1. JUC 概述

什么是 JUC?

java.util.concurrent 工具包的简称,是一个处理线程的工具包,JDK1.5 开始出现的

线程和进程:

  1. 进程 Process 是系统进行资源分配和调度的基本单位,是线程的容器。进程是程序的实体,是程序在某一数据集合上的一次活动
  2. 线程 Thread 是操作系统执行的最小单位,是进程中实际运作的单位,是进程中的一个单一顺序控制流。一个进程可以有多个线程,每个线程执行不同的任务

进程的状态:

Thread.State 是一个枚举类,包含6种信息:NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED

WAITING 和 TIMED_WAITING 区别:WAITING(不见不散),TIMED_WAITING(过时不候)

wait 和 sleep:

  1. sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何实例对象都能调用
  2. sleep 不释放锁也不占用锁,wait 会释放锁,但调用的前提是当前线程占有锁(也就是代码要在 synchronized 中)
  3. 都可以被 interrupted 方法中断

并发和并行:

并行对应的是串行。并发是多个线程对应一个点(春运抢票、电商秒杀),并行是多个工作一起执行,后面再汇总(边泡面边烧水边看 b 站)

管程:

管程 Monitor 监视器,所说的锁就是监视器,是一种同步机制,保证同一时间只有一个线程访问被保护的数据或代码。JVM 的同步是基于进入和退出实现的,使用管程对象实现

用户线程和守护线程:

用户线程是平时使用的线程,自定义线程。守护线程是特殊线程,运行在后台,如垃圾回收线程

  • 主线程结束了,用户线程还在运行,那么 JVM 还在存活状态

image-20230512112441451

  • 没有用户线程了,都是守护线程,那么 JVM 结束。(设置守护线程要在 start 前)

image-20230512112355472

2. Synchronized 关键字和 Lock 接口

Synchronized 关键字:

  1. 修饰代码块:synchronized { 代码 },里面的代码叫同步语句块,作用于调用这个代码块的对象
  2. 修饰方法:作用于调用方法的对象
  3. 修饰静态方法:作用于这个类的所有对象
  4. 修饰类:作用于这个类的所有对象

多线程的编程步骤(上):

有一套固定套路

  1. 第一步:创建资源类,在资源类创建属性和操作方法(买一个空调,内容不能更改,有啥用啥)
  2. 第二步:创建多个线程,调用资源类的操作方法(3个售票员,卖30张票)
package com.bluestragglers.juc.sync;

// 第一步,创建资源类,定义属性和操作方法
class Ticket {
    // 票数
    private int number = 30;

    // 卖票方法
    public synchronized void sale() {
        // 判断是否有票
        if (number > 0) {
            System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余:" + number);
        }
    }
}

public class SaleTicket {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                // 卖票
                for (int i = 0; i < 40; i++) {
                    ticket.sale();
                }
            }
        }, "AA");
        Thread thread2 = new Thread(thread1, "BB");
        Thread thread3 = new Thread(thread1, "CC");
        thread1.start();
        thread2.start();
        thread3.start();
    }
}

image-20230512115215206

需要注意的是,这里的 synchronized 关键字的作用是避免两个人卖同一张票

Synchronized 是自动上锁和解锁。要想实现手动上锁和解锁,可以用 JUC 的 Lock 接口

Lock 接口:

提供了比 synchronized 功能更强大,更好扩展。Lock 是一个接口,实现类有 ReentrantLock, ReentrantReadWriteLock, ReadLock, ReentrantReadWriteLock, WriteLock 等。可重入锁:公共厕所

package com.bluestragglers.juc.lock;

import java.util.concurrent.locks.ReentrantLock;

class LTicket {
    // 票数
    private int number = 30;

    // 创建可重入锁
    private final ReentrantLock lock = new ReentrantLock();

    // 卖票方法
    public void sale() {
        // 上锁
        lock.lock();
        try {
            // 判断是否有票
            if (number > 0) {
                System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余:" + number);
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
}

public class LSaleTicket {
    public static void main(String[] args) {

        LTicket lTicket = new LTicket();

        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                lTicket.sale();
            }
        }, "AA");
        Thread thread2 = new Thread(thread1, "BB");
        Thread thread3 = new Thread(thread1, "CC");
        thread1.start();
        thread2.start();
        thread3.start();
    }
}

调用 start 后,线程是否会马上生成?不会,因为 start 方法内部调用了 start0() 方法,这是个 native 方法,具体什么时候生成线程,要看操作系统创建

创建线程方法:

  1. 继承 Thread 类
  2. 实现 Runnable 接口
  3. 使用 Callable 接口
  4. 使用线程池

Lock 和 synchronized 关键字的区别:

  1. Lock 是一个接口,而 synchronized 是一个关键字,是内置的语言实现
  2. synchronized 发生异常时,会自动释放线程占有的锁,因此不会发生死锁。Lock 发生异常时如果不调用 unLock() 释放锁,会容易造成死锁。所以 Lock 用 try finally 写并在 finally 中释放锁
  3. Lock 可以让等待锁的线程响应终端,synchronized 不行,等待的线程会一直等待下去,不能响应中断
  4. Lock 可以知道有没有成功获取锁,而 synchronized 不行
  5. Lock 可以提高多个线程进行读操作的效率
  6. 性能上来说,如果资源竞争不激烈,两者性能差不多。而资源竞争激烈时,Lock 性能远远优于 synchronized

3. 线程间通信

多线程的编程步骤(中):

  1. 创建资源类
  2. 在资源类操作方法
    1. 判断(注意防止虚假唤醒)
    2. 干活
    3. 通知
  3. 创建多个线程,调用资源类的操作方法

样例:两个线程,对同一个初始值为0的变量操作。一个线程+1,一个-1,一直为0

方法:使用 synchronized 关键字。内部用 Object 的 wait() 和 notifyAll() 方法实现等待和通知

package com.bluestragglers.juc.sync;

class Share {
    private int num = 0;

    public synchronized void incr() throws InterruptedException {
        if (num != 0) {
            this.wait(); // 等待
        }
        num++; // 干活
        System.out.println(Thread.currentThread().getName() + " " + num);
        this.notifyAll(); // 唤醒其他线程
    }

    public synchronized void decr() throws InterruptedException {
        if (num != 1) {
            this.wait(); // 等待
        }
        num--; // 干活
        System.out.println(Thread.currentThread().getName() + " " + num);
        this.notifyAll(); // 唤醒其他线程
    }
}

public class ThreadDemo1 {
    public static void main(String[] args) {
        Share share = new Share();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
    }
}

image-20230512154835776

虚假唤醒问题:

上面的例子,假如 AA 和 BB 是 +1,CC 和 DD 是 -1。假如 AA 之后是 BB 执行,那 BB 会 wait(),这时 AA 又执行完一次唤醒了 BB,那么 BB 就会从这个 wait() 这里被唤醒,然后跳过 if 判断执行后续操作,判断失效了。wait() 的特点是哪里睡哪里醒

image-20230512155639539

解决方案:不使用 if,使用 while,避免虚假唤醒

image-20230512155702217

Lock 实现:

package com.bluestragglers.juc.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Share {
    private int num = 0;

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    // +1
    public void incr() {
        lock.lock();
        try {
            while (num != 0) { // 判断
                condition.await(); // 等待
            }
            num++; // 干活
            System.out.println(Thread.currentThread().getName() + " " + num);
            condition.signalAll(); // 唤醒其他线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // -1
    public void decr() {
        lock.lock();
        try {
            while (num != 1) { // 判断
                condition.await(); // 等待
            }
            num--; // 干活
            System.out.println(Thread.currentThread().getName() + " " + num);
            condition.signalAll(); // 唤醒其他线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class ThreadDemo2 {
    public static void main(String[] args) {
        Share share = new Share();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                share.incr();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                share.decr();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                share.decr();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                share.incr();
            }
        }, "D").start();
    }
}

image-20230512170236436

4. 线程间定制化通信

线程间定制化通信:

流程:启动三个线程,按照要求:AA 打印 5 次,BB 10 次,CC 15 次,进行 10 轮

方案:设置一个标志位 flag,同时设置三个锁 Condition,分别控制 AA, BB 和 CC 的关系。通过 condition.await() 等待信号,同时通过 condition.signal() 释放信号

image-20230512170640427

package com.bluestragglers.juc.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareResource {
    // 定义标志位
    private int flag = 1; // 1 AA, 2 BB, 3 CC

    // 创建 Lock 锁
    private Lock lock = new ReentrantLock();

    // 创建三个 condition,相当于三把钥匙
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    // 打印 AA 的方法
    public void print5() {
        // 上锁
        lock.lock();
        try {
            while (flag != 1) {
                // 等待
                condition1.await();
            }
            // 干活
            for (int i = 1; i <= 5; ++i) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            // 通知
            flag = 2; // 修改标志位
            condition2.signal(); // 通知 BB 线程
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 解锁
            lock.unlock();
        }
    }

    // 打印 BB 的方法
    public void print10() {
        // 上锁
        lock.lock();
        try {
            while (flag != 2) {
                // 等待
                condition2.await();
            }
            // 干活
            for (int i = 1; i <= 10; ++i) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            // 通知
            flag = 3; // 修改标志位
            condition3.signal(); // 通知 CC 线程
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 解锁
            lock.unlock();
        }
    }

    // 打印 CC 的方法
    public void print15() {
        // 上锁
        lock.lock();
        try {
            while (flag != 3) {
                // 等待
                condition3.await();
            }
            // 干活
            for (int i = 1; i <= 15; ++i) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            // 通知
            flag = 1; // 修改标志位
            condition1.signal(); // 通知 AA 线程
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 解锁
            lock.unlock();
        }
    }

}

public class ThreadDemo03 {
    public static void main(String[] args) {
        ShareResource shareResource = new ShareResource();
        new Thread(() -> {
            for (int i = 1; i <= 10; ++i) {
                shareResource.print5();
            }
        }, "AA").start();
        new Thread(() -> {
            for (int i = 1; i <= 10; ++i) {
                shareResource.print10();
            }
        }, "BB").start();
        new Thread(() -> {
            for (int i = 1; i <= 10; ++i) {
                shareResource.print15();
            }
        }, "CC").start();
    }
}

image-20230512171658887

5. 集合的线程安全

集合的线程不安全演示:

List list = new ArrayList<>();
for (int i = 0; i < 30; ++i) {
  new Thread(() -> {
    list.add(Thread.currentThread().getName());
    System.out.println(list);
  }, String.valueOf(i)).start();
}

执行可能会报 ConcurrentModificationException 错(Java 13 之前),原因是进行了并发修改

解决方案1——Vector:

原理:加了 synchronized 关键字

解决方案2——Collections:

原理:Collections 工具类提供了许多方法,其中 Collections.synchronizedList(new ArrayList<>()) 这样就可以同步了

解决方案3——CopyOnWriteArrayList:

这个方法是最常用的!多用这个方法

原理:写时复制技术。先用 lock 给对象上锁,然后复制一个相同内容的集合,独立写,最后将原容器的引用指向新副本

HashSet 解决方案——CopyOnWriteArraySet:

HashSet 是根据 HashMap 构建的,它的内容就是 HashMap 的 key。同样可以使用 CopyOnWriteArraySet 解决这个问题。

HashMap 解决方案——ConcurrentHashMap:

JDK 1.7:volatile + synchronized + segment

  • volatile:确保所有线程看到这个变量的值是一致的,同时禁止指令重排
  • volatile 保证可见性有序性,但不保证原子性
  • volatile 具体实现过程:强制变量将修改的值写入主存,同时线程 B 修改时会让线程 A 的缓存行无效,这样线程 A 就必须去主存读取
  • synchronized:实现原子性
  • segment:segment 继承了 ReentrantLock,能配合 synchronized 实现互斥同步
  • 优势:对单个 segment[i] 加锁,也就是如果 segment 有 16 个,就支持最多 16 个线程并发

使用 volatile 的场景:

  • 状态标记量
  • 单例模式 double check

JDK 1.8:CAS + synchronized + 红黑树

  • CAS:乐观锁的一种实现。悲观锁是每次执行前都上锁,是阻塞同步。乐观锁是先执行,如果执行时发生了冲突再采取其他策略,是非阻塞同步
  • JUC 的很多类采用了 Unsafe 类的 CAS 操作
  • synchronized:当 table[i] 不为空,并且没有线程正在扩容的时候,进入 table[i] 并使用 synchronized 上锁,然后判断是链表还是红黑树,然后将内容放进来

6. 多线程锁

8锁问题:

package com.bluestragglers.juc.sync;

import java.util.concurrent.TimeUnit;

class Phone {

    public static synchronized void sendSMS() throws Exception {
        //停留4秒
        TimeUnit.SECONDS.sleep(4);
        System.out.println("------sendSMS");
    }

    public synchronized void sendEmail() throws Exception {
        System.out.println("------sendEmail");
    }

    public void getHello() {
        System.out.println("------getHello");
    }
}

/**
 * @Description: 8锁
 *
1 标准访问,先打印短信还是邮件
------sendSMS
------sendEmail

2 停4秒在短信方法内,先打印短信还是邮件
------sendSMS
------sendEmail

3 新增普通的hello方法,是先打短信还是hello
------getHello
------sendSMS

4 现在有两部手机,先打印短信还是邮件
------sendEmail
------sendSMS

5 两个静态同步方法,1部手机,先打印短信还是邮件
------sendSMS
------sendEmail

6 两个静态同步方法,2部手机,先打印短信还是邮件
------sendSMS
------sendEmail

7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
------sendEmail
------sendSMS

8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
------sendEmail
------sendSMS

 */

public class Lock_8 {
    public static void main(String[] args) throws Exception {

        Phone phone = new Phone();
        Phone phone2 = new Phone();

        new Thread(() -> {
            try {
                phone.sendSMS();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "AA").start();

        Thread.sleep(100);

        new Thread(() -> {
            try {
               // phone.sendEmail();
               // phone.getHello();
                phone2.sendEmail();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "BB").start();
    }
}

问题 1 和 2:使用的 synchronized 方法锁了方法,也就是锁了当前对象,所以按照顺序执行

问题 3:hello 没有使用 synchronized 方法,所以不被锁对象,所以可以先执行

问题 4:两个对象,所以不被锁对象影响,所以不按顺序执行

问题 5 和 6:static synchronized 连用,锁了同一把类锁,所以两个对象按照顺序执行

问题 7 和 8:static synchronized 锁了一把类锁,但是不影响 synchronized 锁对象,所以不按照顺序执行。换句话说,类锁像一个公司大门,只有一把锁,但是公司里的房间的锁 (对象的锁)有很多把,所以互相不影响,两个对象不按顺序执行

公平锁和非公平锁:

ReentrantLock:默认是非公平锁,一个线程把所有活都干了,其他线程都被饿死。如果提供一个 true 参数,就成了公平锁

  • 非公平锁:效率高,但是线程饿死
  • 公平锁:效率低,但是阳光普照
  • 非公平锁实现:直接 lock,执行操作
  • 公平锁实现:队列

可重入锁:

synchronized 和 Lock 都是可重入锁。synchronized 是自动的,Lock 需要手动获取和释放锁。synchronized 的锁可以拿着钥匙来回开,像下面的例子就因为可以来回开,所以会报错

package com.bluestragglers.juc.sync;

public class SyncLockDemo {

    public synchronized void add() {
        add();
    }

    public static void main(String[] args) {
        new Thread(() -> {
            new SyncLockDemo().add();
        }, "t1").start();

        Object o = new Object();
        new Thread(() -> {
            synchronized (o) {
                System.out.println(Thread.currentThread().getName() + "外层");
                synchronized (o) {
                    System.out.println(Thread.currentThread().getName() + "中层");
                    synchronized (o) {
                        System.out.println(Thread.currentThread().getName() + "内层");
                    }
                }
            }
        }, "t1").start();
    }
}

同理,Lock 需要手动获取锁和释放锁。如果没有释放,就会出现死锁

死锁:

造成死锁的四个必要条件:

  1. 互斥
  2. 不可抢占
  3. 占有且等待(请求保持)
  4. 多个线程形成循环等待链(循环等待)

验证是否会发生死锁:

  1. jps 命令(查看进程号,类似 Linux 的 ps -ef)
  2. jstack(jvm 自带的堆栈跟踪工具)

首先通过 jps 获取进程号,然后通过 jstack+进程号,分析是否出现死锁

7. Callable 接口

Runnable 和 Callable 的区别:

  • Runnable 在线程终止时,也就是 run() 完成时,无法返回线程结果。Callable 则会可以返回结果
  • Runnable 需要实现不返回任何内容的 run() 方法,Callable 需要实现(重写)完成时返回结果的 call() 方法
  • run() 方法不能引发异常,call() 可以
  • Thread 的构造方法只支持 Runnable,不支持 Callable。FutureTask 实现了 Runnable,同时构造方法支持 Callable,所以可以用 FutureTask 构造 Thread

FutureTask:

功能包括:

  1. 老师上课口渴了,课不能停,新开了一个班长线程买水,买完水后老师线程可以随时 get 水
  2. 4 个同学计算,第 2 个同学计算量特别大,可以先汇总 1、3、4 的答案,最后等 2 的答案回来统一汇总
  3. 考试,首先做会做的题目,最后做不会做的题目

同时需要注意,子线程计算完一次后会将返回的结果保存,后面再调用就直接返回

package com.bluestragglers.juc.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class MyThread1 implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " come in runnable");
    }
}

class MyThread2 implements Callable {
    @Override
    public Integer call() throws Exception {
        return 200;
    }
}

public class Demo01 {
    public static void main(String[] args) {
        new Thread(new MyThread1(), "AA").start();
        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            System.out.println(Thread.currentThread().getName() + " come in callable");
            return 1024;
        });
        new Thread(futureTask, "BB").start();
    }
}

image-20230513104201100

FutureTask 只计算一次:

image-20230513104728748

8. JUC 强大的辅助类

减少计数 CountDownLatch:

主要方法:

image-20230513114846412

构造方法设初始值,countDown() 每次减一,await() 阻塞,直到 0 时才允许线程执行

例子:教室六个同学先走,班长最后锁门再走

// 先设置一个 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(6);
// 6个同学离开教室
for (int i = 1; i <= 6; i++) {
  new Thread(() -> {
    System.out.println(Thread.currentThread().getName() + "\t 离开教室");
    countDownLatch.countDown();
  }, String.valueOf(i)).start();
}
try {
  countDownLatch.await();
  System.out.println(Thread.currentThread().getName() + "\t 班长关门走人");
} catch (InterruptedException e) {
  throw new RuntimeException(e);
}

不使用 CountDownLatch,就会出现班长可能先走的情况

image-20230513120335984

使用了 CountDownLatch,就能避免班长先走

image-20230513120628359

循环栅栏 CyclicBarrier:

CyclicBarrier 构造方法设置障碍数。首先跨域障碍数为 0,每次执行完都加一,最后到达障碍数了执行 await() 后的语句。按照官方的说法是到达终点的线程数量达到要求后执行后续动作

提供 Runnable 的构造方法:启动

image-20230513121113437

例子:集齐七颗龙珠即可满足愿望

private static final int DRAGON_BALL_COUNT = 7;
public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(DRAGON_BALL_COUNT, () -> {
        System.out.println("召唤神龙");
    });
    for (int i = 1; i <= DRAGON_BALL_COUNT; ++i) {
        int finalI = i;
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "收集到第" + finalI + "颗龙珠");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, String.valueOf(i)).start();
    }
}

信号量 Semaphore:

常用内容:构建方法,设置信号量数量、acquire() 拿一个信号量、release() 释放一个信号量

image-20230513143642176

例子:6 辆汽车抢 3 个车位

// 3 个车位
Semaphore semaphore = new Semaphore(3);
// 6 辆汽车
for (int i = 1; i <= 6; ++i) {
  new Thread(() -> {
    try {
      semaphore.acquire();
      System.out.println(Thread.currentThread().getName() + "\t抢到车位");
      Thread.sleep(3000);
      System.out.println(Thread.currentThread().getName() + "\t离开车位");
      semaphore.release();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

image-20230513145756067

9. ReentrantReadWriteLock 读写锁

MySQL 事物隔离级别:

读取未提交、读取已提交、可重复读、串行化

脏读(读取了还没有提交的数据)、不可重复读(一个事务范围内相同的两次查询得到了不同数据)、幻读(多次读取返回的结果集不一样,如增加或减少了行数据)

幻读原因:两个线程不会修改正在修改或阅读的记录,但可能会增加和删除几个记录,造成记录条数不一样

ReentrantReadWriteLock:

可以通过 readLock() 和 writeLock() 上读写锁

image-20230513160217997

例子:建一个 map,同时设置 2 个线程,分别读和写 map

没有读写锁时,会出现问题

image-20230513160407419

加上之后就没有问题了。可以发现,写是独占的,读是共享的

image-20230513160536786

为什么用读写锁:

一阶段:无锁,多个线程抢夺资源,造成混乱

二阶段:独占锁 synchronized 和 ReentrantLock,读不能共享

三阶段:读写锁 ReentrantReadWriteLock,读可以共享,写不能共享。但是仍然存在问题,就是锁饥饿问题

锁降级:

写操作肯定要同时保留读锁和写锁,锁降级就是把写锁释放了,只保留读锁,这样其他线程也可以来读了

同时要注意,读锁不能升级为写锁。简单来说就是写的时候可以读,读的时候不能写

reentrantReadWriteLock.readLock().lock();
System.out.println("主线程读取");
reentrantReadWriteLock.writeLock().lock();
System.out.println("主线程写入");

image-20230513162620519

reentrantReadWriteLock.writeLock().lock();
System.out.println("主线程写入");
reentrantReadWriteLock.readLock().lock();
System.out.println("主线程读取");

image-20230513162645042

10. BlockingQueue 阻塞队列

image-20230513164216353

阻塞队列:

当队列为空时,获取元素操作会被阻塞,直到其他线程往空队列中插入了新元素。添加元素也是类似的,队列满的时候被阻塞,直到其他线程移除队列中的元素

为什么叫阻塞队列:

在多线程中,有的时候会将线程阻塞,直到其他线程唤醒。阻塞队列就是这样的,队列满的时候添加线程阻塞,直到其他线程移除元素并唤醒这个添加线程。读取线程也是类似的情况

阻塞队列不需要手动阻塞和唤醒线程,BlockingQueue 都包办了

阻塞队列可以用于生产者消费者情况

阻塞队列基本架构:

阻塞队列 BlockingQueue 是一个接口,实现类有许多 ArrayBlockingQueue ...

image-20230513165123500

阻塞队列实现类分类:

ArrayBlockingQueue, LinkedBlockingQueue

PriorityBlockingQueue

DelayQueue:使用优先级队列、带延迟的、无界的阻塞队列

SynchronousQueue:不存储元素的、单个的阻塞队列

常用方法:

可以发现,真正想使用阻塞队列的特色功能,需要用 put(e)、take() 方法,或者 offer(e, time, unit)、poll(time, unit) 方法

image-20230513170138519

image-20230513170244341

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println(blockingQueue.take());
blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());

image-20230513171022920

11. ThreadPool 线程池

线程池优势:

  • 降低资源消耗
  • 提高响应速度
  • 提高资源的可管理性

线程池实现:

线程池通过 Executor 框架实现,这个框架使用了 Executor、Executors、ExecutorService、ThreadPoolExecutor 这几个类

image-20230513201602032

线程池使用:

常用方法:Executors 提供了下面的方法,其中 newCachedThreadPool() 可以根据需求创建,可扩容;newFixedThreadPool(int) 可以提供固定数量的线程;newSingleThreadPool() 是单线程池单线程,一个任务一个任务执行。它们都返回一个 ExecutorService 对象,可以通过 execute(Runnable) 方法执行

image-20230513201839041

例子:有 5 个窗口,10 个人办理业务

通过 Executors.newFixedThreadPool 获取 ExecutorService,然后执行 execute(Runnable) 方法调用线程池中的线程执行内容

ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
ExecutorService threadPool3 = Executors.newCachedThreadPool();
for (int i = 1; i <= 10; ++i) {
  threadPool1.execute(() -> {
    System.out.println(Thread.currentThread().getName() + "\t办理业务");
  });
}

image-20230513202628637

image-20230513202904701

image-20230513202843539

ThreadPoolExecutor 七种参数:

打开 Executors,可以发现其实它们都是通过 ThreadPoolExecutor 实现的功能

image-20230513203037067

corePoolSize:在线程池中保留的线程数量,即使线程是空闲状态也保留,除非设置了 allowCoreThreadTimeOut

maximumPoolSize:线程池中允许的最大线程数量

keepAliveTime:线程数大于 corePoolSize 了,就允许空闲状态线程最多保留的时间,超过时间就终止线程

unit:时间单位

workQueue:等待任务执行的队列。这个队列只保留 execute() 提交的 Runnable 任务

threadFactory:生成新线程的工厂

handler:当线程达到最大数量同时队列容量也达到最大数量时的解决方案

拒绝策略:

四种。抛出异常、退回调用者执行、抛弃最早的任务、丢弃任务

image-20230513205604022

创建线程池:

阿里要求不能用 Executors 创建线程池,因为可能会造成 OOM 异常。所有线程池都要用 ThreadPoolExecutor 自定义创建

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
                                                              );
for (int i = 1; i <= 10; ++i) {
  threadPoolExecutor.execute(() -> {
    System.out.println(Thread.currentThread().getName() + "\t办理业务");
  });
}
threadPoolExecutor.shutdown();

image-20230513211354895

12. Fork/Join 分支合并框架

Fork/Join 操作:

Fork 是把任务拆分,Join 是把任务合并。把复杂任务拆分成小任务,然后并行执行,最后合并结果

image-20230513212144239

image-20230513212253417

image-20230513212521851

上面的例子中,通过 fork() 方法构建分支,然后通过 join() 方法合并

例子:1+2+3+...+100,要求相加的两个数差值不能超过 10

class MyTask extends RecursiveTask<Integer> {

    private static final Integer VALUE = 10;
    private int begin, end, result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (this.end - this.begin <= VALUE) {
            for (int i = this.begin; i <= this.end; ++i) {
                this.result += i;
            }
        } else {
            int middle = (this.begin + this.end) / 2;
            MyTask task01 = new MyTask(this.begin, middle);
            MyTask task02 = new MyTask(middle + 1, this.end);
            task01.fork();
            task02.fork();
            this.result = task01.join() + task02.join();
        }
        return this.result;
    }
}

public class ForkJoinDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask = new MyTask(0, 100);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
        Integer result = forkJoinTask.get();
        System.out.println(result);
        forkJoinPool.shutdown();
    }
}

需要注意:

  1. 构建的类需要继承 RecursiveTask 方法
  2. 需要构建 ForkJoinPool 对象,并通过 ForkJoinPool.submit(Task) 提供任务,最后通过 ForkJoinPool.get() 获取结果
  3. 分治时使用 Task.fork(),获取结果时使用 Task.join()

13. CompletableFuture 异步回调

同步和异步:

CompletableFuture 是用来做异步回调的,也就是 A 同学不在,在了给我打电话

image-20230513214151987

Future 接口:异步回调。CompletableFuture 实现了这个接口,因此可以做异步回调

CompletableFuture:

常用功能:没有返回值的 runAsync(Runnable) 和有返回值的 supplyAsync(Supplier)

image-20230513214325582

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
  System.out.println(Thread.currentThread().getName() + " CompletableFuture.runAsync");
});
completableFuture.get();

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
  System.out.println(Thread.currentThread().getName() + " CompletableFuture.supplyAsync");
  int i = 10 / 0;
  return 1024;
});
completableFuture2.whenComplete((t, u) -> {
  System.out.println("t: " + t);
  System.out.println("u: " + u);
}).exceptionally((e) -> {
  System.out.println(e.getMessage());
  return 233;
}).get();

image-20230513215024260