Reentrantlock分析【加塞:排队、排队】:wait nofity释放锁模型

2024/04/02

Java框中的AbstractQueuedSynchronizer[AQS队列同步器]框架:有锁同步[睡眠与唤起]

ReentrantLock是一种常用的Java锁,支持公平与非公平两种模式,公平锁与非公平锁的区别是是否一直遵守先来后到,公平锁:直接进入等待队列,先进入先唤醒,而非公平锁支持先来一次抢断【加塞】,抢断【加塞】不成功,再退化成排队。ReentrantLock 是基于AbstractQueuedSynchronizer框架实现的,AbstractQueuedSynchronizer[队列同步器]是并发包的核心,或者说抽象队列同步器就是锁的本体。

除了ReentrantLock、Semaphore(做流量控制)等也是借助AQS模板实现的,而AQS也是借助CAS与同步队列实现的,AQS会把请求获取锁失败的线程放入一个队列的尾部,然后睡眠。加锁是借助CAS完成,CAS保证只会有一个线程获取锁成功,失败的就进入睡眠。

ReentrantLock本身只实现了Lock、Serializable接口,

public class ReentrantLock implements Lock, Serializable {

它采用的是组合模式,而不是简单的继承,内部有个静态抽象内部类Sync,继承AbstractQueuedSynchronizer,负责AQS框架的部分:

  abstract static class Sync extends AbstractQueuedSynchronizer {

看下ReentrantLock的构造函数,ReentrantLock默认无参构造方法跟有参方法:

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默认非公平,如果需要使用公平锁,则需要使用有参构造函数,NonfairSync与FairSync均继承自ReentrantLock中的静态内部类Sync类 ,负责承担AQS的作用。

image.png

ReetrantLock加锁流程–公平锁 排队优先

ReentrantLock在使用时候,一般是

reentrantLock.lock();

<!--临界代码-->
...
reentrantLock.unLock();

公平锁lock()函数的实现

 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

acquire调用的其实是父类AbstractQueuedSynchronizer的acquire方法,acquire进一步调用子类tryAcquire以及自身的acquireQueued,如果无法获取锁,并且满足某些条件则进入睡眠

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如果tryAcquire直接就获取成功的话,是无需睡眠的,tryAcquire的作用就像名字一样,试试能不能直接获取到,逻辑如下

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
		<!--  获取用于同步的state   private volatile int state; 在AbstractQueuedSynchronizer定义-->
            int c = getState();
            if (c == 0) {
            	<!--判断前面是不是有等待的节点,第一次进来肯定没有,这里也是跟非公平锁相差最大的地方,不是唤醒的节点是抢占的节点-->
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {  //  compareAndSetState(0, acquires)这句只会有一个现成成功
                    setExclusiveOwnerThread(current);
                    return true;
            }} 
          <!--另一半流程 可重入逻辑-->
           else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        <!--无法获取锁-->
        return false;
    }
} compareAndSetState将value的值从0更新成1,获得锁,同时将自己设置成占有锁的对象。这个时候,如果有另一个线程进来会怎样,hasQueuedPredecessors仍旧满足条件,但是compareAndSetState会无法满足,从而进入另一半流程,先判断是不是当前线程重新申请锁

if (current == getExclusiveOwnerThread()) 

这部分是可重入锁的原理部分,即一个已经获取锁的线程,重新申请锁,如果是这种情况,则直接更新state即可,也看做锁获取成功,否则认为锁获取失败,假设不是同一个线程,后续如何处理呢?流程会重新回到父类的模板:

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

继续走addWaiter 与acquireQueued流程,创建一个enqueues node,再加入队列,然后就可以中断

private Node addWaiter(Node mode) {
    Node node = new Node(mode);
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {//尾部插入,头部唤起,先进先出
        	//给定对象的指定偏移地址的地方设值,与此类似操作还有:putInt,putDouble,putLong,putChar等
        	<!--为什么不用赋值?防止切换么?  设置node的前一个是oldTail,可能吧,不让队列设置被中断-->
            U.putObject(node, Node.PREV, oldTail);
            if (compareAndSetTail(oldTail, node)) { //   U.compareAndSwapObject(this, TAIL, expect, update); tail 变量赋值,保证尾部正确性,尾巴是oldTail,新尾巴是node,有且仅有一个线程符合要求
           <!-- 进入锁,有保证,可以赋值-->  
                oldTail.next = node;
                return node;
            }
        } else {
        <! 构建队列,第一个H=T使用来辅助的吗?感觉无实质意义-->
            initializeSyncQueue();
        }
    }
} 第一次进来调用initializeSyncQueue,初始化SyncQueue,利用U.compareAndSwapObject设置Head,并且tail = h,

private final void initializeSyncQueue() {
    Node h;
    if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
        tail = h;
}

之后回到for(;;),将该线程新建的Node加入到该队列,这里核心的同步全部依靠Unsafe类的compareAndSet来实现。 for + compareAndSetTail构成自旋,组要保证只有compareAndSetTail【更新多线程共用的对象都要用CAS】,这保证只有一个线程更新tail成功,这里重点是tail,而不是tail内部的值,将其加入到队列之后,即可返回,其他线程可进入addWaiter流程,本线程acquireQueued流程继续,

final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        <!--注意这里有for (;;),for (;;)的目的是防止临界状态的时候,线程未被唤醒-->
        for (;;) {
        		<!--此时node已经加入队列尾部-->
            final Node p = node.predecessor();
            <!--判断当前加入的是不是第一个,如果是,再次尝试获取一次,如果获取tryAcquire成功,说明正好锁被释放了,可以直接获取 这个时候还没睡眠 ,如果是一个等待线程都没有,那么另一个执行线程不会唤醒水,如果有等待线程才存在唤醒,这里有个标志就是shouldParkAfterFailedAcquire设置的signal-->
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            <!--存在执行到这里,另一个释放的临界点-->
            <!--如果node不是第一个节点,那自己基本上要睡眠的  shouldParkAfterFailedAcquire 找到一个可以唤起自己的前驱节点这里其实就是之前第一个 ,第一次shouldParkAfterFailedAcquire设置了signal,返回了false,所以会从新进入for (;;),如果tryAcquire成功,说明释放了,如果失败,也已经加入等待队列,不会被遗漏。-->
         
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

注意,这里有个for循环,第一次shouldParkAfterFailedAcquire设置了signal,返回了false,所以会从新进入for (;;),如果tryAcquire成功,说明释放了,如果失败,也已经加入等待队列,主要是防止临界点的遗漏。如果加入队列后,还是没有获取到锁,那么parkAndCheckInterrupt会利用LockSupport.park挂起,最终其实是Unsafe的park函数,LockSupport(提供park/unpark操作,睡眠与唤起),是AQS框架中另一个重要类,跟提供CAS的Unsafa共同构建了该体系。

    private final boolean parkAndCheckInterrupt() {
   	 <!--安全挂起-->
        LockSupport.park(this);
        <!--唤醒后判断是否是被中断过,正常不会被中断-->
        return Thread.interrupted();
    }

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker); //设置block,可能给外部看状态用的
    U.park(false, 0L);   //开始阻塞,等待该线程的unpark函数被调用
    setBlocker(t, null);//清理
}

Thread.interrupted()是用来判断是否被设置中断运行,如果被打断过,返回true,结果就是不用处理该任务了,需要终结,否则还是要处理的。从上面的流程可以看出,公平锁是在AQS的基础上实现的,AQS定义了锁的基本框架与功能:

  • CAS的能力
  • acquire :获取锁
  • tryAcquire :尝试获取,试试能不能一次性成功
  • release:释放锁
  • tryRelease:尝试释放
  • Node head、 Node tail队列 :线程等待队列模型,这里的队列对应是Thread等待队列

备注:对于已经获取到锁的线程,后续的操作就不需要任何同步处理,因为就它自己能操作其他的都无法通过CAS更新,那后续也就无需CAS更新,直接赋值即可。

加锁流程 -非公平锁加塞优先,上来就抢

   static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;


 final boolean nonfairTryAcquire(int acquires) {
		            final Thread current = Thread.currentThread();
		            int c = getState();
		            if (c == 0) {
		            <!--如果可用,上来就抢,不等-->
		                if (compareAndSetState(0, acquires)) {
		                    setExclusiveOwnerThread(current);
		                    return true;
		                }
		            }
		            else if (current == getExclusiveOwnerThread()) {
		                int nextc = c + acquires;
		                if (nextc < 0) // overflow
		                    throw new Error("Maximum lock count exceeded");
		                setState(nextc);
		                return true;
		            }
		            return false;
		        }

可以看到非公平锁在当下可用的时候,首先调用CAS尝试将state从0改为1,如果能改成功则表示能够直接获取到锁,那就可以将exclusiveOwnerThread设置为当前线程,不需要公平锁的判断是否有等待队列的操作,如果获取不到,则走后续acquire流程,而公平锁则要看自己是不是第一个,如果是才会去获取,这里之后的流程两者一致都是要睡眠等待唤起。

唤起流程

唤起其实是调用的unlock,无论是否是公平锁,这里的处理逻辑都一样

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
         <!--   **这里在之前设置过等待信号量  pred.compareAndSetWaitStatus(ws, Node.SIGNAL)**;-->
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease的实现在ReetrantLock中,因为ReetrantLock是可重入锁,所以要看看是不是已经到了最外层的锁,只有到了最外层,才算真正的release:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }	

之后选择Head之后第一个线程Node进行唤起即可

   private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }        可以看到最终调用的是 LockSupport.unpark(s.thread)将线程唤起。到这里ReentrantLock基本用法的分析就结束了,可以看到它基本是依靠Unsafe的CAS操作+LockSupport的park/unpark实现了锁同步。从上述分析也可以窥探AbstractQueuedSynchronizer框架的一部分,AbstractQueuedSynchronizer实现了线程队列与唤起的基本框架,将lock/unlock的能力交给外部进行定制,只需要实现AbstractQueuedSynchronizer定制的模板,就可以获得不同的锁,但是核心的阻塞/唤起框架已经定了:靠Node队列+CAS更新操作+Unsafe的睡眠/唤起能力实现。	 

ReentrantLock的Condition

ReentrantLock的Condition用来处理生产者-消费者(producer-consumer)问题,即有界缓冲区(bounded-buffer问题,一个是生产者,一个是消费者,除了临界资源的使用,还牵扯缓冲区空与满的处理,如果没有Condition,则只有临界区的概念,producer-consumer模型就不够清晰,这个模型会根据身份与缓存的状态,选择性睡眠与唤醒,而ReentrantLock是无差别的。

  • 当缓冲区已经满了,生产者还想放入新的数据,生产者应该休眠,等待消费者从缓冲区中取走数据后再唤醒它。
  • 当缓冲区已经空了,消费者还想去取消息,可以让消费者进行休眠,待生产者放数据再唤醒它。

很明显这个时候,单纯靠ReentrantLock处理是不友善的

class BoundedBuffer {
   final Lock lock = new ReentrantLock();//锁
   final Condition notFull  = lock.newCondition();//写条件 
   final Condition notEmpty = lock.newCondition();//读条件 

   final Object[] items = new Object[100];//缓存队列
   int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)//如果队列满了 
         notFull.await();//阻塞写线程
       items[putptr] = x;//赋值 
       if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
       ++count;//个数++
       notEmpty.signal();//唤醒读线程
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)//如果队列为空
         notEmpty.await();//阻塞读线程
       Object x = items[takeptr];//取值 
       if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
       --count;//个数--
       notFull.signal();//唤醒写线程
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }
  newCondition 不区分公平锁与非公平锁,await函数会调用   LockSupport.park进行睡眠,同时也会将线程添加到等待队列,同时释放锁,


    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        <!--加入到条件等候队列中去,这个时候不睡眠-->
        Node node = addConditionWaiter();
        <!--释放锁-->
        int savedState = fullyRelease(node);
        int interruptMode = 0;
 		<!--加到睡眠等待队列中去-->
 		
        while (!isOnSyncQueue(node)) {
        <!--睡眠-->
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        <!--重新获取锁-->
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

fullyRelease的作用是就是主动释放锁,这样其他等待锁的可以运行。

final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

acquireQueued会重新获取锁,signal唤起之后,看看能不能重新获取到锁,也许不能,可能需要继续等这个时候,等待的不是条件,而是锁。跟Syncronize与wait、notify同样效果。

    final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

唤醒之后,线程会重新获取Condition绑定的锁,获取锁之后,才会继续运行。signal的原理:

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        <!--找到condition等待队列-->
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }



    private void doSignal(Node first) {
        do {
        <!--找到第一个有效的等待者-->
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

transferForSignal唤起等待的线程,LockSupport.unpark,主要是这一句唤起线程,调用这个时候,signal线程还握着锁呢,所以被唤起的线程阻塞在等待锁的线程队列中。

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
    <!--其实主要就是LockSupport.unpark-->
        LockSupport.unpark(node.thread);
    return true;
}

signal现成释放锁之后,wait的线程同其他等待锁的线程一起争抢锁,就是普通的锁竞争逻辑。

ReetreentLock的await根object的await有什么不同呢?ReetreentLock 的Condition接口的 await、signal、signalAll 也可以说是普通并发协作 wait、notify、notifyAll 的升级;普通并发协作 wait、notify、notifyAll 需要与synchronized配合使用,显式协作Condition 的 await、signal、signalAll 需要与显式锁Lock配合使用(Lock.newCondition()),调用await、signal、signalAll方法都必须在lock 保护之内,对比

public class BlockingQueue<T> {
    private Queue<T> mQueue = new LinkedList<>();
    private int mCapacity;

    public BlockingQueue(int capacity) {
        this.mCapacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException{
        while (mQueue.size() == mCapacity){
            wait();
        }
        mQueue.add(element);
        notify();
    }

    public synchronized T take() throws InterruptedException{
        while (mQueue.isEmpty()){
            wait();
        }
        T item = mQueue.remove();
        notify();
        return item;
    }

}

用ReentrantLock的Condition

public class BlockingQueue<T> {
    private Queue<T> mQueue = new LinkedList<>();
    private int mCapacity;
    private Lock mLock = new ReentrantLock();
    private Condition mNotFull = mLock.newCondition();
    private Condition mNotEmpty = mLock.newCondition();

    public BlockingQueue(int capacity) {
        this.mCapacity = capacity;
    }

    public   void put(T element) throws InterruptedException{
        mLock.lockInterruptibly();
        try {
            while (mQueue.size() == mCapacity){
                mNotFull.await();
            }
            mQueue.add(element);
            mNotEmpty.signal();
        }finally {
            mLock.unlock();
        }
    }

    public   T take() throws InterruptedException{
        mLock.lockInterruptibly();
        try {
            while (mQueue.size() == 0){
                mNotEmpty.await();
            }
            T item = mQueue.remove();
            mNotFull.signal();
            return item;
        }finally {
            mLock.unlock();
        }
    }
}

Search

    Table of Contents