[TOC]
创新互联建站主要从事成都网站建设、成都做网站、网页设计、企业做网站、公司建网站等业务。立足成都服务连平,10余年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220
ArrayBlockingQueue 1.8 源码浅析
一,简介
ArrayBlockingQueue 是一个用数组实现的有界队列;此队列按照先进先出(FIFO)的规则对元素进行排序;默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序的访问队列,即先阻塞的线程先访问队列;非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问;为了保证公平性,通常会降低吞吐量。
二,类UML图

三,基本成员
    /** The queued items */
    // 记录数据的数组
    final Object[] items;
    /** items index for next take, poll, peek or remove */
    // 索引用于 take,poll,peek,remove 等方法
    int takeIndex;
    /** items index for next put, offer, or add */
    // 索引用于 put,offer,or add 等方法
    int putIndex;
    /** Number of elements in the queue */
    // 总数
    int count;
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */
    /** Main lock guarding all access */
    // 队列的锁
    final ReentrantLock lock;
    /** Condition for waiting takes */
    // 用于让线程等待,消费时队列为空
    private final Condition notEmpty;
    /** Condition for waiting puts */
    // 用于让线程等待,生产时队列满
    private final Condition notFull;四,常用方法
构造方法
我们看下两个构造,其实也就是一个,注意没有无参构造,初始化时必须要给出容量。
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    // 初始化一个ArrayBlockingQueue
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 初始化一个数组
        this.items = new Object[capacity];
        // 初始化一个锁
        lock = new ReentrantLock(fair);
        // 用来存放消费者的阻塞线程
        notEmpty = lock.newCondition();
        // 用来存放生产者的线程
        notFull =  lock.newCondition();
    }add 方法
可以看出add调用的是offer方法,详情请看offer方法。
    public boolean add(E e) {
        // 调用父类的方法
        return super.add(e);
    }
    // 父类 AbstractQueue 的add方法
     public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }注意:add 插入失败会抛异常。
offer 方法
    // offer加入元素
    public boolean offer(E e) {
        // 不能为null
        checkNotNull(e);
        // 获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果数组满了,返回false
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    // enqueue
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        // 获取数组
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 唤醒消费阻塞的队列
        notEmpty.signal();
    }
注意:offer还有一个重载方法,带有超时时间的插入,支持中断offer(E e, long timeout, TimeUnit unit)。
put 方法
public void put(E e) throws InterruptedException {
        // 不能为null
        checkNotNull(e);
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 支持中断
        lock.lockInterruptibly();
        try {
            // 等于数组的容量
            while (count == items.length)
                // 等待
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }注意:put和前面的offer要区别,offer方法队列满是返回false,put方法是让线程等待,根据自己的场景用合适的方法。
poll 方法
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }注意:poll也有一个重载方法,带有超时和中断poll(long timeout, TimeUnit unit)。
take 方法
    // 消费
    public E take() throws InterruptedException {
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 支持中断
        lock.lockInterruptibly();
        try {
            // 队列为空
            while (count == 0)
                // 阻塞
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }注意:take和poll也是一对方法,poll队列为空返回null,take是让线程等待,直到唤醒。
peek 方法
// 获取队尾的元素 不删除
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }size 方法
    // 统计个数 size是准确值
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }五,总结
ArrayBlockingQueue 是有界的,所以我们在初始化是容量要设计好,因为它是不可以扩容的,还有我觉得这个队列适合一些稳定并发量的系统,如果并发量突然变大,导致队列满,会造成大量的线程等待,影响系统的响应;我们通过阅读源码也发现队列的源码是很轻量的,使用起来也很简单,让人很好理解;使用这个队列一定要注意put,offer,take,poll这两组方法,根据自己的业务场景选择是直接返回(响应速度快)还是阻塞线程。
参考:《Java 并发编程的艺术》
名称栏目:ArrayBlockingQueue1.8源码浅析
链接地址:http://www.scyingshan.cn/article/gcihsc.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 