取法其上,得乎其中

Java 并发控制流程:CountDownLatch / Semaphore / Condition/ CyclicBarrier

控制并发流程

控制并发流程的工具类,主要作用就是帮助程序员更容易地去让线程之间相互配合,来满足业务需要。比如让线程A等待线程B执行完毕后再执行。

CountDownLatch 倒数门闩

通过其名称可以直译为“倒数门闩”,本质意义为设立一个数字,当某个线程的任务执行完毕的时候将其减一,等待数字为0的时候开始工作。因此可以推广到,一个线程等待多个线程结束之后它才可以进行工作。亦可多等一,思路均一致,亦可多个 CountDownLatch 同时使用。

主要方法:

  • new CountDOwnLatch(int count),count 为要倒数的数值。
  • await(), 调用后开始等待 count 为 0。
  • countDown(), 将 count 减 1。

拼团线程

static class GroupBooking implements Runnable {
    @SneakyThrows
    @Override
    public void run() {
        System.out.println("等待五人拼团..");
        latch.await();
        System.out.println("拼团成功");
    }
}

主方法与“加入拼团”

public static void main(String[] args) {
    new Thread(new GroupBooking()).start();
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
        int I = i + 1;
        executorService.submit(() -> {
            try {
                Thread.sleep(new Random().nextInt(5_000));
                System.out.println("顾客" + I + "加入拼团");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown(); // 将数字减一
            }
        });
    }
    executorService.shutdown(); // shutdown 会等待线程任务全部结束
}

Semaphore 信号量

Semaphore 主要设计了一个“有限许可证”发放机制,主要为了面临有限资源时各个线程根据是否持有许可证进行任务。线程只有拿到 Semaphore 发放的许可证的时候,才准予继续执行,否则进行等待。当许可证数量为 0 的时候,则全部线程都需要等待。

我觉得这东西其实就是一个阻塞队列,不过它更便于使用,以及加入了一些机制。

主要使用流程:

  1. 初始化 Semaphore 并指定许可证数量,且可以设计是否公平(同公平锁一般)。
  2. 通过 .acquire() 或 .acquireUninterruptibly() 方法获取许可证, 显然如果使用.acquireUninterruptibly() 意味着在等待期间可被打断。另外同 lock 加锁一般,它还带有 .tryAcquire() 方法。
  3. 通过 .release() 手动释放许可证,Semaphore 并没有设计自动释放机制,因此需要自己手动释放。

关于公平性值得注意的是,根据 Semaphore 的使用场景来看,一般都是针对极其消耗资源的任务进行控制,必须要根据其业务场景而决定究竟选择是否公平。


// 设置 3 个许可证,且其公平性为true
static Semaphore semaphore = new Semaphore(3, true);

static class Task implements Runnable{
    @Override
    public void run() {
        try {
            semaphore.acquire(1); // 默认为获取一个许可证,但亦可自定数量。
            System.out.println(Thread.currentThread().getName() + "获取到许可证,正在进行任务");
            Thread.sleep(new Random().nextInt(10_000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + "任务结束,许可证已释放");
        }
    }
}

Condition接口 (条件对象)

总而言之,condition 直译为条件,意味着某个线程只有满足某个条件,那么它才会被唤醒。而condition 依附于锁来获得,且一把锁可以获得多个 condition。

如果说 Lock 是用来代替 synchronized, 那么 Condition 就是来代替Object.wait / notify 的,所以在用法和性质上,两者一致。

主要方法:

  • lock.newCondition(),创建条件对象。
  • await(),阻塞当前线程并释放锁,等待其他线程执行 .signal() 或 .signalAll() 去唤醒,即条件满足。
  • signal()、signalAll(),唤醒被当前条件所阻塞的线程
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();

static void f1(){
    lock.lock();
    try {
        System.out.println("必要条件不满足,开始等待");
        condition.await();
        System.out.println("必要条件已满足,开始执行");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally{
        lock.unlock();
    }
}

static void f2(){
    lock.lock();
    try {
        Thread.sleep(1000);
        condition.signal();
        System.out.println("执行必要条件");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally{
        lock.unlock();
    }
}

使用 Condition 设计生产者与消费者模型

简而言之,一共两个条件不满足的时候需要生产者和消费者停止工作。其一为队列已满,那么生产者便不能继续生产;其二为队列为空,那么消费者便不能继续消费。

那么,便创建两个条件,其一为只有满足 notFull 才能生产,其二为只有满足 notNull 才能消费。

static Condition notFull = lock.newCondition();
static Condition notNull = lock.newCondition();

而其后逻辑为,当成功消费一次,那么需要去唤醒 notFull;而成功生产一次,则需要唤醒 notNull。

static class Customer extends Thread {
    @Override
    public void run() {
        lock.lock();
        try {
            while (true) {
                if (queue.size() == 0) {
                    notNull.await();
                }
                queue.poll();
                System.out.println("已消费 * 1,剩余产品" + queue.size());
                notFull.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

static class Producer extends Thread {
    @Override
    public void run() {
        lock.lock();
        try {
            while (true) {
                if (queue.size() == QUEUE_SIZE) {
                    notFull.await();
                }
                queue.add(1);
                System.out.println("已生产 * 1,剩余容量" + (QUEUE_SIZE - queue.size()));
                notNull.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

CyclicBarrier 循环栅栏

同表意,有两个意思,一是循环,二是栅栏。主要使用场景在于多个线程统一进行汇总,且可以循环使用。

假设有三个田径运动员需要从操场入口,汇总到跑到起跑位置,那么起跑处就可有理解为一道栅栏,只有他们三个一起达到该处的时候才允许开始跑步,接下来每圈到起跑处也是需要等待其他人。

且,等到他们到达起跑处的时候,可以执行某个事件。

public class CyclicBarrierTest {
    static Integer currentLapNumber = 0; // 当前处于第几圈

    @AllArgsConstructor
    static class Running implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;

        @SneakyThrows
        @Override
        public void run() {
            System.out.println("id" + id + "已到达操场");
            cyclicBarrier.await();
            for (int i = 1; i <= 3; i++) {
                System.out.println("id" + id + "已跑完第" + currentLapNumber + "圈");
                cyclicBarrier.await();
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                if (currentLapNumber == 0) {
                    System.out.println("全体运动员已达到操场,准备跑步");
                } else {
                    System.out.println("全体运动员已跑完第" + currentLapNumber + "圈");
                }
                currentLapNumber++;
            }
        });

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 1; i <= 3; i++) {
            executorService.submit(new Running(i, cyclicBarrier));
        }
        executorService.shutdown();
    }
}
Java 并发控制流程:CountDownLatch / Semaphore / Condition/ CyclicBarrier

https://ku-m.cn/index.php/archives/598/

作者

KuM

发布时间

2022-07-18

许可协议

CC BY 4.0

本页的评论功能已关闭