登录/注册
GcsSloop
805
占位
1
占位
0
浏览量
占位
粉丝
占位
关注
JAVA并发(4)-并发队列ConcurrentLinkedQueue
GcsSloop
2021-07-29 11:17:03 2021-07-29
49
0

本文开始介绍并发队列,为后面介绍线程池打下基础。并发队列莫非也是出队入队操作,还有一个比较重要的点就是如何保证其线程安全性,有些并发队列保证线程安全是通过lock,有些是通过CAS。 我们从ConcurrentLinkedQueue开始吧。

1. 介绍

ConcurrentLinkedQueue集合框架的一员,是一个无界限且线程安全,基于单向链表的队列。该队列的顺序是FIFO。当多线程访问公共集合时,使用这个类是一个不错的选择。不允许null元素。是一个非阻塞的队列。

它的迭代器是弱一致性的,不会抛出java.util.ConcurrentModificationException,也可能在迭代期间,其他操作也正在进行。**size()**方法,不能保证是正确的,因为在迭代时,其他线程也可以操作该队列。

1.1 类图

![ConcurrentLinkedQueue类图](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_0.png "ConcurrentLinkedQueue类图"?x-oss-process=style/watermark) (显示的方法都是公有方法)

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>

继承至AbstractQueue,他提供了队列操作的一个框架,有基本的方法,addremoveelement等等,这些方法基于offerpollpeek(最主要看这几个方法)。

2. 源码分析

2.1 类的整体结构

队列中的元素Node

private static class Node<E> {
        // 保证两个字段的可见性
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            // putOrderedXXX是putXXXVolatile的延迟版本,设置某个值不会被其他线程立即看到(可见性)
            // putOrderedXXX设置的值的修饰应该是volatile,这样该方法才有用

            // 关于为什么使用这个方法,主要目的肯定是提高效率,但是具体原理,我只能告诉大家跟内存屏障有关(我也不太清楚这一块,待我研究后,再写一篇文章)
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe类中的东西,可以去了解一下

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

构造器1:

// private transient volatile Node<E> head;
    // private transient volatile Node<E> tail;
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

构造器2:

public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

下面开始讲方法,从offerpollpeek从这几个方法入手

2.2 offer

添加元素到队尾。因为队列是无界的,这个方法永远不会返回false

分为三种情况进行分析(一定自己跟着代码debug,一步步的走)

  1. 单线程时(使用IDEA debug一直进入的是 **else if把我搞迷茫了,我会写一个博客来解释原因**)
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("A");
        queue.offer("B");

以上面的代码,分析每一个步骤。 执行构造函数后: ![单线程初始化](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_1.png "单线程初始化"?x-oss-process=style/watermark)

此时链表的head与tail指向哨兵节点

插入"A", 此时没有设置tail('两跳机制',这里的原因后面详见)

![单线程插入'A'](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_2.png "单线程插入'A'"?x-oss-process=style/watermark)

插入"B", ![单线程插入'B'](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_3.png "单线程插入'B'"?x-oss-process=style/watermark)

单线程情况比较简单

  1. 多线程offer时
public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                // 只有一个线程能够CAS成功,其余的都重试
                if (p.casNext(null, newNode)) {

                    // 延迟设置tail,第一个node入队不会设置tail,第二个node入队才会设置tail
                    //以此类推, '两跳机制'
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // 这里是有其他线程正在poll操作才会进入,此时只考虑多线程offer的情况,暂不分析
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                // 存在tail被更改前,和更改后的两种情况
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

结合上面的代码,看图

  • 步骤一线程A线程B都执行到
if (p.casNext(null, newNode))

![多线程初始化](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_4.png "多线程初始化"?x-oss-process=style/watermark)

线程B即将执行

p = (p != t && t != (t = tail)) ? t : q;
  • 步骤三 此时线程C进入。 此时,p(c) != q(c), 线程C执行
p = (p != t && t != (t = tail)) ? t : q;

执行完后,q(c)**赋值给p(c). 再次循环,此时,q(c) == null, 设置p(c)的next,线程C**将值入队 线程C Offer1.PNG

p = (p != t && t != (t = tail)) ? t : q;

因为p(b) == t(b),所以 q(b) 赋值给 p(b)。继续循环,最后得到 ![多线程offer,B插入](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_8.png "多线程offer,B插入"?x-oss-process=style/watermark)

  1. 多线程的另一种情况,回到步骤三,此时线程C把值入队了,但是还没有设置tail 线程C Offer1.PNG

此时,线程C执行**casTail(t, newNode)**,但是现在的tail != t(c), CAS失败, 直接返回。

2.2.1 小结

上面不管是多线程还是单线程,都是努力的去寻找next为null的节点,若为next节点为null,再判断是否满足设置tail的条件。

多线程offer的第一种情况存在设置tail滞后的问题,我把它称之为**"两跳机制",后面讲使用这种机制的原因。 我们看到上面的情况一直没有进入else if (p == q)分支,进入else if分支只会发生在有其他线程在poll时,我们先讲讲poll,再讲讲何时进入else if**分支。

2.3 poll

删除并返回头结点的值

简单提一下单线程多线程poll,着重分析一下polloffer共存的情况

  1. 单线程时 ![单线程poll](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_11.png "单线程poll"?x-oss-process=style/watermark) 单线程比较简单,就不画图了,按照上面的queue,进行一步一步的debug就行了
  2. 多线程,只有poll
public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                // casItem这里只有一个线程能够成功,其余的继续下面的代码
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            // 将之前的头节点,自己指向自己,等待被GC
            h.lazySetNext(h);
    }

从上面代码可以看出,修改itemhead都会使用CAS,这些变量都是被volatile修饰,所以保证了这些变量的线程安全性。不管是单线程还是多线程的poll,它们都是去寻找一个有效的头节点,删除并返回该值,若不是有效的就继续找,若队列为空了,就返回null

最后分析一下,offerpoll共存的情况

Node<E> q = p.next;

线程B进入,进行poll操作 此时,线程B执行了一次内循环,将q(b)赋值给了p(b); ![多线程offer与poll,poll1](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_14.png "多线程offer与poll,poll1"?x-oss-process=style/watermark)

Node<E> q = p.next;

![多线程offer与poll,offer2](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_16.png "多线程offer与poll,offer2"?x-oss-process=style/watermark)

此时,p(a).next 指向自己(等待被GC), 进入else if (p == q)**分支,线程A**退出,经过一番执行后,最后得到的状态,如下: ![多线程offer与poll,offer3](https://daimanet.oss-cn-beijing.aliyuncs.com/2021/2021-07/2021-07-29/896e9add4bb14cebb7ea8bce3db3656f_17.png "多线程offer与poll,offer3"?x-oss-process=style/watermark)

进入else if (p == q)**分支的情况,只会发生在polloffer**共存的情况下。

2.4 peek

获取首个有效的节点,并返回

public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

peekpoll的操作类似,这里就贴一下代码就是了。

3. 总结

ConcurrentLinkedQueue是使用非阻塞的方式保证线程的安全性,在设置关系到整个Queue结构的变量时(这些变量都被volatile修饰),都使用CAS的方式对它们进行赋值。

  • size方法是线程不安全的,返回的结果可能不准确

关于“两跳机制”(自己取得名字),

Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node. Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? -- ConcurrentLinkedQueue

大致意思,headtail允许被延迟设置。不是每次更新它们是一个重大的优化,这样做就可以更少的CAS(这样在很多线程使用时,积少成多,效率更高)。它的延迟阈值是2,设置head/tail时,当前的结点离first/last有两步或更多的距离。 这就是“两跳机制

我们想不通的地方,可能是这个类或方法的一个优化的地方。向着大佬看齐~

4. 引用

原文: https://www.cnblogs.com/ukyu/p/14832585.html

暂无评论