Java中Barrier的原理和应用场景
Java中Barrier的基本概念
在Java并发编程领域,Barrier(屏障)是一种强大的同步工具,它允许一组线程在某个点上相互等待,直到所有线程都到达该点,然后再继续执行。Barrier的核心作用在于协调多个线程的执行进度,确保它们在特定阶段能够同步进行。
Java提供了两种主要类型的Barrier实现,分别是CyclicBarrier
(循环屏障)和CountDownLatch
(倒计时门闩),它们虽然都用于线程同步,但在原理和应用场景上有一些显著的区别。
CyclicBarrier
CyclicBarrier
的设计初衷是为了满足这样一种需求:一组线程需要反复地在某个同步点上等待彼此。它可以被重置并再次使用,这就是“Cyclic”(循环)一词的由来。
CyclicBarrier
内部维护了一个计数器,其初始值为参与同步的线程数量。每当一个线程调用await()
方法时,计数器就会减1。当计数器的值变为0时,意味着所有线程都已到达屏障点,此时屏障被打破,所有等待的线程将被释放,继续执行后续代码。如果需要,CyclicBarrier
可以被重置并再次使用。
下面是一个简单的代码示例,展示了CyclicBarrier
的基本用法:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int parties = 3; // 线程数量
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("所有线程都已到达屏障点,开始执行屏障动作");
});
for (int i = 0; i < parties; i++) {
new Thread(new Worker(barrier)).start();
}
}
static class Worker implements Runnable {
private final CyclicBarrier barrier;
Worker(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 正在执行任务");
// 模拟一些工作
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " 已到达屏障点");
barrier.await();
System.out.println(Thread.currentThread().getName() + " 屏障已打破,继续执行后续任务");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
在上述代码中,我们创建了一个CyclicBarrier
实例,它需要3个线程(parties
为3)到达屏障点才能触发后续动作。每个Worker
线程在执行完自己的任务后,调用barrier.await()
方法等待其他线程。当所有3个线程都调用了await()
方法后,CyclicBarrier
的屏障被打破,所有线程将继续执行后续代码。并且,CyclicBarrier
的屏障动作(在构造函数中传入的Runnable)也会被执行。
CountDownLatch
CountDownLatch
是另一种类型的同步屏障,它的作用是允许一个或多个线程等待,直到其他一组线程完成一系列操作。与CyclicBarrier
不同,CountDownLatch
只能使用一次,一旦计数器归零,它就不能被重置。
CountDownLatch
内部也维护了一个计数器,初始值为需要等待的操作数量。每当一个线程完成其操作时,调用countDown()
方法将计数器减1。当计数器的值变为0时,所有等待在await()
方法上的线程将被释放。
以下是一个使用CountDownLatch
的示例代码:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
int tasks = 5; // 需要完成的任务数量
CountDownLatch latch = new CountDownLatch(tasks);
for (int i = 0; i < tasks; i++) {
new Thread(new Worker(latch)).start();
}
try {
System.out.println("主线程等待所有任务完成");
latch.await();
System.out.println("所有任务已完成,主线程继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class Worker implements Runnable {
private final CountDownLatch latch;
Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 正在执行任务");
// 模拟一些工作
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " 任务完成");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在这个示例中,主线程创建了一个CountDownLatch
实例,其初始计数器值为5,表示有5个任务需要完成。每个Worker
线程在完成自己的任务后,调用latch.countDown()
方法将计数器减1。主线程通过调用latch.await()
方法等待,直到计数器变为0,即所有任务都完成后,主线程才继续执行后续代码。
Barrier的原理剖析
CyclicBarrier的原理
CyclicBarrier
的实现基于ReentrantLock
和Condition
。它通过ReentrantLock
来保证对内部状态的线程安全访问,通过Condition
来实现线程的等待和唤醒。
CyclicBarrier
的核心属性包括:
parties
:表示需要同步的线程数量,即屏障的“方数”。count
:当前等待的线程数量,初始值等于parties
。generation
:用于标识当前的屏障代,每次屏障被打破并重置时,generation
会更新。
当一个线程调用await()
方法时,首先会获取ReentrantLock
,然后检查count
的值。如果count
大于1,说明还有其他线程未到达,当前线程会通过Condition
进入等待状态,并将count
减1。如果count
等于1,说明当前线程是最后一个到达的线程,此时会执行以下操作:
- 打破屏障,唤醒所有等待在
Condition
上的线程。 - 重置
count
为parties
,更新generation
。 - 执行在构造函数中传入的屏障动作(如果有)。
以下是简化后的CyclicBarrier
的await()
方法实现逻辑:
public int await() throws InterruptedException, BrokenBarrierException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 最后一个线程到达
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 等待其他线程到达
for (;;) {
try {
trip.await();
if (!g.equals(generation))
return index;
} catch (InterruptedException ie) {
if (g.equals(generation) &&!g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
}
} finally {
lock.unlock();
}
}
CountDownLatch的原理
CountDownLatch
的实现基于AQS
(AbstractQueuedSynchronizer)框架。AQS
是Java并发包中许多同步器的基础框架,它通过一个FIFO队列来管理等待获取同步状态的线程。
CountDownLatch
的核心属性是Sync
,它是AQS
的一个内部子类。Sync
类通过一个整数状态(state
)来表示计数器的值。初始时,state
的值等于需要等待的操作数量。
当一个线程调用countDown()
方法时,会尝试将state
减1。如果state
减为0,说明所有操作都已完成,此时会唤醒所有等待在AQS
队列中的线程。
当一个线程调用await()
方法时,会首先检查state
的值。如果state
为0,说明所有操作已完成,线程可以直接返回;否则,线程会被加入到AQS
队列中等待,直到state
变为0或者线程被中断。
以下是简化后的CountDownLatch
的countDown()
和await()
方法实现逻辑:
public void countDown() {
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0)? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
Barrier的应用场景
CyclicBarrier的应用场景
- 并行计算:在并行计算中,通常需要多个线程协同完成一个任务。例如,在矩阵乘法运算中,可以将矩阵划分为多个子矩阵,每个线程负责计算一部分子矩阵的乘积。当所有线程完成自己负责的子矩阵计算后,需要等待所有线程都完成,然后再进行下一步的合并操作。这时就可以使用
CyclicBarrier
来同步这些线程。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class MatrixMultiplication {
private static final int MATRIX_SIZE = 100;
private static final int THREADS = 4;
private static final int[][] matrixA = new int[MATRIX_SIZE][MATRIX_SIZE];
private static final int[][] matrixB = new int[MATRIX_SIZE][MATRIX_SIZE];
private static final int[][] result = new int[MATRIX_SIZE][MATRIX_SIZE];
public static void main(String[] args) {
initializeMatrices();
CyclicBarrier barrier = new CyclicBarrier(THREADS, () -> {
// 合并结果的操作
for (int i = 0; i < MATRIX_SIZE; i++) {
for (int j = 0; j < MATRIX_SIZE; j++) {
int sum = 0;
for (int k = 0; k < MATRIX_SIZE; k++) {
sum += matrixA[i][k] * matrixB[k][j];
}
result[i][j] = sum;
}
}
});
for (int i = 0; i < THREADS; i++) {
new Thread(new MatrixWorker(barrier, i)).start();
}
}
static void initializeMatrices() {
for (int i = 0; i < MATRIX_SIZE; i++) {
for (int j = 0; j < MATRIX_SIZE; j++) {
matrixA[i][j] = (int) (Math.random() * 10);
matrixB[i][j] = (int) (Math.random() * 10);
}
}
}
static class MatrixWorker implements Runnable {
private final CyclicBarrier barrier;
private final int taskId;
MatrixWorker(CyclicBarrier barrier, int taskId) {
this.barrier = barrier;
this.taskId = taskId;
}
@Override
public void run() {
int startRow = taskId * (MATRIX_SIZE / THREADS);
int endRow = (taskId == THREADS - 1)? MATRIX_SIZE : (taskId + 1) * (MATRIX_SIZE / THREADS);
for (int i = startRow; i < endRow; i++) {
for (int j = 0; j < MATRIX_SIZE; j++) {
int sum = 0;
for (int k = 0; k < MATRIX_SIZE; k++) {
sum += matrixA[i][k] * matrixB[k][j];
}
result[i][j] = sum;
}
}
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
- 多阶段游戏或任务:在一些需要多阶段执行的游戏或任务中,每个阶段可能需要多个参与者同时完成自己的部分后才能进入下一阶段。例如,在一个多人在线策略游戏中,每个玩家在每个回合需要同时做出决策,当所有玩家都做出决策后,游戏进入下一回合。
CyclicBarrier
可以方便地实现这种同步机制。
CountDownLatch的应用场景
- 等待所有任务完成:这是
CountDownLatch
最常见的应用场景。例如,在一个Web爬虫应用中,可能需要同时启动多个线程去抓取不同网页的内容,主线程需要等待所有线程完成抓取任务后,再对抓取到的内容进行汇总和分析。
import java.util.concurrent.CountDownLatch;
public class WebCrawler {
public static void main(String[] args) {
String[] urls = {"http://example.com", "http://another-example.com", "http://third-example.com"};
int tasks = urls.length;
CountDownLatch latch = new CountDownLatch(tasks);
for (String url : urls) {
new Thread(new Crawler(latch, url)).start();
}
try {
latch.await();
System.out.println("所有网页已抓取完成,开始分析数据");
// 这里可以添加数据分析的代码
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class Crawler implements Runnable {
private final CountDownLatch latch;
private final String url;
Crawler(CountDownLatch latch, String url) {
this.latch = latch;
this.url = url;
}
@Override
public void run() {
try {
System.out.println("开始抓取 " + url);
// 模拟网页抓取操作
Thread.sleep((long) (Math.random() * 2000));
System.out.println("完成抓取 " + url);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 初始化资源:在应用启动时,可能需要初始化多个资源,例如数据库连接、文件系统挂载等。可以使用
CountDownLatch
来确保所有资源都初始化完成后,应用才正式开始运行。主线程可以等待所有初始化线程完成后,再启动应用的核心业务逻辑。
总结与对比
CyclicBarrier
和CountDownLatch
虽然都是Java中用于线程同步的屏障工具,但它们在原理和应用场景上有明显的区别。
CyclicBarrier
适用于需要反复同步的场景,它允许一组线程在某个点上等待彼此,并且可以被重置和再次使用。其实现基于ReentrantLock
和Condition
,通过内部的计数器和generation
来管理线程的同步和屏障的重置。
CountDownLatch
则主要用于一个或多个线程等待其他一组线程完成特定操作的场景,它只能使用一次,一旦计数器归零就不能重置。CountDownLatch
的实现基于AQS
框架,通过state
来表示计数器的值,并利用AQS
的队列来管理等待的线程。
在实际应用中,根据具体的需求选择合适的Barrier工具可以有效地提高并发编程的效率和正确性。如果需要多次同步线程,并且同步点是循环出现的,那么CyclicBarrier
是一个很好的选择;如果只需要等待一组任务完成一次,那么CountDownLatch
更为合适。同时,理解它们的原理有助于我们在复杂的并发场景中更好地使用和优化代码。
希望通过本文的介绍和示例,你对Java中Barrier的原理和应用场景有了更深入的理解,并能在实际项目中灵活运用这些同步工具。