一、介绍 AQS

Doug Lea 在写 JUC 的时候引入了 java.util.concurrent.locks.AbstractQueuedSynchronizer,也就是 基于队列实现的抽象同步器,我们一般称之为 AQS

何为 AQS ?

JUC里面很多有关锁、同步组件(ReentrantLockCountDownLatchCyclicBarrierFutureTask 等)的实现都是基于AQS

为什么需要 AQS ?

很多锁、同步器,它们有很多工作是类似的,如果能提取出一个工具类,那么就可以直接用,对于 ReentrantLockCountDownLatch 这些来说可以屏蔽很多细节,通过实现一些方法,就可以获得AQS的能力,只关注它们自己的业务逻辑就可以。

同步器基本原理

// acquire操作如下:
while(同步状态申请获取失败) {
    if(当前线程未进入等待队列) {
        当前线程放入等待队列;
    }
    尝试阻塞当前线程;
}
当前线程移出等待队列

// release操作如下:
更新同步状态
if(同步状态足够允许一个阻塞的线程申请获取) {
    解除一个或者多个等待队列中的线程的阻塞状态;
}

为了实现上述操作,需要下面三个基本环节的相互协作:

  • 同步状态的原子性管理(使用 CAS 原子的修改共享标记位)
  • 等待队列的管理
  • 线程的阻塞与解除阻塞(调用LockSupport,底层是基于Unsafe类的 park() & unpark()方法)

二、源码分析

AQS 关键属性

// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 头结点,包装着当前持有锁的线程
private transient volatile Node head;

// 尾节点,每个新的节点进来,都插入到最后,形成了一个链表 
private transient volatile Node tail;

// 代表当前锁的状态,0 代表没有被占用,大于 0 代表有线程持有当前锁,volatile 保证线程之间的可见性
private volatile int state;

// 代表当前持有独占锁的线程
// AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer 的一个属性
private transient Thread exclusiveOwnerThread;

image

Node 关键属性

// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
static final class Node {
  // 标识节点当前在共享模式下(线程以共享的模式等待锁)
  static final Node SHARED = new Node();
  // 标识节点当前在独占模式下(线程正在以独占的方式等待锁)
  static final Node EXCLUSIVE = null;

  // 此线程取消了争抢这个锁
  static final int CANCELLED =  1;
  // 表示线程已经准备好了,就等资源释放了
  static final int SIGNAL    = -1;
  // 表示节点在等待队列中,节点线程等待唤醒(先不讨论)
  static final int CONDITION = -2;
  // 表示下个 acquireShared 应该无条件的传播(当前线程处在 SHARED 情况下,该字段才会使用,先不讨论)
  static final int PROPAGATE = -3;

  // 取值为上面的 1、-1、-2、-3,或者 0 (初始化的默认值)
  // 如果 waitStatus > 0 代表此线程取消了等待
  volatile int waitStatus;

  // 前驱节点的引用
  volatile Node prev;

  // 后继节点的引用
  volatile Node next;

  // 包在当前 node 的线程
  volatile Thread thread;

  // 下一个等待条件的节点的引用(先不讨论)
  Node nextWaiter;
}

AQS 关键方法

AQS 定好方法让子类根据自身的需要去实现

// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 获取锁(修改标志位),可以进入等待队列,直到获取锁
// final 修饰,子类只能调用不能修改
// 会调子类的 tryAcquire(),尝试获取锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 释放锁
// final 修饰,子类只能调用不能修改
// 会调子类的 tryRelease(),释放锁,完全释放返回 true,否则 false
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 尝试获取锁(修改标志位),立即返回
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 尝试释放锁
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// 为当前线程和给定模式创建节点并将其排队
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

// 入队
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// 唤醒后继节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
      	// 从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点
      	// 为了避免高并发情况下,漏了一些节点的情况
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
 	if (s != null)
      	// 唤醒
        LockSupport.unpark(s.thread);
}

// 中断当前线程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}

Release:
if (tryRelease(arg))
unblock the first queued thread;

从 ReentrantLock 看 AQS 原理

ReentrantLock 的内部类 Sync,有公平锁和非公平锁两种同步模式。

// java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    // 争抢锁
    abstract void lock();
  
    // 释放锁的过程不区分公平锁和非公平锁,所以放在父类 Sync 里
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
          	throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
          	// state 为 0,说明没有线程获取锁,把当前持有独占锁的线程设为 null
            free = true;
            setExclusiveOwnerThread(null);
        }
      	// state 不为 0,说明线程重入,还没完全释放锁,把剩余的未释放的次数设回 state
        setState(c);
        return free;
    }
  
  	final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
              	throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
  	}
}
// java.util.concurrent.locks.ReentrantLock.NonfairSync
// 非公平锁的实现
static final class NonfairSync extends Sync { 
    protected final boolean tryAcquire(int acquires) {
        // 因为这是非公平锁,所以走非公平锁的实现
        return nonfairTryAcquire(acquires);
    }
  
    // 争抢锁
    final void lock() {
      // 通过 CAS 设置变量 State
      if (compareAndSetState(0, 1))
          // CAS 成功,也就是获取锁成功,则将当前线程设置为独占线程
          setExclusiveOwnerThread(Thread.currentThread());
      else
          // CAS 失败,也就是获取锁失败,则走 acquire() 方法
          // 这里的 acquire() 里面是先调非公平锁的 tryAcquire() 方法
          // 如果成功就获得锁,失败则调 acquireQueued() 方法入队
          acquire(1);
    }
}
// java.util.concurrent.locks.ReentrantLock.FairSync
// 公平锁的实现
static final class FairSync extends Sync {
  	protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
              throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  
    // 加锁
    final void lock() {
      	// 这里的 acquire() 里面是先调公平锁的 tryAcquire() 方法
        // 如果成功就获得锁,失败则调 acquireQueued() 方法入队
      	acquire(1);
    }
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 以自旋模式获取锁
// 如果在等待时被打断,则返回 true, 否则返回 false
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 看一下前驱节点是不是为头结点,如果是表明当前节点不需要等待,直接 tryAcquire() 争抢锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
              	interrupted = true;
        }
    } finally {
        if (failed)
           cancelAcquire(node);
    }
}

// 检查和更新未能获取的节点的状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
      	// 前个节点的等待状态为 SIGNAL,说明前个节点状态正常,返回 true
      	return true;
    if (ws > 0) {
      	// 前个节点的等待状态大于 0,说明前个节点已经取消等待,继续往前找,直到前个节点是好的
      	// 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的
        // 这里就需要找个好的前驱节点,因为还得靠它来唤醒
        do {
          	node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
      	// 代码如果走到这里说明前驱节点的 waitStatus 不等于 -1 和 1,那也就是只可能是 0,-2,-3
      	// 每个新的 node 入队时,waitStatus 没有设置,所以初始化都是 0
      	// 正常情况如果前驱节点是之前的 tail 节点,那么它的 waitStatus 就为 0
      	// 此时需要通过 CAS 将前驱节点的 waitStatus 设为 SIGNAL (也就是-1)
      	compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
  	// 这次返回 false,下次死循环进来 shouldParkAfterFailedAcquire() 就会返回 true
    return false;
}

// 让当前线程 park,然后检查是否中断
private final boolean parkAndCheckInterrupt() {
  	// 阻塞线程,底层调用 UNSAFE 的 api
    LockSupport.park(this);
    return Thread.interrupted();
}

image

三、总结

这次从源码入手,浅显的介绍了 AQS ,主要从 acquire() & release() 为入口,以 ReentrantLock 的拿锁,排队,等待挂起,唤醒,释放展开分析 AQS 的基本原理。

Q.E.D.


知识的价值不在于占有,而在于使用