22park:AQS详解

2.2 队列同步器(AQS)

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架。

2.2.1 同步器的接口与示例

源码解析 https://www.cnblogs.com/waterystone/p/4920797.html

《Java并发编程的艺术》

同步器的设计是基于模版方法模式的,也就是说,使用者需要继承同步器并重写指定的 方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

  • getState():获取当前同步状态。

  • setState(int newState):设置当前同步状态。

  • compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态 设置的原子性。

同步状态state使用volatitle修饰。

/** * The synchronization state. */private volatile int state;

同步器可重写的方法与描述

  • protected boolen isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • protected boolean tryAcquire(int arg):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • protected boolean tryRelease(int arg):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • protected int tryAcquireShared(int arg):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • protected boolean tryReleaseShared(int arg):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

此时可去看下ReentrantLock源码看下AQS的应用。

2.2.2 队列同步器的实现分析
同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和 后继节点,节点的属性类型与名称以及描述。

变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常

源码如下:

static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; volatile Node prev; volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
独占式同步状态获取与释放
获取资源

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如图5-5所示。

在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列 (或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

源码解析:

/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && //获取到资源 !true,根据&&特性,后面代码不执行。返回 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //加入等待队列尾部等待获取资源,获取到资源后返回是否被中断 selfInterrupt(); //补上中断 }

独占模式获取,忽略中断。 通过调用至少一次实施tryAcquire ,在成功时返回。 否则,线程排队,可能重复blocking and unblocking,调用tryAcquire直到成功为止。 这种方法可以用来实现方法Lock.lock 。

  • 方法流程如下:
  • tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);

  • addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

  • acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

  • 下面看看各个方法

  • tryAcquire
  • 此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false

    protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}

    具体资源的获取交由自定义同步器去实现了。

  • addWaiter
  • 将当前线程加入队列的队尾

    private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //尝试快速方式直接放到队尾。失败后调用enq方法 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { //cas pred.next = node; return node; } } enq(node); //加入队尾 return node;}

    2.1 enq(node)方法:

    private Node enq(final Node node) { for (;;) { //cas自旋,直到加入队列成功 Node t = tail; if (t == null) { // Must initialize 队列为空 if (compareAndSetHead(new Node())) tail = head; } else { //加入尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
  • acquireQueued
  • 线程进入等待状态,直到其它线程唤醒自己去获取资源,成功之后返回是否被中断过。

    final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //是否失败(拿到资源) try { boolean interrupted = false; //是否被中断 for (;;) { //自旋,直到获取到资源 final Node p = node.predecessor(); //前驱 //前驱是head,则尝试去获取资源(被唤醒或者中断) if (p == head && tryAcquire(arg)) { //获取到了资源,退出自旋 setHead(node); //设置为head p.next = null; // help GC failed = false; return interrupted; } //还不能获取资源,则进入waiting //通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。 if (shouldParkAfterFailedAcquire(p, node) && //检查前驱状态 parkAndCheckInterrupt()) //park interrupted = true; } } finally { if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。 cancelAcquire(node); }}

    进入shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()里面去详细看看。

    3.1 shouldParkAfterFailedAcquire

    检查线程状态,将前驱设置为SIGNAL。找到最近一个正常等待的线程,告诉它通知自己(unpark)一下。

    The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前驱状态 if (ws == Node.SIGNAL) //已经是SIGNAL /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //如果前驱CANCELLED放弃了,一直往前搜索,直到找到正常的节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //前驱正常,设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}

    3.2 parkAndCheckInterrupt

    进入park状态,并且检查interrupt

    /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}

    acquireQueued小结;

  • 结点进入队尾后,检查状态,找到安全休息点;
  • 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
  • 被唤醒后,看自己是不是有资格获取资源。如果是,head指向当前结点,并返回从入队到获取到资源的整个过程中是否被中断过;如果没,继续流程1。
  • 最后总结下:

  • 用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
  • 再来看一下开头的流程图:

    释放资源
    • 资源释放方法release。

    它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源

    public final boolean release(int arg) { if (tryRelease(arg)) { //释放资源 Node h = head; //找到头节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); //唤醒等待队列里的下一个线程 return true; } return false;}
  • tryRelease
  • 这个方法是需要独占模式的自定义同步器去实现的。当资源彻底释放时需要返回true。

    protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
  • unparkSuccessor
  • 唤醒节点的字节点,如果存在一个。

    /** * Wakes up node's successor, if one exists. * * @param node the node */private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //node 一般为当前线程所在节点 int ws = node.waitStatus; //节点状态 if (ws < 0) //置零当前线程所在的结点状态(signalling),允许失败。 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //找到下一个节点 if (s == null || s.waitStatus > 0) { //如果为空或取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) //从后往前找 if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); //unpark 唤醒}
    共享式同步状态获取与释放

    共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞。独占式访问资源时,同一时刻其他访问均被阻塞

    通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态。

    源码:

    https://www.cnblogs.com/waterystone/p/4920797.html

    此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:

    1 public final void acquireShared(int arg) {2 if (tryAcquireShared(arg) < 0)3 doAcquireShared(arg);4 }

    这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  • tryAcquireShared()尝试获取资源,成功则直接返回;
  • 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
  • 3.3.1 doAcquireShared(int)

    此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。下面是doAcquireShared()的源码:

    1 private void doAcquireShared(int arg) { 2 final Node node = addWaiter(Node.SHARED);//加入队列尾部 3 boolean failed = true;//是否成功标志 4 try { 5 boolean interrupted = false;//等待过程中是否被中断过的标志 6 for (;;) { 7 final Node p = node.predecessor();//前驱 8 if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的 9 int r = tryAcquireShared(arg);//尝试获取资源10 if (r >= 0) {//成功11 setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程12 p.next = null; // help GC13 if (interrupted)//如果等待过程中被打断过,此时将中断补上。14 selfInterrupt();15 failed = false;16 return;17 }18 }19 20 //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()21 if (shouldParkAfterFailedAcquire(p, node) &&22 parkAndCheckInterrupt())23 interrupted = true;24 }25 } finally {26 if (failed)27 cancelAcquire(node);28 }29 }

    有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样,不知道Doug Lea是怎么想的。

    跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

    setHeadAndPropagate(Node, int)

    1 private void setHeadAndPropagate(Node node, int propagate) { 2 Node h = head; 3 setHead(node);//head指向自己 4 //如果还有剩余量,继续唤醒下一个邻居线程 5 if (propagate > 0 || h == null || h.waitStatus < 0) { 6 Node s = node.next; 7 if (s == null || s.isShared()) 8 doReleaseShared(); 9 }10 }

    此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!

    doReleaseShared()我们留着下一小节的releaseShared()里来讲。

    小结

    OK,至此,acquireShared()也要告一段落了。让我们再梳理一下它的流程:

  • tryAcquireShared()尝试获取资源,成功则直接返回;
  • 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
  • 其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)

    2.3 重入锁(ReentrantLock)

    重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。

    synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁。

    ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方 法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

    实现重新进入
  • 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再 次成功获取。

  • 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到 该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁 被释放时,计数自减,当计数等于0时表示锁已经成功释放。

  • 公平锁/非公平锁

    如果在绝对时间上,先对锁进行获取的请求一定先被满足(FIFO),那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为 唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。

    ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实 现为例,获取同步状态的代码如代码

    final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //当前线程 int c = getState(); if (c == 0) { //资源可用 if (compareAndSetState(0, acquires)) { //cas设置同步状态 setExclusiveOwnerThread(current); //设置当前拥有独占访问权的线程 return true; } } else if (current == getExclusiveOwnerThread()) { //当前线程,再次获取锁,实现锁的重入 int nextc = c + acquires; //增加同步状态值 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

    释放锁

    锁的释放需要减少重入的锁,直到同步状态为0才释放锁。

    protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //状态为0释放 free = true; setExclusiveOwnerThread(null); } setState(c); return free;}

    公平锁的获取代码

    /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && //公平锁的实现方式 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }}

    相比于非公平锁,公平锁仅多了hasQueuedPredecessors判断条件,加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

    小结:

    公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

    2.4 读写锁

    读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读 线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写 锁,使得并发性相比一般的排他锁有了很大提升。

    Java并发包提供读写锁的实现是 ReentrantReadWriteLock,它提供的特性如表

    • 写锁的获取与释放

    写锁是一个支持重进入的排它锁。

    • 读锁的获取与释放

    读锁是一个支持重进入的共享锁

    • 锁降级

    锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读 锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到 读锁,随后释放(先前拥有的)写锁的过程。

    锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果 当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修 改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级 的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进 行数据更新。

    RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的 也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了 数据,则其更新对其他获取到读锁的线程是不可见的。

    相关推荐

    相关文章