并发容器
Vector和Hashtable早期也是为了并发而去开发的工具,但由于它们的并发性能太差以致于目前已经被淘汰。而造成性能差的主要原因在于,它们的底层所有方法都采用Synchronized所修饰。
我们都知道 ArrayList 和 HashMap 不是线程安全的,主要问题是因为在多线程同时写操作的时候,被添加的数据很容易被另外一个持有相同索引的线程写操作所覆盖,且 HashMap 在扩容的时候也将出现并发问题。
但可以使用 Collections.synchronizedList() 或 Collections.synchronizedMap() 将其变为线程安全的。但本质上这个工具类也只是把这两者的底层实现使用Synchronized修饰..
随后经过发展,出现了 ConcurrentHashMap 和 CopyOnWriteArrayList ,它们替代了上面的同步List和Map。在绝大多数并发情况下,其二者都要比前二者的性能更好。
唯一的例外是,Collections.synchronizedLIst() 所创建的List在写多读少的情况下优于CopyOnWriteArrayList, 这也就意味着它适合于读多写少的场景,而主要原因是因为 CopyOnWriteArrayList 在每次写入的时候都会重新复制一份列表。而 ConcurrentHashMap 则没有例外,它在任何场景下都优于通过 Collections.synchronizedMap() 创建的HashMap。
ConcurrentHashMap
ConcurrentHashMap 在put的时候主要使用了 CAS 进行赋值。其中结构与 HashMap 类似,默认为一个以链表为元素的哈希表,但当链表长度大于8的时候,该链表会被转换成红黑树。
而之所以阈值设置为8是因为作者使用泊松分布对其哈希算法进行概率计算,链表数量为8的情况下非常小,除非是哈希算法出现了问题。以致于我们几乎不可能碰到被转换为红黑树的场景,但为了应对这样可能的场景下的效率,所以做出了这样的设计。
ConcurrentHashMap 虽说是线程安全的,但也是仅能保证同时put的时候的线程安全,当我们的操作是一种组合操作的时候,也就失去了线程安全!
public class ConcurrentHashMapTest implements Runnable {
static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Integer score = map.get("score");
Integer newScore = score + 1; // 在此处可能别的线程就已经修改了“score”的值!
map.put("score", newScore);
}
}
}
本来我们认为,在上述场景中应该也是能够保证线程安全的,但忘记了 ConcurrentHashMap 并没有采用悲观锁的思路,而是使用 CAS 实现的乐观锁。因此它提供了一个 map.replace() 以应对上述场景,其思路与之前原子类中的 .compareAndSet() 类似。
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
while (true) {
Integer score = map.get("score");
Integer newScore = score + 1;
if (map.replace("score", score, newScore)) {
break;
}
}
}
}
同理,当碰到组合操作的时候,应该先看下该类有没有提供相对于的实现。这里就不详细阐述。
CopyOnWriteArrayList
显然,CopyOnWriteArrayList 是为了代替 Vecto r和 SynchronizedList 而出现的。而它的适用场景如前述所说,适合于读多而写少的环境。
究其原因是因为,每次 CopyOnWriteArrayList 进行写入操作的时候,都会将自己的数据去复制一份副本,而写入的操作在副本之上,当修改完毕后再将之前的指针指向副本。这也就意味着,读取和写入之间并不会造成冲突,也就是写入并不会阻塞读取操作。而只有在写入的时候会通过 ReentrantLock 进行加锁。
其次,它在读取的时候完全没有进行加锁,只是直接根据索引返回底层数组中的数据。
这些也就使他在保证写入时线程安全的同时,保证了读取的效率。
但这也意味着它存在着两个问题:
- 数据一致性问题: 虽然它能保证数据的最终一致性,但在读取之时进行写入,那么将意味着读取无法及时的获得最新的数据。
内存占用问题: 因为它在写入的时候采用了复制的策略,因此在每次写入的时候都将导致内存出现两个数组。
阻塞队列与非阻塞并发队列
阻塞队列是指拥有阻塞功能的队列。通常阻塞队列的一端是给生产者放数据用,另一端是给消费者拿数据用,由于阻塞队列有阻塞功能,因此生产者和消费者可以多线程进行任务。
下面将叙述的阻塞队列均继承于 BlockingQueue 接口,因此阻塞队列的方法统一,且可以分为三类,
- put() 、 take(),使用 put() 和 take() 加入(拿去)的时候队列已经满了,那么就会阻塞住。
- add() 、 remove(),使用 add() 和 remove() 加入(删除)的时候队列已经满了,那么就会抛出异常。而使用 element() 会返回队列首个元素,如果为空,也将抛出异常。
- offer()、poll() 取出删除、peek() 取出不删除,执行他们会返回一个是否成功的布尔值。
常用阻塞队列
可以发现,以下阻塞队列几乎都有被用于线程池的工作队列之中
- ArrayBlockingQueue 有界,需要我们在构造方法中设置容量。
- LinkedBlockingQueue 无界,容量为Integer.MAX_VALUE,因此可理解为无界。
- SynchronousQueue , 仅提供于连接生产者与消费者的桥梁,其内部容量为0。
- PriorityBlockingQueue 无界,支持设置队列的优先级,而非通过先进先出顺序。
使用 ArrayBlockingQueue 的简单例子
建立一个容量为3的 ArrayBlsockingQueue ,通过它模拟一个仅拥有3个位置的 “面试等待室”。面试官逐个面试,且10个面试者逐个进入。
public class ArrayBlockingQueueTest {
static BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
static class Interviewer implements Runnable{
@SneakyThrows
@Override
public void run() {
for (int i = 0; i < 10; i++) {
String t = "candidate" + i;
queue.put(t);
System.out.println(t + "开始等待面试");
}
}
}
static class Customer implements Runnable{
@SneakyThrows
@Override
public void run() {
while(!queue.isEmpty()){
Thread.sleep(1000);
String candidate = queue.take();
System.out.println(candidate + "面试完毕");
}
}
}
public static void main(String[] args) {
new Thread(new Interviewer()).start();
new Thread(new Customer()).start();
}
}
非阻塞并发队列
前述均是会阻塞的并发队列,但其实也有一个非阻塞队列 ConcurrentLinkedQueue。顾名思义,其采用链表作为数据结构。其后采用 CAS 来实现线程安全。
它比较适用于性能要求很高的并发场景,但用到的相对较少。
本页的评论功能已关闭