Java并发工具类:CountDownLatch与CyclicBarrier
Java并发工具类:CountDownLatch
在Java的并发编程领域中,CountDownLatch
是一个非常有用的工具类,它允许一个或多个线程等待,直到其他一组线程完成一系列操作。从本质上来说,CountDownLatch
维护了一个计数器,这个计数器的初始值就是需要等待完成的操作数量。当每个操作完成时,计数器就会减1,当计数器减到0时,所有等待的线程就会被释放。
1. CountDownLatch
的基本使用
让我们通过一个简单的示例来理解CountDownLatch
的基本用法。假设我们有一个主线程,它需要等待3个子线程完成任务后才能继续执行。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
// 创建一个CountDownLatch,初始计数值为3
CountDownLatch latch = new CountDownLatch(3);
// 创建并启动3个子线程
for (int i = 0; i < 3; i++) {
new Thread(new Worker(latch)).start();
}
try {
// 主线程等待,直到CountDownLatch的计数值为0
latch.await();
System.out.println("所有子线程已完成任务,主线程继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class Worker implements Runnable {
private final CountDownLatch latch;
Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
// 模拟子线程执行任务
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 完成任务");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 子线程任务完成,将CountDownLatch的计数值减1
latch.countDown();
}
}
}
}
在上述代码中:
- 首先创建了一个
CountDownLatch
对象,其初始计数值为3,意味着需要等待3个子线程完成任务。 - 然后通过循环启动了3个子线程,每个子线程在完成任务后,调用
latch.countDown()
方法将计数器减1。 - 主线程调用
latch.await()
方法进入等待状态,直到计数器变为0,此时主线程才会继续执行。
2. CountDownLatch
的原理剖析
CountDownLatch
是基于AQS(AbstractQueuedSynchronizer)框架实现的。AQS是一个用于构建锁和同步器的框架,它内部维护了一个FIFO队列,用于存储等待的线程。
CountDownLatch
内部有一个静态内部类Sync
,它继承自AQS
。Sync
类中的state
变量用于表示CountDownLatch
的计数值。当调用CountDownLatch
的构造函数时,state
会被初始化为传入的计数值。
countDown()
方法:当调用countDown()
方法时,实际上是调用了Sync
类的releaseShared(int releases)
方法。这个方法会尝试将state
减1,如果state
减为0,就会唤醒所有等待在AQS队列中的线程。await()
方法:调用await()
方法时,会调用Sync
类的acquireSharedInterruptibly(int arg)
方法。该方法会检查state
是否为0,如果不为0,当前线程就会被加入到AQS队列中等待。当state
变为0时,等待在队列中的线程会被唤醒。
3. CountDownLatch
的应用场景
- 多线程任务汇总:例如在一个数据分析系统中,可能需要从多个数据源获取数据,每个数据源由一个线程负责读取。当所有数据源的数据都读取完成后,再进行数据的汇总和分析。这时就可以使用
CountDownLatch
来确保主线程在所有数据读取线程完成后再进行汇总操作。 - 并发性能测试:在进行并发性能测试时,希望所有的测试线程同时开始执行,以模拟真实的并发场景。可以使用一个
CountDownLatch
,初始计数值为测试线程的数量。每个测试线程在启动时调用await()
方法等待,主线程在准备好所有测试条件后,调用countDown()
方法释放所有线程,使它们同时开始执行。
Java并发工具类:CyclicBarrier
CyclicBarrier
也是Java并发包中的一个重要工具类,它允许一组线程互相等待,直到到达某个公共的屏障点(barrier point)。与CountDownLatch
不同的是,CyclicBarrier
可以被重复使用,即当所有线程到达屏障点后,屏障可以被重置,以便再次使用。
1. CyclicBarrier
的基本使用
以下是一个简单的示例,展示了CyclicBarrier
的基本用法。假设有5个线程,它们需要在某个点上进行同步,然后继续执行。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
// 创建一个CyclicBarrier,需要5个线程到达屏障点
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
System.out.println("所有线程已到达屏障点,开始执行屏障动作");
});
// 创建并启动5个子线程
for (int i = 0; i < 5; i++) {
new Thread(new Worker(barrier)).start();
}
}
static class Worker implements Runnable {
private final CyclicBarrier barrier;
Worker(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
// 模拟子线程执行任务
Thread.sleep((long) (Math.random() * 2000));
System.out.println(Thread.currentThread().getName() + " 到达屏障点");
// 等待其他线程到达屏障点
barrier.await();
System.out.println(Thread.currentThread().getName() + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
在上述代码中:
- 创建了一个
CyclicBarrier
对象,指定需要5个线程到达屏障点。同时,传入了一个Runnable任务,当所有线程到达屏障点时,会执行这个任务。 - 每个子线程在执行任务后,调用
barrier.await()
方法等待其他线程。当5个线程都调用了await()
方法后,会执行传入的Runnable任务,然后所有线程继续执行后续代码。
2. CyclicBarrier
的原理剖析
CyclicBarrier
内部主要包含一个Generation
对象和一个计数器。Generation
对象用于表示当前的屏障代,每次屏障被重置时,Generation
对象会更新。计数器用于记录当前还未到达屏障点的线程数量。
await()
方法:当一个线程调用await()
方法时,首先会将计数器减1。如果计数器的值变为0,说明所有线程都已到达屏障点,此时会执行传入的Runnable任务,并重置计数器和Generation
对象。如果计数器的值不为0,当前线程会被阻塞,直到其他线程调用await()
方法使计数器变为0。reset()
方法:调用reset()
方法会重置CyclicBarrier
,将计数器恢复到初始值,并更新Generation
对象。同时,会中断所有正在等待的线程,抛出BrokenBarrierException
异常。
3. CyclicBarrier
的应用场景
- 多阶段计算:在一些复杂的计算任务中,可能需要将任务分为多个阶段,每个阶段需要多个线程协同完成。例如在图像渲染中,可能需要多个线程分别处理图像的不同部分,在每个阶段完成后,所有线程需要同步,然后进入下一个阶段。
CyclicBarrier
就可以用于实现这种多阶段的同步。 - 并行迭代算法:在一些并行迭代算法中,每次迭代都需要所有线程完成当前迭代的计算后,才能开始下一次迭代。
CyclicBarrier
可以保证所有线程在每次迭代开始时都处于同步状态。
CountDownLatch与CyclicBarrier的比较
虽然CountDownLatch
和CyclicBarrier
都用于线程同步,但它们之间存在一些显著的区别:
- 可重用性:
CountDownLatch
是一次性的,一旦计数器减为0,就不能再次使用。而CyclicBarrier
可以被重复使用,每次所有线程到达屏障点后,屏障可以重置,以便下一轮使用。 - 同步方式:
CountDownLatch
是让一个或多个线程等待一组线程完成操作,是一种单向的同步。而CyclicBarrier
是让一组线程相互等待,到达某个公共的屏障点,是一种双向的同步。 - 应用场景:
CountDownLatch
更适合用于需要等待一组任务完成后再进行下一步操作的场景,例如任务汇总。而CyclicBarrier
更适合用于需要多线程反复协同完成任务的场景,例如多阶段计算。
实际案例深入分析
1. 使用CountDownLatch实现多线程数据采集与汇总
假设我们要从多个数据库表中采集数据,每个表由一个线程负责采集,采集完成后将数据汇总到一个结果集中。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DataCollectionExample {
public static void main(String[] args) {
int tableCount = 3;
CountDownLatch latch = new CountDownLatch(tableCount);
List<String> result = new ArrayList<>();
for (int i = 0; i < tableCount; i++) {
new Thread(new DataCollector(latch, result, "Table" + (i + 1))).start();
}
try {
latch.await();
System.out.println("所有数据采集完成,汇总结果: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class DataCollector implements Runnable {
private final CountDownLatch latch;
private final List<String> result;
private final String tableName;
DataCollector(CountDownLatch latch, List<String> result, String tableName) {
this.latch = latch;
this.result = result;
this.tableName = tableName;
}
@Override
public void run() {
try {
// 模拟数据采集
Thread.sleep((long) (Math.random() * 2000));
String data = "Data from " + tableName;
result.add(data);
System.out.println(Thread.currentThread().getName() + " 采集数据: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
}
在这个例子中,CountDownLatch
确保了主线程在所有数据采集线程完成后才进行数据汇总的输出。
2. 使用CyclicBarrier实现多线程图像处理的多阶段同步
假设我们要对一张图片进行多阶段处理,每个阶段都需要多个线程协同工作。
import java.awt.image.BufferedImage;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class ImageProcessingExample {
public static void main(String[] args) {
int threadCount = 4;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程完成当前阶段,进入下一阶段");
});
BufferedImage image = new BufferedImage(100, 100, BufferedImage.TYPE_INT_RGB);
for (int i = 0; i < threadCount; i++) {
new Thread(new ImageProcessor(barrier, image, i)).start();
}
}
static class ImageProcessor implements Runnable {
private final CyclicBarrier barrier;
private final BufferedImage image;
private final int threadIndex;
ImageProcessor(CyclicBarrier barrier, BufferedImage image, int threadIndex) {
this.barrier = barrier;
this.image = image;
this.threadIndex = threadIndex;
}
@Override
public void run() {
for (int phase = 0; phase < 3; phase++) {
try {
// 模拟图像处理
Thread.sleep((long) (Math.random() * 2000));
System.out.println(Thread.currentThread().getName() + " 完成第 " + (phase + 1) + " 阶段处理");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
}
在这个例子中,CyclicBarrier
使得多个线程在每个图像处理阶段结束时进行同步,然后进入下一个阶段。
总结两者的使用注意事项
- CountDownLatch:
- 在使用
CountDownLatch
时,要确保countDown()
方法在合适的位置调用,否则可能导致等待的线程永远无法继续执行。 - 由于
CountDownLatch
是一次性的,所以在设计时要考虑清楚是否真的只需要一次性的同步。如果需要重复使用同步机制,CyclicBarrier
可能是更好的选择。
- 在使用
- CyclicBarrier:
CyclicBarrier
的await()
方法可能会抛出BrokenBarrierException
异常,在实际使用中需要妥善处理这个异常。例如,当某个线程在等待屏障时被中断,屏障会进入损坏状态,其他等待的线程就会抛出这个异常。- 在设置
CyclicBarrier
的初始计数值时,要确保这个值与实际参与同步的线程数量一致,否则可能会导致线程死锁或意外的行为。
通过深入理解CountDownLatch
和CyclicBarrier
的原理、用法及区别,开发者可以更有效地在Java并发编程中利用这两个工具类,实现复杂的线程同步需求,提高程序的性能和可靠性。无论是多线程任务汇总、并发性能测试,还是多阶段计算、并行迭代算法等场景,都能通过合理选择和使用这两个工具类来优化程序的设计和实现。