5.3 AbstractQueuedSynchronizer

2016-06-26 11:37:43 4,714 6

AbstractQueuedSynchronizer是JAVA 5构建同步组件的基础,通常简称为AQS。AQS是java并发包中最核心的一个类,没有之一。可以说没有弄懂AQS,就没有真正的掌握java并发包,其是在LockSupport和Unsafe类的基础上实现的,有了前面的介绍,理解AQS就更为容易。

AQS的主要使用方式继承,我们通过编写一个类继承AbstractQueuedSynchronizer,并覆写其特定的方法,来实现具有不同功能的同步组件。

一、同步组件基础知识介绍

同步组件从总体上区分,可以分为:

独占式同步组件(又称独占锁):在任一时刻只能有一个线程获取到锁,可以执行。其他线程被阻塞,进去等待队列。类似于synchronized关键字的作用

共享式同步组件(又称共享锁):共享锁允许多个线程同时运行,通常情况下,共享锁内部维护了若干个执行许可(后文简称许可),每个线程执行的时候获取一个许可,运行结束的时候释放许可。当一个线程请求执行时,如果共享锁已经没有许可,那么该线程进入等待队列。

注:可以认为独占式同步组件是共享式同步组件的一个特例,其只有1个许可。

从这个角度理解的话 ,我们认为不论是独占式同步组件还是共享式同步组件,都应该提供获取许可释放许可的功能。

  • 获取许可:如果线程能够成功获取许可,则拥有执行的权利,可用许可总数量-1,如果没有获取到许可,线程被阻塞,并被加入等待队列。

  • 释放许可:拥有执行许可的线程运行完成后,释放许可,可用许可数量为+1。如果等待队列中存在被阻塞的线程,要负责唤醒等待的线程。

可以看到,在获取许可和释放许可的过程中,都有两个非常重要的基本内容要维护:1、可用许可的数量 2、等待队列

二、AQS对同步组件基本功能的支持介绍

队列同步器AbstractQueuedSynchronizer作为构建同步组件的基础框架。支持独占式地获取同步状态,也可以支持共享式的获取同步状态,这样就可以方便实现不同类型的同步组件(例如:ReentryLockReentryReadWriteLock、和CountDownLatch等都是在AQS的基础上编写的)。

不管是独占式同步组件还是共享式同步组件,使用的过程本质上都是获取许可与释放许可。而获取与释放许可过程中最关键的又是对于许可数量的维护以及等待队列的维护。而AQS对这二者都提供了支持:

可用许可的数量的支持它使用了1个一个int类型state变量,表示同步状态,也就是说笔者所说执行许可。初始值默认为0。最简单的情况下,假设我们限定最多有5个许可,那么state值为0就表示当前一个许可都未使用。线程每获取一个许可,state变量+1,表示已经使用了几个许可。当然state变量的含义是由开发者定义的,你完全可以用负数例如state为-5时,表示5个许可都被释放完。

等待队列的支持 :通过内置的FIFO队列来完成资源获取线程的排队工作,队列的节点是基于内部静态类Node实现

需要注意的是,AQS对于等待队列的支持是完全的,也就是对开发者完全屏蔽了被阻塞线程的入队与出队操作的实现细节;而对可有许可数量(也就是对state变量)的维护只是提供了部分支持,需要开发者覆写特定的方法才能正常工作。

1、对于等待队列的支持是完全的含义:

AQS是基于模板方法设计的。可以做的是帮助我们在调用模板方法的时候,自动帮我们维护线程的等待队列。被阻塞的线程会被封装到一个Node对象中,加入队列。被阻塞的线程执行结束之后,从队列中移除这个Node。AQS中维护的两个变量分别表示线程等待队列的头和尾:

private transient volatile Node head;
private transient volatile Node tail;

因为等待队列是AQS维护的,所以并没有方法提供可以直接对队列进行修改。不过了解AQS是如何维护队列的细节对于我们理解是有好处的,在后文我们会详细分析。目前情况下,我们只需要知道AQS是通过这两个变量完成一个队列的维护即可。

2、对于state变量支持是部分的含义:

state变量的定义如下:

private volatile int state;

对于许可数量的维护(也就是对于state变量的维护),必须由开发人员自己来实现。例如对于一个独占式同步组件,可能我们会希望state的最大值为1,而对于一个共享式同步组件,我们会希望state变量的最大值是大于1的其他正整数。有时我们可能一次只获取一个许可,有时可能又会一次获取多个许可。

作为一个基础框架,AQS是无法知道我们想要如何操作。因此不像对于等待队列的维护,AQS提供了一些方法,可以让我们修改state变量,如下所示:

protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

3、如何保证state变量的维护与等待队列的一致性

因为获取许可与释放许可的过程中,会同时涉及state变量的维护和等待队列的维护。为了减少操作的复杂性, AQS是基于模板设计模式实现的,

通过对模板方法的调用,可以保证对state变量和等待队列维护的一致性。

AQS模板方法列表:

方法名称描述
void acquire(int arg)独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg)与tryAcquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException
boolean tryAcquireNanos(int arg, long nanosTimeout)在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时实现内没有获取到同步状态,那么将会返回false,如果获取到了就返回true
boolean release(int arg)独占式释放同步状态,该方法会在释放同步状态后,将同步队列中第一个节点包含的线程唤醒
void acquireShared(int arg)共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。
void acquireSharedInterruptibly(int arg)与void acquireInterruptibly(int arg)相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanosTimeout)在acquireSharedInterruptibly(int arg)的基础上增加了超时限制。
boolean releaseShared(int arg)共享式释放同步状态
Collection<Thread> getQueuedThreads()获取等待在同步队列上的线程集合。

同步器提供的模板方法基本上分为3类,独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的线程等待情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

以AQS模板方法acquire(int arg)为例,这是一个获取执行许可的方法,arg表示获取许可的数量。其源码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //功能1:state变量的维护,开发者需要覆写此方法
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//功能2:线程等待队列的维护,AQS覆写
        selfInterrupt();//不响应中断,也就是不抛出异常,目前不在讨论范围,后面会介绍
}

可以看到在模板方法,分别调用了state变量维护相关方法tryAcquire(arg)以及等待队列维护相关方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。

对于线程等待队列的维护,AQS已完成,我们不要考虑。

对于state变量的维护,调用的是需要开发者覆写的tryAcquire(int arg)方法实现。在覆写过的tryAcquire(int arg)方法中,通过调用getState()、setState()、compareAndSetState(int expect, int update)方法,从而实现对state变量的维护与访问。

4、模板方法与需要开发者覆写的方法的对应关系

AQS中定义了8个模板方法,对应4个需要开发者覆写的方法:

组件类型模板方法需要覆写的方法
独占式同步组件

void acquire(int arg)

boolean tryAcquire(int arg)

void acquireInterruptibly(int arg)

boolean tryAcquireNanos(int arg, long nanosTimeout)

boolean tryAcquireNanos(int arg, long nanosTimeout)

boolean tryRelease(int arg)

共享式同步组件

void acquireShared(int arg)

int tryAcquireShared(int arg)

void acquireSharedInterruptibly(int arg)

boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

boolean releaseShared(int arg)

boolean tryReleaseShared(int arg)

此外,还需要一个isHeldExclusively()方法需要覆写,不过其没有对应的模板方法。

通常情况下,我们实现的同步组件要不就是独占式,要不就是共享式,但是也有例外,例如ReadWriteLock同时实现了独占式与共享式。

因此当我们需要实现一个独占式同步组件时,只需要覆写AQS的tryAcquire和tryRelease即可;当实现一个共享式同步组件的时候,只需要实现tryAcquireShared和tryReleaseShared即可。isHeldExclusively()是可选的。默认情况下,需要覆写的方法的实现都是抛出一个UnsupportedOperationException。

5、编写同步组件需要注意的地方

1)使用新的接口和实现包装同步组件:在我们编写一个同步组件的时候,例如我们想实现一个独占锁,假设为Sync,其继承了AQS。只需要在Sync类中覆写tryRelease和tryAcquire即可,但是由于继承AQS的时候,会把tryAcquireShared、tryReleaseShared等共享锁方法也继承下来。而Sync并不会实现这些共享式同步组件的方法,因为Sync只是一个独占锁而已,从业务含义上,因此应该将这些方法屏蔽,从而防止用户误操作。按照最佳实现,屏蔽的方式是定义一个新的接口,假设用Mutex表示,这个接口只定义了独占锁相关方法,再编写一个类MutexImpl实现Mutex接口,而对于同步组件Sync类的操作,都封装在MutexImpl中。

2)同步组件推荐定义为静态内部类:因为某个同步组件通常是为实现特定的目的而实现,可能只适用于特定的场合。如果某个同步组件不具备通用性,我们应该将其定义为一个私有的静态内部类。结合第一点,我们编写的同步组件Sync应该是MutexImpl的一个私有的静态内部类。

三、使用AQS

只有掌握了同步器的工作原理才能更加深入的理解并发包中的其他并发组件。所以下面通过一个独占锁的示例来深入了解一下同步器的工作原理。顾名思义,独占锁就是同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁。

案例代码:

Mutex接口:

//不可重入的独占锁接口
public interface Mutex {
    //获取锁
    public void lock();
    //释放锁
    public void release();
}

MutexImpl

//实现
public class MutexImpl implements Mutex{
    // 仅需要将操作代理到Sync上即可
    private Sync sync=new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void release() {
        sync.release(1);
    }

    //独占式同步组件实现
    private static class Sync extends AbstractQueuedSynchronizer{

        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0,1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            return compareAndSetState(1,0);
        }
    }
}

测试类MutexMain:

public class MutexMain {
    public static void main(String[] args) throws InterruptedException {
        Mutex mutex=new MutexImpl();
        for (int i = 0; i <5 ; i++) {
            new MutexThread("线程"+i,mutex).start();
        }
    }
    static class MutexThread extends Thread{
        private Mutex mutex;

        public MutexThread(String name,Mutex mutex) {
            this.mutex = mutex;
            this.setName(name);
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"启动..");
            mutex.lock();
            System.out.println(Thread.currentThread().getName()+"获取锁成功..");
            try {
                System.out.println(Thread.currentThread().getName()+"开始执行,当前时间:"+new Date().toLocaleString());
                Thread.currentThread().sleep(1000);//假设线程执行需要1秒钟
                System.out.println(Thread.currentThread().getName()+"结束执行,当前时间:"+new Date().toLocaleString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println(Thread.currentThread().getName()+"释放锁..");
                mutex.release();
            }

        }
    }
}

案例代码某次输出:

线程1启动..

线程1获取锁成功..

线程0启动..

线程2启动..

线程3启动..

线程4启动..

线程1开始执行,当前时间:2016-6-25 15:08:06

线程1结束执行,当前时间:2016-6-25 15:08:07

线程1释放锁..

线程0获取锁成功..

线程0开始执行,当前时间:2016-6-25 15:08:07

线程0结束执行,当前时间:2016-6-25 15:08:08

线程0释放锁..

线程2获取锁成功..

线程2开始执行,当前时间:2016-6-25 15:08:08

线程2结束执行,当前时间:2016-6-25 15:08:09

线程2释放锁..

线程3获取锁成功..

线程3开始执行,当前时间:2016-6-25 15:08:09

线程3结束执行,当前时间:2016-6-25 15:08:10

线程3释放锁..

线程4获取锁成功..

线程4开始执行,当前时间:2016-6-25 15:08:10

线程4结束执行,当前时间:2016-6-25 15:08:11

线程4释放锁..

可以看到,我们的独占锁的确是起作用了,任意一时刻只有一个线程在运行。

请读者注意输出结果线程启动的顺序:1,0,2,3,4。线程1先获取到了锁并执行,而0、2、3、4被加入到了等待队列。而后面获取到锁的顺序也是0,2,3,4。这是因为AQS内部是使用一个FIFO队列,所以先进入等待队列的先获取到锁。

不可重入演示

public class MutextNoReentryMain {
    public static void main(String[] args) {
        Mutex mutex=new MutexImpl();
        mutex.lock();
        mutex.lock();//重复lock
        System.out.println("运行结束");
    }
}

上述代码中,永远不会打印出"运行结束"这句话,程序会一直阻塞,因为我们的锁是不可重入的。

说明:所谓不可重入,指的是,一个线程在释放锁之前,不能再次获取这个锁。上述代码中,第一次调用lock时,主线程获取到锁,可以运行,可以在主线程第二次获取锁的时候,因为锁已经被占用了,所以第二次无法获取。由于我们对于一个线程无法获取锁时,就会对其进行阻塞,并加入等待队列。因此第二次获取不到锁,其结果是导致主线程被阻塞了,最终程序就会一直被阻塞。