读写锁介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。

针对这种场景,JAVA 的并发包提供了读写锁 ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁

线程进入读锁的前提条件:

  • 没有其他线程的写锁

  • 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个线程(可重入锁)。

线程进入写锁的前提条件:

  • 没有其他线程的读锁

  • 没有其他线程的写锁

读写锁有以下三个重要的特性

  1. 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
  2. 重进入:读锁和写锁都支持线程重进入。
  3. 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

ReadWriteLock

ReadWriteLock 是java.util.concurrent.locks 包中的一个接口,它只有两个方法readLock()writeLock()

ReadWriteLock 管理一组锁,一个是只读的锁,一个是写锁。

读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}

ReentrantReadWriteLock实现了ReadWriteLock接口。

ReentrantReadWriteLock 里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和 writeLock()用来获取读锁和写锁。

ReentrantReadWriteLock

ReentrantReadWriteLock 类的整体结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

/** 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;

/** 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}

/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }


abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class NonfairSync extends Sync {}

static final class FairSync extends Sync {}

public static class ReadLock implements Lock, java.io.Serializable {}

public static class WriteLock implements Lock, java.io.Serializable {}
}

可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。

ReentrantReadWriteLock 有五个内部类,五个内部类之间也是相互关联的。内部类的关系如下图所示。

  • Sync继承自AQSNonfairSyncFairSync继承自 Sync
  • ReadLockWriteLock 实现了Lock接口

入门案例

假如有多个线程要同时进行读操作的话,先看一下 synchronized 达到的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Test {

/**
* 创建读写锁
*/
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

public static void main(String[] args) {

Test test = new Test();

new Thread(() -> {
test.get();
}, "线程一").start();

new Thread(() -> {
test.get();
}, "线程一").start();

}

public synchronized void get() {

System.out.println(Thread.currentThread().getName() + "正在进行读操作");
System.out.println(Thread.currentThread().getName() + "读操作完毕");
}
}

一个读完才轮到下一个,效率很低

使用 ReentrantReadWriteLock

而改成用读写锁的话,首先定义一个读写方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ReadWrite {

/**
* 创建一个全局的读写锁
*/
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();


/**
* 读方法
*/
public void read() {

try {
lock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "正在进行读操作");
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放读锁
lock.readLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放读锁");
}
}

/**
* 写方法
*/
public void write() {

try {
lock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "正在进行写操做");
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放写锁
lock.writeLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
}
}
}

接着分别测试读操作和写操作

1、读操作测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Test {

public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.read();
}, "线程一").start();

new Thread(() -> {
readWrite.read();
}, "线程二").start();

}
}

2、写操作测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Test {

public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.write();
}, "线程一").start();

new Thread(() -> {
readWrite.write();
}, "线程二").start();

}
}

说明线程一和线程二可以同时进行读操作。这样就大大提升了读操作的效率。

3、读锁被占用,写操作测速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Test {

public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.read();
}, "线程一").start();

new Thread(() -> {
readWrite.write();
}, "线程二").start();

}
}

4、写锁被占用,读操作测速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Test {

public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.write();
}, "线程一").start();

new Thread(() -> {
readWrite.read();
}, "线程二").start();

}
}

注意:

  • 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
  • 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

多线程测试完了,测试一下同一个线程重复获取锁

5、同一线程获取读锁再获取写锁

此时先新增一个读写方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ReadWrite {

/**
* 创建一个全局的读写锁
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();


/**
* 先读后写
*/
public void readAfterWrite() {

try {
if (lock.readLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行读操作");
}
if (lock.writeLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行写操作");
}
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放读锁");
lock.writeLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
}
}
}

进行测试

1
2
3
4
5
6
7
8
9
10
11
public class Test {

public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.readAfterWrite();
}, "线程一").start();
}
}

6、同一线程获取写锁再获取读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void writeAfterRead() {

try {
if (lock.writeLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行写操作");
}
if (lock.readLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行读操作");
}
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放读锁");
lock.writeLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
}
}

测试

1
2
3
4
5
6
7
8
public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.writeAfterRead();
}, "线程一").start();
}

7、同一线程获取读锁再获取读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void readAfterRead() {

try {
if (lock.readLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行读操作");
}
if (lock.readLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行读操作");
}
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放读锁");
lock.readLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放读锁");
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
public class Test {

public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.readAfterRead();
}, "线程一").start();
}
}

8、同意线程获取写锁再获取写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void writeAfterWrite() {

try {
if (lock.writeLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行写操作");
}
if (lock.writeLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + "正在进行写操作");
}
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
lock.writeLock().unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
public class Test {

public static void main(String[] args) {

ReadWrite readWrite = new ReadWrite();

new Thread(() -> {
readWrite.writeAfterWrite();
}, "线程一").start();
}
}

案例总结

Locksynchronized有以下几点不同:

  1. Lock是一个接口,而synchronized是 Java 中的关键字,synchronized是内置的语言实现;
  2. synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在 finally 块中释放锁
  3. Lock 可以让等待锁的线程响应中断 (通过interrupt()方法),而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
  4. 通过Lock可以知道有没有成功获取锁(通过tryLock()方法),而synchronized却无法办到。
  5. Lock可以提高多个线程进行读操作的效率

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于 synchronized。

结果分析

  • 场景一: 多个线程同时获取读锁,结果如何? ->多个线程可以同时获取读锁
  • 场景二: 多个线程同时获取写锁, 结果如何? ->同一时间只有一个线程能获取到写锁
  • 场景三: 一个线程先获取读锁其他线程获取写锁,结果如何? ->当对象有读锁时,其他线程无法加上写锁
  • 场景四: 一个线程先获取写锁其他线程获取读锁,结果如何? ->当对象有写锁时,其他线程无法加上读锁
  • ————————————–多线程只能同时获取读锁————————————–
  • 场景五: 同一个线程获取读锁后再去获取写锁,结果如何? ->不行,同一个线程,拥有读锁无法获取写锁,不允许锁升级。
  • 场景六: 同一个线程获取写锁后再去获取读锁,结果如何? ->可以,同一线程,有写锁后可以再获取读锁,可以锁降价
  • 场景七: 同一个线程先获取读锁再获取读锁 ->可以
  • 场景八: 同一个线程先获取写锁再获取写锁 ->可以

AQS(了解)

AQS 简介

AbstractQueuedSynchronizer抽象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

这里 volatile 是核心关键词,state 的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()(线程安全)

AQS 定义两种资源共享方式:

  • Exclusive:独占,只有一个线程能执行,如 ReentrantLock
  • Share:共享,多个线程可同时执行,如 Semaphore/CountDownLatch

AQS 核心方法介绍

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。
  • tryAcquire(int):独占方式加锁(修改 state 值)。尝试获取资源,成功则返回 true,失败则返回 false。
  • tryRelease(int):独占方式解锁。尝试释放资源,成功则返回 true,失败则返回 false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。

基于 AQS 的实际案列

ReentrantLock为例:

  • state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。
  • 其他线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。
  • 释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 以例:

  • 任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。
  • 这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state 会 CAS 减 1。
  • 等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程就会从 await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。

CAS

CAS 是英文单词 Compare And Swap 的缩写,翻译过来就是比较并替换。

CAS 机制当中使用了 3 个基本操作数:内存地址 V,旧的预期值 A,要修改的新值 B。

更新一个变量的时候,只有当变量的预期值 A 和内存地址 V 当中的实际值相同时,才会将内存地址 V 对应的值修改为 B。

1、在内存地址 V 当中,存储着值为 10 的变量

2、此时线程 1 想要把变量的值增加 1。对线程 1 来说,旧的预期值 A=10,要修改的新值 B=11

3、在线程 1 要提交更新之前,另一个线程 2 抢先一步,把内存地址 V 中的变量值率先更新成了 11

4、线程 1 开始提交更新,首先进行 A 和地址 V 的实际值比较(Compare),发现 A 不等于 V 的实际值,提交失败

5、线程 1 重新获取内存地址 V 的当前值,并重新计算想要修改的新值。此时对线程 1 来说,A=11,B=12。这个重新尝试的过程被称为自旋

6、这一次比较幸运,没有其他线程改变地址 V 的值。线程 1 进行 Compare,发现 A 和地址 V 的实际值是相等的

7、线程 1 进行 SWAP,把地址 V 的值替换为 B,也就是 12

缺点:

1.CPU 开销较大

2.不能保证代码块的原子性

小结

  • 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。

  • 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。

原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。