Java 并发容器与阻塞队列:ConcurrentHashMap / CopyOnWriteArrayList / BlockingQueue..

并发容器

Vector和Hashtable早期也是为了并发而去开发的工具,但由于它们的并发性能太差以致于目前已经被淘汰。而造成性能差的主要原因在于,它们的底层所有方法都采用Synchronized所修饰。

我们都知道 ArrayList 和 HashMap 不是线程安全的,主要问题是因为在多线程同时写操作的时候,被添加的数据很容易被另外一个持有相同索引的线程写操作所覆盖,且 HashMap 在扩容的时候也将出现并发问题。

但可以使用 Collections.synchronizedList() 或 Collections.synchronizedMap() 将其变为线程安全的。但本质上这个工具类也只是把这两者的底层实现使用Synchronized修饰..

随后经过发展,出现了 ConcurrentHashMap 和 CopyOnWriteArrayList ,它们替代了上面的同步List和Map。在绝大多数并发情况下,其二者都要比前二者的性能更好。

唯一的例外是,Collections.synchronizedLIst() 所创建的List在写多读少的情况下优于CopyOnWriteArrayList, 这也就意味着它适合于读多写少的场景,而主要原因是因为 CopyOnWriteArrayList 在每次写入的时候都会重新复制一份列表。而 ConcurrentHashMap 则没有例外,它在任何场景下都优于通过 Collections.synchronizedMap() 创建的HashMap。

ConcurrentHashMap

ConcurrentHashMap 在put的时候主要使用了 CAS 进行赋值。其中结构与 HashMap 类似,默认为一个以链表为元素的哈希表,但当链表长度大于8的时候,该链表会被转换成红黑树。

而之所以阈值设置为8是因为作者使用泊松分布对其哈希算法进行概率计算,链表数量为8的情况下非常小,除非是哈希算法出现了问题。以致于我们几乎不可能碰到被转换为红黑树的场景,但为了应对这样可能的场景下的效率,所以做出了这样的设计。

ConcurrentHashMap 虽说是线程安全的,但也是仅能保证同时put的时候的线程安全,当我们的操作是一种组合操作的时候,也就失去了线程安全!

public class ConcurrentHashMapTest implements Runnable {
    static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            Integer score = map.get("score");
            Integer newScore = score + 1; // 在此处可能别的线程就已经修改了“score”的值!
            map.put("score", newScore);
        }
    }
}

本来我们认为,在上述场景中应该也是能够保证线程安全的,但忘记了 ConcurrentHashMap 并没有采用悲观锁的思路,而是使用 CAS 实现的乐观锁。因此它提供了一个 map.replace() 以应对上述场景,其思路与之前原子类中的 .compareAndSet() 类似。

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            while (true) {
                Integer score = map.get("score");
                Integer newScore = score + 1;
                if (map.replace("score", score, newScore)) {
                    break;
                }
            }
        }
    }

同理,当碰到组合操作的时候,应该先看下该类有没有提供相对于的实现。这里就不详细阐述。

CopyOnWriteArrayList

显然,CopyOnWriteArrayList 是为了代替 Vecto r和 SynchronizedList 而出现的。而它的适用场景如前述所说,适合于读多而写少的环境。

究其原因是因为,每次 CopyOnWriteArrayList 进行写入操作的时候,都会将自己的数据去复制一份副本,而写入的操作在副本之上,当修改完毕后再将之前的指针执行副本。这也就意味着,读取和写入之间并不会造成冲突,也就是写入并不会阻塞读取操作。而只有在写入的时候会通过 ReentrantLock 进行加锁。

其次,它在读取的时候完全没有进行加锁,只是直接根据索引返回底层数组中的数据。

这些也就使他在保证写入时线程安全的同时,保证了读取的效率。

但这也意味着它存在着两个问题。

    • 数据一致性问题: 虽然它能保证数据的最终一致性,但在读取之时进行写入,那么将意味着读取无法及时的获得最新的数据。
    • 内存占用问题: 因为它在写入的时候采用了复制的策略,因此在每次写入的时候都将导致内存出现两个数组。

    阻塞队列与非阻塞并发队列

    阻塞队列是指拥有阻塞功能的队列。通常阻塞队列的一端是给生产者放数据用,另一端是给消费者拿数据用,由于阻塞队列有阻塞功能,因此生产者和消费者可以多线程进行任务。

    下面将叙述的阻塞队列均继承于 BlockingQueue 接口,因此阻塞队列的方法统一,且可以分为三类,

    1. put() 、 take(),使用 put() 和 take() 加入(拿去)的时候队列已经满了,那么就会阻塞住。
    2. add() 、 remove(),使用 add() 和 remove() 加入(删除)的时候队列已经满了,那么就会抛出异常。而使用 element() 会返回队列首个元素,如果为空,也将抛出异常。
    3. offer()、poll() 取出删除、peek() 取出不删除,执行他们会返回一个是否成功的布尔值。

    常用阻塞队列

    可以发现,以下阻塞队列几乎都有被用于线程池的工作队列之中

    • ArrayBlockingQueue 有界,需要我们在构造方法中设置容量。
    • LinkedBlockingQueue 无界,容量为Integer.MAX_VALUE,因此可理解为无界。
    • SynchronousQueue , 仅提供于连接生产者与消费者的桥梁,其内部容量为0。
    • PriorityBlockingQueue 无界,支持设置队列的优先级,而非通过先进先出顺序。

    使用 ArrayBlockingQueue 的简单例子

    建立一个容量为3的 ArrayBlsockingQueue ,通过它模拟一个仅拥有3个位置的 “面试等待室”。面试官逐个面试,且10个面试者逐个进入。

    public class ArrayBlockingQueueTest {
        static BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
    
        static class Interviewer implements Runnable{
    
            @SneakyThrows
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    String t = "candidate" + i;
                    queue.put(t);
                    System.out.println(t + "开始等待面试");
                }
            }
        }
    
       static class Customer implements Runnable{
    
            @SneakyThrows
            @Override
            public void run() {
                while(!queue.isEmpty()){
                    Thread.sleep(1000);
                    String candidate = queue.take();
                    System.out.println(candidate + "面试完毕");
                }
            }
        }
    
        public static void main(String[] args) {
           new Thread(new Interviewer()).start();
           new Thread(new Customer()).start();
        }
    }
    

    非阻塞并发队列

    前述均是会阻塞的并发队列,但其实也有一个非阻塞队列 ConcurrentLinkedQueue。顾名思义,其采用链表作为数据结构。其后采用 CAS 来实现线程安全。

    它比较适用于性能要求很高的并发场景,但用到的相对较少。