控制并发流程
控制并发流程的工具类,主要作用就是帮助程序员更容易地去让线程之间相互配合,来满足业务需要。比如让线程A等待线程B执行完毕后再执行。
CountDownLatch 倒数门闩
通过其名称可以直译为“倒数门闩”,本质意义为设立一个数字,当某个线程的任务执行完毕的时候将其减一,等待数字为0的时候开始工作。因此可以推广到,一个线程等待多个线程结束之后它才可以进行工作。亦可多等一,思路均一致,亦可多个 CountDownLatch 同时使用。
主要方法:
- new CountDOwnLatch(int count),count 为要倒数的数值。
- await(), 调用后开始等待 count 为 0。
- countDown(), 将 count 减 1。
拼团线程
static class GroupBooking implements Runnable {
@SneakyThrows
@Override
public void run() {
System.out.println("等待五人拼团..");
latch.await();
System.out.println("拼团成功");
}
}
主方法与“加入拼团”
public static void main(String[] args) {
new Thread(new GroupBooking()).start();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
int I = i + 1;
executorService.submit(() -> {
try {
Thread.sleep(new Random().nextInt(5_000));
System.out.println("顾客" + I + "加入拼团");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 将数字减一
}
});
}
executorService.shutdown(); // shutdown 会等待线程任务全部结束
}
Semaphore 信号量
Semaphore 主要设计了一个“有限许可证”发放机制,主要为了面临有限资源时各个线程根据是否持有许可证进行任务。线程只有拿到 Semaphore 发放的许可证的时候,才准予继续执行,否则进行等待。当许可证数量为 0 的时候,则全部线程都需要等待。
我觉得这东西其实就是一个阻塞队列,不过它更便于使用,以及加入了一些机制。
主要使用流程:
- 初始化 Semaphore 并指定许可证数量,且可以设计是否公平(同公平锁一般)。
- 通过 .acquire() 或 .acquireUninterruptibly() 方法获取许可证, 显然如果使用.acquireUninterruptibly() 意味着在等待期间可被打断。另外同 lock 加锁一般,它还带有 .tryAcquire() 方法。
- 通过 .release() 手动释放许可证,Semaphore 并没有设计自动释放机制,因此需要自己手动释放。
关于公平性值得注意的是,根据 Semaphore 的使用场景来看,一般都是针对极其消耗资源的任务进行控制,必须要根据其业务场景而决定究竟选择是否公平。
// 设置 3 个许可证,且其公平性为true
static Semaphore semaphore = new Semaphore(3, true);
static class Task implements Runnable{
@Override
public void run() {
try {
semaphore.acquire(1); // 默认为获取一个许可证,但亦可自定数量。
System.out.println(Thread.currentThread().getName() + "获取到许可证,正在进行任务");
Thread.sleep(new Random().nextInt(10_000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
semaphore.release();
System.out.println(Thread.currentThread().getName() + "任务结束,许可证已释放");
}
}
}
Condition接口 (条件对象)
总而言之,condition 直译为条件,意味着某个线程只有满足某个条件,那么它才会被唤醒。而condition 依附于锁来获得,且一把锁可以获得多个 condition。
如果说 Lock 是用来代替 synchronized, 那么 Condition 就是来代替Object.wait / notify 的,所以在用法和性质上,两者一致。
主要方法:
- lock.newCondition(),创建条件对象。
- await(),阻塞当前线程并释放锁,等待其他线程执行 .signal() 或 .signalAll() 去唤醒,即条件满足。
- signal()、signalAll(),唤醒被当前条件所阻塞的线程
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
static void f1(){
lock.lock();
try {
System.out.println("必要条件不满足,开始等待");
condition.await();
System.out.println("必要条件已满足,开始执行");
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
static void f2(){
lock.lock();
try {
Thread.sleep(1000);
condition.signal();
System.out.println("执行必要条件");
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
使用 Condition 设计生产者与消费者模型
简而言之,一共两个条件不满足的时候需要生产者和消费者停止工作。其一为队列已满,那么生产者便不能继续生产;其二为队列为空,那么消费者便不能继续消费。
那么,便创建两个条件,其一为只有满足 notFull 才能生产,其二为只有满足 notNull 才能消费。
static Condition notFull = lock.newCondition();
static Condition notNull = lock.newCondition();
而其后逻辑为,当成功消费一次,那么需要去唤醒 notFull;而成功生产一次,则需要唤醒 notNull。
static class Customer extends Thread {
@Override
public void run() {
lock.lock();
try {
while (true) {
if (queue.size() == 0) {
notNull.await();
}
queue.poll();
System.out.println("已消费 * 1,剩余产品" + queue.size());
notFull.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
static class Producer extends Thread {
@Override
public void run() {
lock.lock();
try {
while (true) {
if (queue.size() == QUEUE_SIZE) {
notFull.await();
}
queue.add(1);
System.out.println("已生产 * 1,剩余容量" + (QUEUE_SIZE - queue.size()));
notNull.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
CyclicBarrier 循环栅栏
同表意,有两个意思,一是循环,二是栅栏。主要使用场景在于多个线程统一进行汇总,且可以循环使用。
假设有三个田径运动员需要从操场入口,汇总到跑到起跑位置,那么起跑处就可有理解为一道栅栏,只有他们三个一起达到该处的时候才允许开始跑步,接下来每圈到起跑处也是需要等待其他人。
且,等到他们到达起跑处的时候,可以执行某个事件。
public class CyclicBarrierTest {
static Integer currentLapNumber = 0; // 当前处于第几圈
@AllArgsConstructor
static class Running implements Runnable {
private int id;
private CyclicBarrier cyclicBarrier;
@SneakyThrows
@Override
public void run() {
System.out.println("id" + id + "已到达操场");
cyclicBarrier.await();
for (int i = 1; i <= 3; i++) {
System.out.println("id" + id + "已跑完第" + currentLapNumber + "圈");
cyclicBarrier.await();
}
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
if (currentLapNumber == 0) {
System.out.println("全体运动员已达到操场,准备跑步");
} else {
System.out.println("全体运动员已跑完第" + currentLapNumber + "圈");
}
currentLapNumber++;
}
});
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
executorService.submit(new Running(i, cyclicBarrier));
}
executorService.shutdown();
}
}
本页的评论功能已关闭