碎碎念

本人根据B站视频总结的Java面试题,初步打算分为初级篇,中级篇和高级篇,因为篇幅比较长所以分开总结。顺便吐槽一下现在的面试面试造火箭,很多不会用到,但是你得会。看完总体下来还是收获很大的,在这里博主呼吁大家不要做API调用工程师,Ctrl C + Ctrl V 程序员!

volatile

volatile是Java虚拟机提供的轻量级的同步机制

  • 保证可见性
  • 不保证原子性
  • 禁止指令重排

JMM(Java内存模型)

JMM(Java内存模型Java Memory Model,简称JMM)本身是一种抽象的概念并不真是存在,它描述的是一组规则或者规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。

  • JMM三大特性:

    1. 可见性
    2. 原子性
    3. 有序性
  • JMM关于同步的规定:

    1. 线程解锁前,必须把共享变量的值刷新回主内存
    2. 线程加锁前,必须读取主内存的最新值到自己的工作内存
    3. 加锁解锁是同一把锁

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方成为栈空间),工作内存是每个线程的私有数据空间,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。

代码演示

可见性代码验证

  • 在没有加volatile之前程序是找不到number变量的
1
2
3
4
5
6
7
class MyData {
int number = 0;

public void add() {
this.number = 60;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class VolatileDemo {
public static void main(String[] args) {
MyData myData = new MyData();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "come in");
try {
TimeUnit.SECONDS.sleep(3);
myData.add();
System.out.println(Thread.currentThread().getName() + "\t" + "update number value: " + myData.number);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "AAA").start();

while (myData.number == 0) {

}
System.out.println(Thread.currentThread().getName() + "\t" + "missing is over");
}
}
  • 在加上volatile之后
1
volatile int number = 0;

volatile不保证原子性案例

1
2
3
4
5
6
7
class MyData {
volatile int number = 0;

public void add() {
number++;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class VolatileDemo {
public static void main(String[] args) {
MyData myData = new MyData();
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
for (int j = 1; j <= 1000; j++) {
myData.add();
}
}, String.valueOf(i)).start();
}

while (Thread.activeCount() > 2) {
Thread.yield();
}

System.out.println(Thread.currentThread().getName() + "\t finally number value: " + myData.number);
}
}

如果正确的话number的值应为20000,可以看到值为19686,说明volatile不保证原子性。

volatile不保证原子性解决办法

  • synchronized
  • 使用JUC下的AtomicInteger
1
2
AtomicInteger atomicInteger = new AtomicInteger();
atomicInteger.getAndIncrement();

volatile指令重排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class VolatileDemo {
int a = 0;
boolean flag = false;
public void method01() {
a = 1;
flag = true;
}
public void method02() {
if (flag) {
a = a + 5;
System.out.println(a);
}
}
}

多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程中使用的变量能否保持一致性是无法确定的,结果无法预测

线程安全性获取保证

工作内存与主内存延迟现象导致的可见性问题

可以使用synchronized或volatile关键字解决,它们都可以使一个线程修改后的变量立即对其他线程可见

对于指令重排导致的可见性问题和有序性问题,可以利用volatile关键字解决,因为volatile的另外一个作用就是禁止指令重排

单例模式下在多线程环境下可能存在的问题

1
2
3
4
5
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Singleton.getInstance();
}, String.valueOf(i)).start();
}

对于多线程下单例模式存在的问题我们可以采用DCL机制来解决

1
2
3
4
5
6
7
8
9
10
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}

单例模式下volatile分析

DCL(双端检索)机制不一定线程安全,原因是有指令重排的存在,加入volatile可以禁止指令重排

原因在于某一个线程执行到第一次检测,读取到的instance不为null时,instance的饮用对象可能没有完成初始化

instance = new Singleton() 可以分为一下3步完成(伪代码)
memory = allocate; // 1. 分配对象空间
instance(memory) // 2. 初始化对象
instance = memory // 3. 设置instance指向刚分配的内存地址,此时instance != null

步骤2和步骤3不存在数据依赖关系,而且无论重排前还是重排后程序的执行结果在单线程中并没有改变,因此这种重排优化是允许的。

instance = new Singleton() 可以分为一下3步完成(伪代码)
memory = allocate; // 1. 分配对象空间
instance = memory // 3. 设置instance指向刚分配的内存地址,此时instance != null 但是对象还没有初始化完成
instance(memory) // 2. 初始化对象

但是指令重排只会保证串行语义的执行一致性(单线程),但并不会关心多线程间的语义一致性。
所以当一条线程访问instance != null 时,由于实例未必已初始化完成,也就造成了线程安全的问题。

  • 解决办法也很简单,加上volatile禁止指令重排
1
private static volatile Singleton instance = null;

CAS

CAS是什么

CAS的全称为Compare-And-Swap,它是一条CPU并发原语。它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。

CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法,调用Unsafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题

比较并交换

1
2
3
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2022) + "\t current data: " + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 1024) + "\t current data: " + atomicInteger.get());

true current data: 2022
false current data: 2022

底层原理

1
2
3
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
1
2
3
4
5
6
7
8
9
10
11
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;
  • Unsafe

是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据,Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe方法。
注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务

  • valueOffset

变量valueOffset,表示该变量在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。

  • 变量value用volatile修饰,保证了多线程之间的可见性。

案例

假设线程A和线程B两个线程同时执行getAndAddInt操作(分别跑在不同CPU上)

  1. AtomicInteger里面的value原始值为3,即内存中AtomicInteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存。
  2. 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起。
  3. 线程B也通过getIntVolatile(var1, var2)方法获取到value值3,此时线程B刚好没有被挂起并执行了compareAndSwapInt方法比较内存值也为3,成功修改内存值为4,线程B打完收功,一切ok。
  4. 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值已经被其他线程抢先一步修改过了,那线程A本次修改失败,只能重新读取重新来一遍
  5. 线程A重新获取value值,因为变量value和volatile修饰,所以其他线程对它修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,知道成功。

CAS缺点

  • 循环时间长开销大,如果CAS长时间一直不成功,可能会给CPU带来很大的开销。
  • 只能保证一个共享变量的原子操作。对于多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性。
  • ABA问题

ABA问题

CAS会导致ABA问题

ABA问题是怎么产生的

CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差异会导致数据的变化。比如说一个线程one从内存位置V中取出A,这时另一个线程two也从内存中取出A,并且线程two进行了一些操作将值变成了B,然后线程two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后线程one操作成功。尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的

AtomicReference原子引用

1
2
3
4
5
6
User z3 = new User("z3", 22);
User li4 = new User("li4", 25);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(z3);
System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());

true User(userName=li4, age=25)
false User(userName=li4, age=25)

AtomicStampedReference原子引用(ABA问题的解决)

  • 创建AtomicStampedReference原子引用
1
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
  • 创建t1线程
1
2
3
4
5
6
7
8
9
10
11
12
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t第一次版本号: " + atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第二次版本号: " + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第三次版本号: " + atomicStampedReference.getStamp());
},"t1").start();
  • 创建t2线程
1
2
3
4
5
6
7
8
9
10
11
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第一次版本号: " + atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t 修改成功否: " + result + "\t 当前版本号: " + atomicStampedReference.getStamp());
},"t2").start();
  • 结果如下,成功解决ABA问题

t1 第一次版本号: 1
t2 第一次版本号: 1
t1 第二次版本号: 2
t1 第三次版本号: 3
t2 修改成功否: false 当前版本号: 3

集合类不安全的问题

故障现象

1
2
3
4
5
6
7
List<String> list = new ArrayList<>();
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}

java.util.ConcurrentModificationException

导致原因

并发争抢导致,一个人正在写,另一个过来抢夺,导致数据不一致异常。并发修改异常。

解决方案

  • new Vertor<>();
  • Collections.synchronizedList(new ArrayList<>());
  • new CopyOnWriteArrayList<>();

写时复制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

CopyOnWrite容器写时复制的容器。王荣旗添加元素的时候,不直接往当前Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后新的容器Object[] newElements里添加元素,添加元素之后,再将原容器的引用指向新的容器setArray(newElements),这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素,所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器

HashSet

HashSet底层结构其实就是HashMap,这里我们以源码为证

1
2
3
public HashSet() {
map = new HashMap<>();
}

但是为什么我们add的时候只需要添加一个值呢?而不是像map一样那样是K,V呢?我们这里还是以源码为证解释

1
2
3
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

这里的e就是我们add的值,而这个PRESENT其实是一个常量,我们不需要添加

1
private static final Object PRESENT = new Object();

Map

  • new ConcurrentHashMap<>();

公平锁和非公平锁

  • 公平锁是指多个线程按照申请锁的顺序来获取锁,类似排队打饭,先来后到
  • 非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,又肯呢个后申请的线程比先申请的线程有限获取锁,在高并发的情况下,有可能会造成优先级反转或者饥饿现象
  • 并发包中ReentrantLock的创建可以指定构造函数的boolean类型来得到公平锁或非公平锁,默认是非公平锁
  • 关于两者的区别
    1. 公平锁就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己
    2. 非公平锁比较粗鲁,上来就是直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式
  • 对于synchronized而已,也是一种非公平锁

可重入锁(又名递归锁)

  • 指的是同一线程外层函数获取锁之后,内层递归函数仍然能获取该锁的代码,在同一个线程在外层获取锁的时候,在进入内层方法会自动获取锁,也就是说,线程可以进入任何一个它已经拥有的锁所同步的代码块
  • ReentrantLock/Synchronized 就是一个典型的可重入锁
  • 可重入锁最大的作用就是避免死锁

代码验证

1
2
3
4
5
6
7
8
9
class Phone {
public synchronized void sendSMS() throws Exception {
System.out.println(Thread.currentThread().getName() + "\t invoked sendSMS()");
sendEmail();
}
public synchronized void sendEmail() throws Exception {
System.out.println(Thread.currentThread().getName() + "\t invoked sendEmail()");
}
}
1
2
3
4
5
6
7
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "t1").start();
1
2
3
4
5
6
7
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "t2").start();

t1 invoked sendSMS()
t1 invoked sendEmail()
t2 invoked sendSMS()
t2 invoked sendEmail()

自旋锁(spinlock)

自旋锁是指获取锁的线程不会立即阻塞,而是采用循环的方式尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU

代码验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in");
while (!atomicReference.compareAndSet(null, thread)) {

}
}

public void myUnlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()");
}

public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();

new Thread(() -> {
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
spinLockDemo.myUnlock();
}, "AA").start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

new Thread(() -> {
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
spinLockDemo.myUnlock();
}, "BB").start();
}

AA come in
BB come in
AA invoked myUnlock()
BB invoked myUnlock()

独占锁(写锁)/共享锁(读锁)/互斥锁

  • 独占锁指该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁
  • 共享锁指该锁可被多个线程所持有
  • 对ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁
  • 读锁的共享锁可保证并发读是非常高效的,读写,写读,谢谢的过程是互斥的

代码验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MyCaChe {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
rwLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "\t 正在写入: " + key);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
rwLock.writeLock().unlock();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t 写入完成");
}
public void get(String key) {
rwLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "\t 正在读取");
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
rwLock.readLock().unlock();
}
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t 读取完成: " + result);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
MyCaChe myCaChe = new MyCaChe();
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCaChe.put(tempInt + "", tempInt + "");
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCaChe.get(tempInt + "");
}, String.valueOf(i)).start();
}

1 正在写入 1 写入完成 2 正在写入 2 写入完成 3 正在写入 3 写入完成
1 正在读取 3 正在读取 2 正在读取
1 读取完成 3 读取完成 2 读取完成

CountDownLatch

让一些线程阻塞知道另一些线程完成一系列操作后才能被唤醒。CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞。其他线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器为零时,因调用await方法被阻塞的线程会被唤醒,继续执行。

  • 创建CountryEnum枚举类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public enum CountryEnum {
ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");
@Getter private Integer retCode;
@Getter private String retMessage;
CountryEnum(Integer retCode, String retMessage) {
this.retCode = retCode;
this.retMessage = retMessage;
}
public static CountryEnum forEach_CountryEnum(int index) {
CountryEnum[] myArray = CountryEnum.values();
for (CountryEnum element : myArray) {
if (index == element.getRetCode()) {
return element;
}
}
return null;
}
}
  • 代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 国,被灭");
countDownLatch.countDown();
}, CountryEnum.forEach_CountryEnum(i).getRetMessage()).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "\t 秦帝国,一统华夏");
}
}
  • 结果如下,只有当前面六国都被消灭之后,秦国才能统一

赵 国,被灭 齐 国,被灭 魏 国,被灭 燕 国,被灭 韩 国,被灭 楚 国,被灭
main 秦帝国,一统华夏

CyclicBarrier

CyclicBarrier的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。

  • 代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙");
});
for (int i = 1; i <= 3; i++) {
final int tempInt = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 收集到第: " + tempInt + "龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}, String.valueOf(i)).start();
}

2 收集到第: 2龙珠
1 收集到第: 1龙珠
3 收集到第: 3龙珠
召唤神龙!

Semaphore

  • 代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t 抢到车位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "\t 停车3秒后离开车位");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}

信号量主要用于两个目的,一个用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

1 抢到车位 3 抢到车位 2 抢到车位
2 停车3秒后离开车位 1 停车3秒后离开车位 5 抢到车位 6 抢到车位
3 停车3秒后离开车位 4 抢到车位 6 停车3秒后离开车位 4 停车3秒后离开车位 5 停车3秒后离开车位

阻塞队列

阻塞队列,顾名思义,首先它是一个队列,那么他肯定是一个先进先出(FIFO)的数据结构。与普通队列不同的是,他支持两个附加操作,即阻塞添加和阻塞删除方法。

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞。
  • 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列中插入新的元素。
  • 试图往已满的阻塞队列中添加新的元素的线程同样也会被阻塞,知道其他的线程从队列中移除一个或者多个元素或者完全晴空队列后使队列重新变得空闲起来并后续新增。

在多线程领域: 所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会被自动唤醒

为什么需要BlockingQueue

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要被唤醒线程,因为这一切BlockQueue都给你一首包办了,在concurrent包发布之前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要千古效率和线程安全,而这会给我们的程序带来不小的复杂度。

BlockQueue七种实现类

  • ArrayBlockingQueue: 由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue: 由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE = 2147483647)阻塞队列
  • PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
  • DelayQueue: 使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue: 不存储的阻塞队列,也即单个元素的队列
  • LinkedTransferQueue: 由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque: 由链表结构组成的双向阻塞队列

BlockQueue的核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove(e) poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
  • 抛出异常
    1. 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException: Queue full
    2. 当阻塞队列空时,再往队列里移除元素会抛NoSuchElementException
  • 特殊值
    1. 插入方法,成功true失败false
    2. 移除方法,成功返回队列的元素,队列里面没有酒返回null
  • 一直阻塞
    1. 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程知道put数据or响应中断退出
    2. 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程知道队列可用
  • 超时退出
    1. 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "AAA").start();
1
2
3
4
5
6
7
8
9
10
11
12
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "BBB").start();

AAA put 1 BBB 1
AAA put 2 BBB 2
AAA put 3 BBB 3
注意: 这里的put和take虽然是同步的但是因为BBB线程会sleep5秒,所以也就是每次put都要等5秒

生产者消费者传统版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ShareDate {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception {
lock.lock();
try {
while (number != 0) {
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception {
lock.lock();
try {
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
ShareDate shareDate = new ShareDate();
new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try {
shareDate.increment();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try {
shareDate.decrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}, "BB").start();
}
}

AA 1 BB 0
AA 1 BB 0
AA 1 BB 0
注意: 这里一定要用while不可以用if

Synchronized和Lock有什么区别

  • 原始构成
    1. Synchronized是关键字属于JVM层面,monitorenter(底层是通过monitor对象来完成,其实wait和notify等方法也依赖于monitor对象只有在同步快或方法中才能调用wait/notify等方法)monitorexit
    2. Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁
  • 使用方法
    1. Synchronized不需要用户去手动释放锁,当Synchronized代码执行完后系统会自动让线程释放对锁的占用
    2. ReentrantLock则需要用户去手动释放锁若没有主动释放锁,就有可能导致出现死锁现象
  • 等待是否可中断
    1. Synchronized不可中断,除非抛出异常或者正常运行完成
    2. ReentrantLock可中断,设置超时方法tryLock(long timeout, TimeUnit unit) lockInterruptibly()放代码块中,调用interrupt方法可中断
  • 加锁是否公平
    1. Synchronized非公平锁
    2. ReentrantLock两者都可以,默认非公平锁,构造方法可以传入boolean值,true为公平锁,false为非公平锁
  • 锁绑定多个条件Condition
    1. Synchronized没有
    2. ReentrantLock用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像Synchronized要么随机唤醒一个线程要么唤醒全部线程

生产者消费者阻塞队列版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MyResource {
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception {
String data;
boolean retValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t 大老板叫停,表示FLAG=false,生产动作结束");
}
public void myConsumer() throws Exception {
String result;
while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result || "".equalsIgnoreCase(result)) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒钟没有取到,消费退出");
return;
}
System.out.println(Thread.currentThread().getName() + "\t 消费成功" + result + "成功");
}
}
public void stop() {
this.FLAG = false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "Prod").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("5秒钟时间到,main线程叫停,活动结束");
try {
myResource.stop();
} catch (Exception e) {
throw new RuntimeException(e);
}

java.util.concurrent.ArrayBlockingQueue
Prod 生产线程启动 Prod 插入队列1成功 Prod 消费线程启动 Prod 消费成功1成功
Prod 插入队列2成功 Prod 消费成功2成功 Prod 插入队列3成功 Prod 消费成功3成功
Prod 插入队列4成功 Prod 消费成功4成功 Prod 插入队列5成功 Prod 消费成功5成功
5秒钟时间到,main线程叫停,活动结束
Prod 大老板叫停,表示FLAG=false,生产动作结束 Prod 超过2秒钟没有取到,消费退出