一、介绍 AQS
Doug Lea 在写 JUC
的时候引入了 java.util.concurrent.locks.AbstractQueuedSynchronizer
,也就是 基于队列实现的抽象同步器,我们一般称之为 AQS
。
何为 AQS ?
JUC
里面很多有关锁、同步组件(ReentrantLock
、CountDownLatch
、CyclicBarrier
、FutureTask
等)的实现都是基于AQS
。
为什么需要 AQS ?
很多锁、同步器,它们有很多工作是类似的,如果能提取出一个工具类,那么就可以直接用,对于 ReentrantLock
和 CountDownLatch
这些来说可以屏蔽很多细节,通过实现一些方法,就可以获得AQS
的能力,只关注它们自己的业务逻辑就可以。
同步器基本原理
// acquire操作如下:
while(同步状态申请获取失败) {
if(当前线程未进入等待队列) {
当前线程放入等待队列;
}
尝试阻塞当前线程;
}
当前线程移出等待队列
// release操作如下:
更新同步状态
if(同步状态足够允许一个阻塞的线程申请获取) {
解除一个或者多个等待队列中的线程的阻塞状态;
}
为了实现上述操作,需要下面三个基本环节的相互协作:
- 同步状态的原子性管理(使用
CAS
原子的修改共享标记位) - 等待队列的管理
- 线程的阻塞与解除阻塞(调用
LockSupport
,底层是基于Unsafe
类的park()
&unpark()
方法)
二、源码分析
AQS 关键属性
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 头结点,包装着当前持有锁的线程
private transient volatile Node head;
// 尾节点,每个新的节点进来,都插入到最后,形成了一个链表
private transient volatile Node tail;
// 代表当前锁的状态,0 代表没有被占用,大于 0 代表有线程持有当前锁,volatile 保证线程之间的可见性
private volatile int state;
// 代表当前持有独占锁的线程
// AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer 的一个属性
private transient Thread exclusiveOwnerThread;
Node 关键属性
// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
static final class Node {
// 标识节点当前在共享模式下(线程以共享的模式等待锁)
static final Node SHARED = new Node();
// 标识节点当前在独占模式下(线程正在以独占的方式等待锁)
static final Node EXCLUSIVE = null;
// 此线程取消了争抢这个锁
static final int CANCELLED = 1;
// 表示线程已经准备好了,就等资源释放了
static final int SIGNAL = -1;
// 表示节点在等待队列中,节点线程等待唤醒(先不讨论)
static final int CONDITION = -2;
// 表示下个 acquireShared 应该无条件的传播(当前线程处在 SHARED 情况下,该字段才会使用,先不讨论)
static final int PROPAGATE = -3;
// 取值为上面的 1、-1、-2、-3,或者 0 (初始化的默认值)
// 如果 waitStatus > 0 代表此线程取消了等待
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 包在当前 node 的线程
volatile Thread thread;
// 下一个等待条件的节点的引用(先不讨论)
Node nextWaiter;
}
AQS 关键方法
AQS
定好方法让子类根据自身的需要去实现
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 获取锁(修改标志位),可以进入等待队列,直到获取锁
// final 修饰,子类只能调用不能修改
// 会调子类的 tryAcquire(),尝试获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 释放锁
// final 修饰,子类只能调用不能修改
// 会调子类的 tryRelease(),释放锁,完全释放返回 true,否则 false
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 尝试获取锁(修改标志位),立即返回
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 为当前线程和给定模式创建节点并将其排队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 入队
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点
// 为了避免高并发情况下,漏了一些节点的情况
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒
LockSupport.unpark(s.thread);
}
// 中断当前线程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}Release:
if (tryRelease(arg))
unblock the first queued thread;
从 ReentrantLock 看 AQS 原理
ReentrantLock
的内部类 Sync
,有公平锁和非公平锁两种同步模式。
// java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
// 争抢锁
abstract void lock();
// 释放锁的过程不区分公平锁和非公平锁,所以放在父类 Sync 里
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// state 为 0,说明没有线程获取锁,把当前持有独占锁的线程设为 null
free = true;
setExclusiveOwnerThread(null);
}
// state 不为 0,说明线程重入,还没完全释放锁,把剩余的未释放的次数设回 state
setState(c);
return free;
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// java.util.concurrent.locks.ReentrantLock.NonfairSync
// 非公平锁的实现
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
// 因为这是非公平锁,所以走非公平锁的实现
return nonfairTryAcquire(acquires);
}
// 争抢锁
final void lock() {
// 通过 CAS 设置变量 State
if (compareAndSetState(0, 1))
// CAS 成功,也就是获取锁成功,则将当前线程设置为独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
// CAS 失败,也就是获取锁失败,则走 acquire() 方法
// 这里的 acquire() 里面是先调非公平锁的 tryAcquire() 方法
// 如果成功就获得锁,失败则调 acquireQueued() 方法入队
acquire(1);
}
}
// java.util.concurrent.locks.ReentrantLock.FairSync
// 公平锁的实现
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 加锁
final void lock() {
// 这里的 acquire() 里面是先调公平锁的 tryAcquire() 方法
// 如果成功就获得锁,失败则调 acquireQueued() 方法入队
acquire(1);
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 以自旋模式获取锁
// 如果在等待时被打断,则返回 true, 否则返回 false
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 看一下前驱节点是不是为头结点,如果是表明当前节点不需要等待,直接 tryAcquire() 争抢锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 检查和更新未能获取的节点的状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前个节点的等待状态为 SIGNAL,说明前个节点状态正常,返回 true
return true;
if (ws > 0) {
// 前个节点的等待状态大于 0,说明前个节点已经取消等待,继续往前找,直到前个节点是好的
// 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的
// 这里就需要找个好的前驱节点,因为还得靠它来唤醒
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 代码如果走到这里说明前驱节点的 waitStatus 不等于 -1 和 1,那也就是只可能是 0,-2,-3
// 每个新的 node 入队时,waitStatus 没有设置,所以初始化都是 0
// 正常情况如果前驱节点是之前的 tail 节点,那么它的 waitStatus 就为 0
// 此时需要通过 CAS 将前驱节点的 waitStatus 设为 SIGNAL (也就是-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 这次返回 false,下次死循环进来 shouldParkAfterFailedAcquire() 就会返回 true
return false;
}
// 让当前线程 park,然后检查是否中断
private final boolean parkAndCheckInterrupt() {
// 阻塞线程,底层调用 UNSAFE 的 api
LockSupport.park(this);
return Thread.interrupted();
}
三、总结
这次从源码入手,浅显的介绍了 AQS
,主要从 acquire()
& release()
为入口,以 ReentrantLock
的拿锁,排队,等待挂起,唤醒,释放展开分析 AQS
的基本原理。
Q.E.D.