AQS与ReentrantLock

定义

AQS是并发体系中一个非常重要的类,ReentrantLock就是基于AQS实现的,AQS底层使用了LockSupport类实现对线程的等待通知机制,相比于wait和notify方法,使用的更加的灵活,所以,要学习并发线程的底层知识,这个类你必须要学明白。

初始化

默认初始化,为非公平锁,为什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

Sync类

他们都是继承Sync抽象类,Sync继承了AQSAbstractQuenedSynchronizer 抽象类,AQS是并发里面十分重要的一个类,jdk1.7ConcurrentHashMap曾使用过ReentranLock用于分段锁,在线程池中Worker(任务单元)也是用到了AQS。因此,要学习并发,必须把这个类学明白。

类图

AQS类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {

private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}

在AQS内部维护了state字段,表示当前资源是否已经有线程对他进行了加锁,每一次加锁,该值都会加一,每次解锁,该值都会减一。

源码

lock

在lock方法,我们看到它调用了sync.lock(),sync.lock调用了acquire(1).

1
2
3
4
5
6
7
8
9
10
public void lock() {
sync.lock();
}

static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
}

再跟进到acquire(1)里,可以看到AQS的一个模板方法

1
2
3
4
5
6
7
8
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer 
implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
}

这里可以知道acquire干了三件事情

  1. tryAcquire:当前线程尝试获得锁(state),如果获得锁成功,则结束。如果获取失败,进入步骤2
  2. addWaiter:添加一个等待者,将当前线程以排他形式扔到队列里面
  3. acquireQueued:以排队的方式将阻塞的线程一个一个唤醒

tryAcquire

FairSync对tryAcquire的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

首先是先拿到当前线程,获取当前state值,看一下是否等于0,等于0表示我当前可以对他进行加锁,但是公平锁我需要去检查一下队列,看当前队列是否有别的线程已经在排队了,如果有的话,就不去CAS,返回false,如果没有(即为空或者当前线程是队头),就去CAS获取锁,返回true,hasQueuedPredecessors实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

没有其他线程的话,就给自己上锁,使用CAS(compareAndSetState)将state从0设置为1,如果这中间state被别的线程改了,则加锁失败,返回false。

如果state!=0的话,会检查当前线程是否是拥有锁的线程,如果是,将state+1,表示重入。

addWaiter

当tryAcquire拿到锁以后就结束了,表示加锁成功。如果拿不到锁,那么就将调用addWaiter方法,将当前线程添加到队列的尾端,相当于排队等待被唤醒嘛,来看看它是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
return node;
}

addWaiter方法有一个参数,指定了传Node.EXCLUSIVE 排他锁或者是Node.SHARED 共享锁,addWaiter做了两件事情:

  1. 一上来就使用一次CAS将要添加的线程加入到队列尾部,这是在队列不为空的情况下操作的,所以得先判断一下。
  2. 如果失败的话,那说明有竞争,已经有线程也在往队列里面排,那么我就不断的试,直到将线程续接在队列的尾部。

acquireQueued

当线程插入到队列中后,那么队列里的线程会排队的去获得锁,那么是如何实现的呢?(这些线程是不是全阻塞了啊)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

首先它看一看前置节点是不是头节点,如果不是,那就阻塞parkAndCheckInterrupt。如果是,那就尝试获取锁,这里会和头节点产生竞争,如果竞争到锁,表示头节点已经释放了(因为头节点总是先获得锁,等释放后才会唤醒队列里的线程),然后将当前节点置为头节点。如果没有竞争到锁,说明头节点还没有释放,然后检查一下,停止尝试释放锁cancelAcquire

unlock

那么,程序是如何释放锁并且释放完后,是如何通知到的后置节点的呢,有必要去看一下unlock方法了

unlock方法调用了sync.release(1)方法,AQS对该方法实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

这里干了两件事情:

  1. tryRelease:释放锁
  2. unparkSuccessor:唤醒头节点

tryRelease

释放锁很简单,将state-1,如果是0的话,需要将锁的当前拥有者置为空。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

unparkSuccessor

这里唤醒的话,不是唤醒队列里的所有线程,而是唤醒队列的头节点,因此在唤醒之前需要判断一下队列是否为空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

可以看到AQS内部对每一个线程Node同时维护一个waitStatus字段,这是唤醒其他线程的核心。如果队列中线程被为空或者被cancelled,就会递归找到没有cancelled的那个节点。

公平锁与非公平锁

那么,学到这里,我们来回答一下开头那个问题,为什么要默认为非公平锁?公平锁与非公平锁的区别?

看过代码知道,公平锁是要排队的,当线程竞争时,多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。

那么非公平锁是什么个概念呢?非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。

对比一下代码,也是调用了sync.lock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

这里,一来就去使用CAS尝试获取锁,如果刚好锁释放了,队列的头节点还没加锁,当前线程就有可能拥有这把锁。否则的话,调用 acquire(1)方法

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquire模版方法是一样的,分三步:加锁、加锁不行放等待队列,看看排队是否到我获取锁了。

唯一的区别是tryAcquire的实现方式。这里不会去检查队列hasQueuedPredecessors,因为非公平锁,是无序的,如果state为0的话,就直接使用一次CAS竞争,谁先拿到锁,谁就先执行,否则就阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

总结

综上,公平锁就是通过同步队列来实现多个线程按照申请锁的顺序来获取锁,从而实现公平的特性。非公平锁加锁时不考虑排队等待问题,直接尝试获取锁,所以存在后申请却先获得锁的情况。