MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中Barrier的原理和应用场景

2023-12-193.5k 阅读

Java中Barrier的基本概念

在Java并发编程领域,Barrier(屏障)是一种强大的同步工具,它允许一组线程在某个点上相互等待,直到所有线程都到达该点,然后再继续执行。Barrier的核心作用在于协调多个线程的执行进度,确保它们在特定阶段能够同步进行。

Java提供了两种主要类型的Barrier实现,分别是CyclicBarrier(循环屏障)和CountDownLatch(倒计时门闩),它们虽然都用于线程同步,但在原理和应用场景上有一些显著的区别。

CyclicBarrier

CyclicBarrier的设计初衷是为了满足这样一种需求:一组线程需要反复地在某个同步点上等待彼此。它可以被重置并再次使用,这就是“Cyclic”(循环)一词的由来。

CyclicBarrier内部维护了一个计数器,其初始值为参与同步的线程数量。每当一个线程调用await()方法时,计数器就会减1。当计数器的值变为0时,意味着所有线程都已到达屏障点,此时屏障被打破,所有等待的线程将被释放,继续执行后续代码。如果需要,CyclicBarrier可以被重置并再次使用。

下面是一个简单的代码示例,展示了CyclicBarrier的基本用法:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int parties = 3; // 线程数量
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("所有线程都已到达屏障点,开始执行屏障动作");
        });

        for (int i = 0; i < parties; i++) {
            new Thread(new Worker(barrier)).start();
        }
    }

    static class Worker implements Runnable {
        private final CyclicBarrier barrier;

        Worker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                // 模拟一些工作
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " 已到达屏障点");
                barrier.await();
                System.out.println(Thread.currentThread().getName() + " 屏障已打破,继续执行后续任务");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

在上述代码中,我们创建了一个CyclicBarrier实例,它需要3个线程(parties为3)到达屏障点才能触发后续动作。每个Worker线程在执行完自己的任务后,调用barrier.await()方法等待其他线程。当所有3个线程都调用了await()方法后,CyclicBarrier的屏障被打破,所有线程将继续执行后续代码。并且,CyclicBarrier的屏障动作(在构造函数中传入的Runnable)也会被执行。

CountDownLatch

CountDownLatch是另一种类型的同步屏障,它的作用是允许一个或多个线程等待,直到其他一组线程完成一系列操作。与CyclicBarrier不同,CountDownLatch只能使用一次,一旦计数器归零,它就不能被重置。

CountDownLatch内部也维护了一个计数器,初始值为需要等待的操作数量。每当一个线程完成其操作时,调用countDown()方法将计数器减1。当计数器的值变为0时,所有等待在await()方法上的线程将被释放。

以下是一个使用CountDownLatch的示例代码:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) {
        int tasks = 5; // 需要完成的任务数量
        CountDownLatch latch = new CountDownLatch(tasks);

        for (int i = 0; i < tasks; i++) {
            new Thread(new Worker(latch)).start();
        }

        try {
            System.out.println("主线程等待所有任务完成");
            latch.await();
            System.out.println("所有任务已完成,主线程继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class Worker implements Runnable {
        private final CountDownLatch latch;

        Worker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                // 模拟一些工作
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " 任务完成");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在这个示例中,主线程创建了一个CountDownLatch实例,其初始计数器值为5,表示有5个任务需要完成。每个Worker线程在完成自己的任务后,调用latch.countDown()方法将计数器减1。主线程通过调用latch.await()方法等待,直到计数器变为0,即所有任务都完成后,主线程才继续执行后续代码。

Barrier的原理剖析

CyclicBarrier的原理

CyclicBarrier的实现基于ReentrantLockCondition。它通过ReentrantLock来保证对内部状态的线程安全访问,通过Condition来实现线程的等待和唤醒。

CyclicBarrier的核心属性包括:

  1. parties:表示需要同步的线程数量,即屏障的“方数”。
  2. count:当前等待的线程数量,初始值等于parties
  3. generation:用于标识当前的屏障代,每次屏障被打破并重置时,generation会更新。

当一个线程调用await()方法时,首先会获取ReentrantLock,然后检查count的值。如果count大于1,说明还有其他线程未到达,当前线程会通过Condition进入等待状态,并将count减1。如果count等于1,说明当前线程是最后一个到达的线程,此时会执行以下操作:

  1. 打破屏障,唤醒所有等待在Condition上的线程。
  2. 重置countparties,更新generation
  3. 执行在构造函数中传入的屏障动作(如果有)。

以下是简化后的CyclicBarrierawait()方法实现逻辑:

public int await() throws InterruptedException, BrokenBarrierException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // 最后一个线程到达
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 等待其他线程到达
        for (;;) {
            try {
                trip.await();
                if (!g.equals(generation))
                    return index;
            } catch (InterruptedException ie) {
                if (g.equals(generation) &&!g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
        }
    } finally {
        lock.unlock();
    }
}

CountDownLatch的原理

CountDownLatch的实现基于AQS(AbstractQueuedSynchronizer)框架。AQS是Java并发包中许多同步器的基础框架,它通过一个FIFO队列来管理等待获取同步状态的线程。

CountDownLatch的核心属性是Sync,它是AQS的一个内部子类。Sync类通过一个整数状态(state)来表示计数器的值。初始时,state的值等于需要等待的操作数量。

当一个线程调用countDown()方法时,会尝试将state减1。如果state减为0,说明所有操作都已完成,此时会唤醒所有等待在AQS队列中的线程。

当一个线程调用await()方法时,会首先检查state的值。如果state为0,说明所有操作已完成,线程可以直接返回;否则,线程会被加入到AQS队列中等待,直到state变为0或者线程被中断。

以下是简化后的CountDownLatchcountDown()await()方法实现逻辑:

public void countDown() {
    sync.releaseShared(1);
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0)? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

Barrier的应用场景

CyclicBarrier的应用场景

  1. 并行计算:在并行计算中,通常需要多个线程协同完成一个任务。例如,在矩阵乘法运算中,可以将矩阵划分为多个子矩阵,每个线程负责计算一部分子矩阵的乘积。当所有线程完成自己负责的子矩阵计算后,需要等待所有线程都完成,然后再进行下一步的合并操作。这时就可以使用CyclicBarrier来同步这些线程。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class MatrixMultiplication {
    private static final int MATRIX_SIZE = 100;
    private static final int THREADS = 4;
    private static final int[][] matrixA = new int[MATRIX_SIZE][MATRIX_SIZE];
    private static final int[][] matrixB = new int[MATRIX_SIZE][MATRIX_SIZE];
    private static final int[][] result = new int[MATRIX_SIZE][MATRIX_SIZE];

    public static void main(String[] args) {
        initializeMatrices();

        CyclicBarrier barrier = new CyclicBarrier(THREADS, () -> {
            // 合并结果的操作
            for (int i = 0; i < MATRIX_SIZE; i++) {
                for (int j = 0; j < MATRIX_SIZE; j++) {
                    int sum = 0;
                    for (int k = 0; k < MATRIX_SIZE; k++) {
                        sum += matrixA[i][k] * matrixB[k][j];
                    }
                    result[i][j] = sum;
                }
            }
        });

        for (int i = 0; i < THREADS; i++) {
            new Thread(new MatrixWorker(barrier, i)).start();
        }
    }

    static void initializeMatrices() {
        for (int i = 0; i < MATRIX_SIZE; i++) {
            for (int j = 0; j < MATRIX_SIZE; j++) {
                matrixA[i][j] = (int) (Math.random() * 10);
                matrixB[i][j] = (int) (Math.random() * 10);
            }
        }
    }

    static class MatrixWorker implements Runnable {
        private final CyclicBarrier barrier;
        private final int taskId;

        MatrixWorker(CyclicBarrier barrier, int taskId) {
            this.barrier = barrier;
            this.taskId = taskId;
        }

        @Override
        public void run() {
            int startRow = taskId * (MATRIX_SIZE / THREADS);
            int endRow = (taskId == THREADS - 1)? MATRIX_SIZE : (taskId + 1) * (MATRIX_SIZE / THREADS);

            for (int i = startRow; i < endRow; i++) {
                for (int j = 0; j < MATRIX_SIZE; j++) {
                    int sum = 0;
                    for (int k = 0; k < MATRIX_SIZE; k++) {
                        sum += matrixA[i][k] * matrixB[k][j];
                    }
                    result[i][j] = sum;
                }
            }

            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 多阶段游戏或任务:在一些需要多阶段执行的游戏或任务中,每个阶段可能需要多个参与者同时完成自己的部分后才能进入下一阶段。例如,在一个多人在线策略游戏中,每个玩家在每个回合需要同时做出决策,当所有玩家都做出决策后,游戏进入下一回合。CyclicBarrier可以方便地实现这种同步机制。

CountDownLatch的应用场景

  1. 等待所有任务完成:这是CountDownLatch最常见的应用场景。例如,在一个Web爬虫应用中,可能需要同时启动多个线程去抓取不同网页的内容,主线程需要等待所有线程完成抓取任务后,再对抓取到的内容进行汇总和分析。
import java.util.concurrent.CountDownLatch;

public class WebCrawler {
    public static void main(String[] args) {
        String[] urls = {"http://example.com", "http://another-example.com", "http://third-example.com"};
        int tasks = urls.length;
        CountDownLatch latch = new CountDownLatch(tasks);

        for (String url : urls) {
            new Thread(new Crawler(latch, url)).start();
        }

        try {
            latch.await();
            System.out.println("所有网页已抓取完成,开始分析数据");
            // 这里可以添加数据分析的代码
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class Crawler implements Runnable {
        private final CountDownLatch latch;
        private final String url;

        Crawler(CountDownLatch latch, String url) {
            this.latch = latch;
            this.url = url;
        }

        @Override
        public void run() {
            try {
                System.out.println("开始抓取 " + url);
                // 模拟网页抓取操作
                Thread.sleep((long) (Math.random() * 2000));
                System.out.println("完成抓取 " + url);
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 初始化资源:在应用启动时,可能需要初始化多个资源,例如数据库连接、文件系统挂载等。可以使用CountDownLatch来确保所有资源都初始化完成后,应用才正式开始运行。主线程可以等待所有初始化线程完成后,再启动应用的核心业务逻辑。

总结与对比

CyclicBarrierCountDownLatch虽然都是Java中用于线程同步的屏障工具,但它们在原理和应用场景上有明显的区别。

CyclicBarrier适用于需要反复同步的场景,它允许一组线程在某个点上等待彼此,并且可以被重置和再次使用。其实现基于ReentrantLockCondition,通过内部的计数器和generation来管理线程的同步和屏障的重置。

CountDownLatch则主要用于一个或多个线程等待其他一组线程完成特定操作的场景,它只能使用一次,一旦计数器归零就不能重置。CountDownLatch的实现基于AQS框架,通过state来表示计数器的值,并利用AQS的队列来管理等待的线程。

在实际应用中,根据具体的需求选择合适的Barrier工具可以有效地提高并发编程的效率和正确性。如果需要多次同步线程,并且同步点是循环出现的,那么CyclicBarrier是一个很好的选择;如果只需要等待一组任务完成一次,那么CountDownLatch更为合适。同时,理解它们的原理有助于我们在复杂的并发场景中更好地使用和优化代码。

希望通过本文的介绍和示例,你对Java中Barrier的原理和应用场景有了更深入的理解,并能在实际项目中灵活运用这些同步工具。