首页 体育 教育 财经 社会 娱乐 军事 国内 科技 互联网 房产 国际 女人 汽车 游戏

从ReentrantLock的实现看AQS的原理及应用

2020-01-14

一般来说,自界说同步器要么是独占办法,要么是同享办法,它们也只需完结tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支撑自界说同步器一起完结独占和同享两种办法,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以完结了tryAcquire-tryRelease。

以非公正锁为例,这儿首要论述一下非公正锁与AQS之间办法的相关之处,详细每一处中心办法的效果会在文章后边详细进行论述。

为了协助咱们了解ReentrantLock和AQS之间办法的交互进程,以非公正锁为例,咱们将加锁和解锁的交互流程独自拎出来着重一下,以便于对后续内容的了解。

加锁:

经过ReentrantLock的加锁办法Lock进行加锁操作。

会调用到内部类Sync的Lock办法,因为Sync#lock是笼统办法,依据ReentrantLock初始化挑选的公正锁和非公正锁,履行相关内部类的Lock办法,实质上都会履行AQS的Acquire办法。

AQS的Acquire办法会履行tryAcquire办法,可是因为tryAcquire需求自界说同步器完结,因而履行了ReentrantLock中的tryAcquire办法,因为ReentrantLock是经过公正锁和非公正锁内部类完结的tryAcquire办法,因而会依据锁类型不同,履行不同的tryAcquire。

tryAcquire是获取锁逻辑,获取失利后,会履行结构AQS的后续逻辑,跟ReentrantLock自界说同步器无关。

解锁:

经过ReentrantLock的解锁办法Unlock进行解锁。

Unlock会调用内部类Sync的Release办法,该办法承继于AQS。

Release中会调用tryRelease办法,tryRelease需求自界说同步器完结,tryRelease只在ReentrantLock中的Sync完结,因而能够看出,开释锁的进程,并不区别是否为公正锁。

开释成功后,一切处理由AQS结构完结,与自界说同步器无关。

经过上面的描绘,大约能够总结出ReentrantLock加锁解锁时API层中心办法的映射联系。

ReentrantLock中公正锁和非公正锁在底层是相同的,这儿以非公正锁为例进行剖析。

在非公正锁中,有一段这样的代码:

// java.util.concurrent.locks.ReentrantLock
static final class NonfairSync extends Sync {
 final void lock {
 if )
 setExclusiveOwnerThread);
 else
 acquire;
}

看一下这个Acquire是怎样写的:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire {
 if  acquireQueued, arg))
 selfInterrupt;
}

再看一下tryAcquire办法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
protected boolean tryAcquire {
 throw new UnsupportedOperationException;
}

能够看出,这儿仅仅AQS的简略完结,详细获取锁的完结办法是由各自的公正锁和非公正锁独自完结的。假如该办法回来了True,则阐明当时线程获取锁成功,就不必往后履行了;假如获取失利,就需求参加到等候行列中。下面会详细解说线程是何时以及怎样被参加进等候行列中的。

当履行Acquire时,会经过tryAcquire获取锁。在这种状况下,假如获取锁失利,就会调用addWaiter参加到等候行列中去。

获取锁失利后,会履行addWaiter参加等候行列,详细完结办法如下:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter {
 Node node = new Node, mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if  {
 node.prev = pred;
 if ) {
 pred.next = node;
 return node;
 enq;
 return node;
private final boolean compareAndSetTail {
 return unsafe.compareAndSwapObject;
}

首要的流程如下:

经过当时的线程和锁方式新建一个节点。

Pred指针指向尾节点Tail。

将New中Node的Prev指针指向Pred。

经过compareAndSetTail办法,完结尾节点的设置。这个办法首要是对tailOffset和Expect进行比较,假如tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
static {
 try {
 stateOffset = unsafe.objectFieldOffset);
 headOffset = unsafe.objectFieldOffset);
 tailOffset = unsafe.objectFieldOffset);
 waitStatusOffset = unsafe.objectFieldOffset);
 nextOffset = unsafe.objectFieldOffset);
 } catch  { 
 throw new Error; 
}

从AQS的静态代码块能够看出,都是获取一个目标的特点相关于该目标在内存傍边的偏移量,这样咱们就能够依据这个偏移量在目标内存傍边找到这个特点。tailOffset指的是tail对应的偏移量,所以这个时分会将new出来的Node置为当时行列的尾节点。一起,因为是双向链表,也需求将前一个节点指向尾节点。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node enq {
 for  {
 Node t = tail;
 if  { // Must initialize
 if ))
 tail = head;
 } else {
 node.prev = t;
 if ) {
 t.next = node;
 return t;
}

假如没有被初始化,需求进行初始化一个头结点出来。但请留意,初始化的头结点并不是当时线程节点,而是调用了无参结构函数的节点。假如阅历了初始化或许并发导致行列中有元素,则与之前的办法相同。其实,addWaiter便是一个在双端链表添加尾节点的操作,需求留意的是,双端链表的头结点是一个无参结构函数的头结点。

总结一下,线程获取锁的时分,进程大体如下:

当没有线程获取到锁时,线程1获取锁成功。

线程2请求锁,可是锁被线程1占有。

回到上边的代码,hasQueuedPredecessors是公正锁加锁时判别等候行列中是否存在有用节点的办法。假如回来False,阐明当时线程能够争夺同享资源;假如回来True,阐明行列中存在有用节点,当时线程有必要参加到等候行列中。

// java.util.concurrent.locks.ReentrantLock
public final boolean hasQueuedPredecessors {
 // The correctness of this depends on head being initialized
 // before tail and on head.next being accurate if the current
 // thread is first in queue.
 Node t = tail; // Read fields in reverse initialization order
 Node h = head;
 Node s;
 return h != t  == null || s.thread != Thread.currentThread);
}

看到这儿,咱们了解一下h != t == null || s.thread != Thread.currentThread);为什么要判其他头结点的下一个节点?第一个节点贮存的数据是什么?

双向链表中,第一个节点为虚节点,其实并不存储任何信息,仅仅占位。实在的第一个有数据的节点,是在第二个节点开端的。当h != t时: 假如 == null,等候行列正在有线程进行初始化,但仅仅进行到了Tail指向Head,没有将Head指向Tail,此刻行列中有元素,需求回来True。 假如 != null,阐明此刻行列中至少有一个有用节点。假如此刻s.thread == Thread.currentThread,阐明等候行列的第一个有用节点中的线程与当时线程相同,那么当时线程是能够获取资源的;假如s.thread != Thread.currentThread,阐明等候行列的第一个有用节点线程与当时线程不同,当时线程有必要参加进等候行列。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
if  { // Must initialize
 if ))
 tail = head;
} else {
 node.prev = t;
 if ) {
 t.next = node;
 return t;
}

节点入队不是原子操作,所以会呈现时间短的head != tail,此刻Tail指向最终一个节点,并且Tail指向Head。假如Head没有指向Tail,这种状况下也需求将相关线程参加行列中。所以这块代码是为了处理极点状况下的并发问题。

回到开端的源码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire {
 if  acquireQueued, arg))
 selfInterrupt;
}

上文解说了addWaiter办法,这个办法其实便是把对应的线程以Node的数据结构方式参加到双端行列里,回来的是一个包括该线程的Node。而这个Node会作为参数,进入到acquireQueued办法中。acquireQueued办法能够对排队中的线程进行“获锁”操作。

总的来说,一个线程获取锁失利了,被放入等候行列,acquireQueued会把放入行列中的线程不断去获取锁,直到获取成功或许不再需求获取。

下面咱们从“何时出行列?”和“怎样出行列?”两个方历来剖析一下acquireQueued源码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued {
 // 符号是否成功拿到资源
 boolean failed = true;
 try {
 // 符号等候进程中是否中止过
 boolean interrupted = false;
 // 开端自旋,要么获取锁,要么中止
 for  {
 // 获取当时节点的前驱节点
 final Node p = node.predecessor;
 // 假如p是头结点,阐明当时节点在实在数据行列的首部,就测验获取锁
 if ) {
 // 获取锁成功,头指针移动到当时node
 setHead;
 p.next = null; // help GC
 failed = false;
 return interrupted;
 // 阐明p为头节点且当时没有获取到锁或许是p不为头结点,这个时分就要判别当时node是否要被堵塞,避免无限循环糟蹋资源。详细两个办法下面细细剖析
 if  parkAndCheckInterrupt)
 interrupted = true;
 } finally {
 if 
 cancelAcquire;
}

注:setHead办法是把当时节点置为虚节点,但并没有修正waitStatus,因为它是一向需求用的数据。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void setHead {
 head = node;
 node.thread = null;
 node.prev = null;
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 靠前驱节点判别当时线程是否应该被堵塞
private static boolean shouldParkAfterFailedAcquire {
 // 获取头结点的节点状况
 int ws = pred.waitStatus;
 // 阐明头结点处于唤醒状况
 if 
 return true; 
 // 经过枚举值咱们知道waitStatus 0是撤销状况
 if  {
 do {
 // 循环向前查找撤销节点,把撤销节点从行列中除掉
 node.prev = pred = pred.prev;
 } while ;
 return false;
}

parkAndCheckInterrupt首要用于挂起当时线程,堵塞调用栈,回来当时线程的中止状况。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt {
 LockSupport.park;
 return Thread.interrupted;
}

上述办法的流程图如下:

从上图能够看出,跳出当时循环的条件是当“前置节点是头结点,且当时线程获取锁成功”。为了避免因死循环导致CPU资源被糟蹋,咱们会判别前置节点的状况来决议是否要将当时线程挂起,详细挂起流程用流程图标明如下:

从行列中开释节点的疑虑打消了,那么又有新问题了:

shouldParkAfterFailedAcquire中撤销节点是怎样生成的呢?什么时分会把一个节点的waitStatus设置为-1?

是在什么时间开释节点告诉到被挂起的线程呢?

acquireQueued办法中的Finally代码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued {
 boolean failed = true;
 try {
 for  {
 final Node p = node.predecessor;
 if ) {
 failed = false;
 } finally {
 if 
 cancelAcquire;
}

经过cancelAcquire办法,将Node的状况符号为CANCELLED。接下来,咱们逐行来剖析这个办法的原理:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void cancelAcquire {
 // 将无效节点过滤
 if 
 return;
 // 设置该节点不相关任何线程,也便是虚节点
 node.thread = null;
 Node pred = node.prev;
 // 经过前驱节点,越过撤销状况的node
 while 
 node.prev = pred = pred.prev;
 // 获取过滤后的前驱节点的后继节点
 Node predNext = pred.next;
 // 把当时node的状况设置为CANCELLED
 node.waitStatus = Node.CANCELLED;
 // 假如当时节点是尾节点,将从后往前的第一个非撤销状况的节点设置为尾节点
 // 更新失利的话,则进入else,假如更新成功,将tail的后继节点设置为null
 if ) {
 compareAndSetNext;
 } else {
 int ws;
 // 假如当时节点不是head的后继节点,1:判别当时节点前驱节点的是否为SIGNAL,2:假如不是,则把前驱节点设置为SINGAL看是否成功
 // 假如1和2中有一个为true,再判别当时节点的线程是否为null
 // 假如上述条件都满意,把当时节点的前驱节点的后继指针指向当时节点的后继节点
 if  == Node.SIGNAL || )) pred.thread != null) {
 Node next = node.next;
 if 
 compareAndSetNext;
 } else {
 // 假如当时节点是head的后继节点,或许上述条件不满意,那就唤醒当时节点的后继节点
 unparkSuccessor;
 node.next = node; // help GC
}

当时的流程:

获取当时节点的前驱节点,假如前驱节点的状况是CANCELLED,那就一向往前遍历,找到第一个waitStatus = 0的节点,将找到的Pred节点和当时Node相关,将当时Node设置为CANCELLED。

依据当时节点的方位,考虑以下三种状况:

当时节点是尾节点。

当时节点是Head的后继节点。

当时节点不是Head的后继节点,也不是尾节点。

依据上述第二条,咱们来剖析每一种状况的流程。

当时节点是尾节点。

当时节点是Head的后继节点。

当时节点不是Head的后继节点,也不是尾节点。

经过上面的流程,咱们关于CANCELLED节点状况的发生和改变现已有了大致的了解,可是为什么一切的改变都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么状况下会对Prev指针进行操作?

履行cancelAcquire的时分,当时节点的前置节点或许现已从行列中出去了,假如此刻修正Prev指针,有或许会导致Prev指向另一个现已移除行列的Node,因而这块改变Prev指针不安全。 shouldParkAfterFailedAcquire办法中,会履行下面的代码,其实便是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失利的状况下才会履行,进入该办法后,阐明同享资源已被获取,当时节点之前的节点都不会呈现改变,因而这个时分改变Prev指针比较安全。

do {
 node.prev = pred = pred.prev;
} while  {
 sync.release;
}

能够看到,实质开释锁的当地,是经过结构来完结的。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release {
 if ) {
 Node h = head;
 if 
 unparkSuccessor;
 return true;
 return false;
}

在ReentrantLock里边的公正锁和非公正锁的父类Sync界说了可重入锁的开释锁机制。

// java.util.concurrent.locks.ReentrantLock.Sync
// 办法回来当时锁是不是没有被线程持有
protected final boolean tryRelease {
 // 削减可重入次数
 int c = getState - releases;
 // 当时线程不是持有锁的线程,抛出反常
 if  != getExclusiveOwnerThread)
 throw new IllegalMonitorStateException;
 boolean free = false;
 // 假如持有线程悉数开释,将当时独占锁一切线程设置为null,并更新state
 if  {
 free = true;
 setExclusiveOwnerThread;
 setState;
 return free;
}

咱们来解说下述源码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release {
 // 上边自界说的tryRelease假如回来true,阐明该锁没有被任何线程持有
 if ) {
 // 获取头结点
 Node h = head;
 // 头结点不为空并且头结点的waitStatus不是初始化节点状况,免除线程挂起状况
 if 
 unparkSuccessor;
 return true;
 return false;
}

这儿的判别条件为什么是h != null h.waitStatus != 0?

h == null Head还没初始化。初始状况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这儿假如还没来得及入队,就会呈现head == null 的状况。 h != null waitStatus == 0 标明后继节点对应的线程仍在运转中,不需求唤醒。 h != null waitStatus 0 标明后继节点或许被堵塞了,需求唤醒。

再看一下unparkSuccessor办法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void unparkSuccessor {
 // 获取头结点waitStatus
 int ws = node.waitStatus;
 if 
 compareAndSetWaitStatus;
 // 获取当时节点的下一个节点
 Node s = node.next;
 // 假如下个节点是null或许下个节点被cancelled,就找到行列最开端的非cancelled的节点
 if  {
 s = null;
 // 就从尾部节点开端找,到队首,找到行列第一个waitStatus 0的节点。
 for 
 if 
 s = t;
 // 假如当时节点的下个节点不为空,并且状况 =0,就把当时节点unpark
 if 
 LockSupport.unpark;
}

为什么要从后往前找第一个非Cancelled的节点呢?原因如下。

之前的addWaiter办法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter {
 Node node = new Node, mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if  {
 node.prev = pred;
 if ) {
 pred.next = node;
 return node;
 enq;
 return node;
}

咱们从这儿能够看到,节点入队并不是原子操作,也便是说,node.prev = pred; compareAndSetTail 这两个当地能够看作Tail入队的原子操作,可是此刻pred.next = node;还没履行,假如这个时分履行了unparkSuccessor办法,就没办法早年往后找了,所以需求从后往前找。还有一点原因,在发生CANCELLED状况节点的时分,先断开的是Next指针,Prev指针并未断开,因而也是有必要要从后往前遍历才能够遍历完悉数的Node。

综上所述,假如是早年往后找,因为极点状况下入队的非原子操作和CANCELLED节点发生进程中止开Next指针的操作,或许会导致无法遍历一切的节点。所以,唤醒对应的线程后,对应的线程就会持续往下履行。持续履行acquireQueued办法今后,中止怎样处理?

唤醒后,会履行return Thread.interrupted;,这个函数回来的是当时履行线程的中止状况,并铲除。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt {
 LockSupport.park;
 return Thread.interrupted;
}

再回到acquireQueued代码,当parkAndCheckInterrupt回来True或许False的时分,interrupted的值不同,但都会履行下次循环。假如这个时分获取锁成功,就会把当时interrupted回来。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued {
 boolean failed = true;
 try {
 boolean interrupted = false;
 for  {
 final Node p = node.predecessor;
 if ) {
 setHead;
 p.next = null; // help GC
 failed = false;
 return interrupted;
 if  parkAndCheckInterrupt)
 interrupted = true;
 } finally {
 if 
 cancelAcquire;
}

假如acquireQueued为True,就会履行selfInterrupt办法。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
static void selfInterrupt {
 Thread.currentThread.interrupt;
}

该办法其实是为了中止线程。但为什么获取了锁今后还要中止线程呢?这部分归于Java供给的协作式中止常识内容,感兴趣同学能够查阅一下。这儿简略介绍一下:

傍边断线程被唤醒时,并不知道被唤醒的原因,或许是当时线程在等候中被中止,也或许是开释了锁今后被唤醒。因而咱们经过Thread.interrupted办法查看中止符号,并记载下来,假如发现该线程被中止过,就再中止一次。

线程在等候资源的进程中被唤醒,唤醒后仍是会不断地去测验获取锁,直到抢到锁停止。也便是说,在整个流程中,并不呼应中止,仅仅记载中止记载。最终抢到锁回来了,那么假如被中止过的话,就需求弥补一次中止。

这儿的处理办法首要是运用线程池中根本运作单元Worder中的runWorker,经过Thread.interrupted进行额定的判别处理,感兴趣的同学能够看下ThreadPoolExecutor源码。

咱们在1.3末节中提出了一些问题,现在来答复一下。

Q:某个线程获取锁失利的后续流程是什么呢? A:存在某种排队等候机制,线程持续等候,依然保存获取锁的或许,获取锁流程仍在持续。 Q:已然说到了排队等候机制,那么就必定会有某种行列构成,这样的行列是什么数据结构呢? A:是CLH变体的FIFO双端行列。 Q:处于排队等候机制中的线程,什么时分能够有时机获取锁呢? A:能够详细看下2.3.1.3末节。 Q:假如处于排队等候机制中的线程一向无法获取锁,需求一向等候么?仍是有其他战略来处理这一问题? A:线程地点节点的状况会变成撤销状况,撤销状况的节点会从行列中开释,详细可见2.3.2末节。 Q:Lock函数经过Acquire办法进行加锁,可是详细是怎样加锁的呢? A:AQS的Acquire会调用tryAcquire办法,tryAcquire由各个自界说同步器完结,经过tryAcquire完结加锁进程。

ReentrantLock的可重入性是AQS很好的使用之一,在了解完上述常识点今后,咱们很简单得知ReentrantLock完结可重入的办法。在ReentrantLock里边,不管是公正锁仍是非公正锁,都有一段逻辑。

公正锁:

// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
if  {
 if  compareAndSetState) {
 setExclusiveOwnerThread;
 return true;
else if ) {
 int nextc = c + acquires;
 if 
 throw new Error;
 return true;
}

非公正锁:

// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
if  {
 if ){
 setExclusiveOwnerThread;
 return true;
else if ) {
 int nextc = c + acquires;
 if  // overflow
 throw new Error;
 return true;
}

从上面这两段都能够看到,有一个同步状况State来操控全体可重入的状况。State是Volatile润饰的,用于确保必定的可见性和有序性。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;

接下来看State这个字段首要的进程:

State初始化的时分为0,标明没有任何线程持有锁。

当有线程持有该锁时,值就会在本来的基础上+1,同一个线程屡次取得锁是,就会屡次+1,这儿便是可重入的概念。

解锁也是对这个字段-1,一向到0,此线程对锁开释。

除了上边ReentrantLock的可重入性的使用,AQS作为并发编程的结构,为许多其他同步东西供给了杰出的处理方案。下面列出了JUC中的几种同步东西,大体介绍一下AQS的使用场景:

热门文章

随机推荐

推荐文章