MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CountDownLatch的使用场景

2022-11-095.3k 阅读

Java CountDownLatch 基础概念

在Java并发编程领域中,CountDownLatch是一个非常有用的工具类,它位于java.util.concurrent包下。CountDownLatch允许一个或多个线程等待,直到其他一组线程完成一系列操作。简单来说,它就像是一个倒计时器,初始化时设置一个计数,当每个相关线程完成自己的任务后,计数递减,当计数减为0时,等待的线程就会被释放继续执行。

CountDownLatch主要有两个关键的方法:

  • await():调用该方法的线程会被阻塞,直到CountDownLatch的计数为0。
  • countDown():将CountDownLatch的计数减1。

下面通过一个简单的示例代码来初步了解CountDownLatch的基本使用:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) {
        // 创建一个CountDownLatch,初始计数为3
        CountDownLatch latch = new CountDownLatch(3);

        // 创建并启动三个线程
        new Thread(new Worker(latch, "Worker1")).start();
        new Thread(new Worker(latch, "Worker2")).start();
        new Thread(new Worker(latch, "Worker3")).start();

        try {
            // 主线程等待,直到latch的计数为0
            latch.await();
            System.out.println("所有工作线程已完成任务,主线程继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class Worker implements Runnable {
        private final CountDownLatch latch;
        private final String name;

        public Worker(CountDownLatch latch, String name) {
            this.latch = latch;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " 开始工作");
                // 模拟工作
                Thread.sleep(2000);
                System.out.println(name + " 完成工作");
                // 工作完成,计数减1
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在上述代码中,CountDownLatch的初始计数为3,代表有三个任务需要完成。每个Worker线程在完成自己的任务后,调用countDown()方法将计数减1。主线程调用await()方法等待,直到计数变为0,此时主线程继续执行后续的操作。

应用场景之多线程任务协同

多个子线程完成任务后主线程汇总结果

在实际的开发中,经常会遇到这样的场景:需要并行执行多个子任务,然后在所有子任务完成后,主线程对这些子任务的结果进行汇总处理。例如,在一个数据分析的应用中,需要从多个数据源并行读取数据,然后在所有数据读取完成后进行统一的分析。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DataAnalysisExample {
    public static void main(String[] args) {
        int dataSourceCount = 3;
        CountDownLatch latch = new CountDownLatch(dataSourceCount);

        List<Integer> dataList = new ArrayList<>();

        // 创建并启动多个数据读取线程
        for (int i = 0; i < dataSourceCount; i++) {
            new Thread(new DataReader(latch, dataList, i)).start();
        }

        try {
            // 主线程等待所有数据读取完成
            latch.await();
            System.out.println("所有数据已读取完毕,开始分析");
            // 进行数据分析
            int sum = 0;
            for (Integer data : dataList) {
                sum += data;
            }
            System.out.println("数据分析结果:数据总和为 " + sum);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class DataReader implements Runnable {
        private final CountDownLatch latch;
        private final List<Integer> dataList;
        private final int dataSourceIndex;

        public DataReader(CountDownLatch latch, List<Integer> dataList, int dataSourceIndex) {
            this.latch = latch;
            this.dataList = dataList;
            this.dataSourceIndex = dataSourceIndex;
        }

        @Override
        public void run() {
            try {
                // 模拟从数据源读取数据
                int data = (dataSourceIndex + 1) * 10;
                System.out.println("从数据源 " + dataSourceIndex + " 读取到数据:" + data);
                dataList.add(data);
                // 数据读取完成,计数减1
                latch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在上述代码中,DataReader线程模拟从不同数据源读取数据,每个线程读取完数据后将数据添加到dataList中,并调用countDown()方法。主线程在所有数据读取线程完成后,对dataList中的数据进行汇总分析。

子线程间任务的有序协同

CountDownLatch还可以用于实现子线程之间任务的有序协同。比如在一个复杂的游戏开发场景中,可能有多个游戏元素需要初始化,并且某些元素的初始化依赖于其他元素的初始化完成。

import java.util.concurrent.CountDownLatch;

public class GameElementInitializationExample {
    public static void main(String[] args) {
        // 初始化两个CountDownLatch
        CountDownLatch element1Latch = new CountDownLatch(1);
        CountDownLatch element2Latch = new CountDownLatch(1);

        // 创建并启动游戏元素1初始化线程
        new Thread(new GameElementInitializer(element1Latch, "游戏元素1")).start();

        // 创建并启动游戏元素2初始化线程,该线程依赖游戏元素1初始化完成
        new Thread(new GameElementInitializer(element2Latch, "游戏元素2", element1Latch)).start();

        try {
            // 主线程等待所有游戏元素初始化完成
            element1Latch.await();
            element2Latch.await();
            System.out.println("所有游戏元素初始化完成,游戏可以开始");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class GameElementInitializer implements Runnable {
        private final CountDownLatch latch;
        private final String elementName;
        private final CountDownLatch dependencyLatch;

        public GameElementInitializer(CountDownLatch latch, String elementName) {
            this.latch = latch;
            this.elementName = elementName;
            this.dependencyLatch = null;
        }

        public GameElementInitializer(CountDownLatch latch, String elementName, CountDownLatch dependencyLatch) {
            this.latch = latch;
            this.elementName = elementName;
            this.dependencyLatch = dependencyLatch;
        }

        @Override
        public void run() {
            if (dependencyLatch != null) {
                try {
                    // 等待依赖的游戏元素初始化完成
                    dependencyLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                System.out.println(elementName + " 开始初始化");
                // 模拟初始化操作
                Thread.sleep(2000);
                System.out.println(elementName + " 初始化完成");
                // 初始化完成,计数减1
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在这个例子中,游戏元素2的初始化线程依赖于游戏元素1的初始化完成。游戏元素2的初始化线程在启动后,首先调用dependencyLatch.await()等待游戏元素1初始化完成,然后再进行自身的初始化操作。

应用场景之性能测试与并发控制

模拟并发请求进行性能测试

在进行系统性能测试时,常常需要模拟大量并发请求来评估系统在高并发情况下的性能表现。CountDownLatch可以帮助我们实现这一点。

import java.util.concurrent.CountDownLatch;

public class PerformanceTestingExample {
    public static void main(String[] args) {
        int threadCount = 10;
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch endLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            new Thread(new RequestSender(startLatch, endLatch)).start();
        }

        long startTime = System.currentTimeMillis();
        // 释放所有线程,开始并发请求
        startLatch.countDown();

        try {
            // 等待所有请求完成
            endLatch.await();
            long endTime = System.currentTimeMillis();
            System.out.println("所有请求完成,总耗时:" + (endTime - startTime) + " 毫秒");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class RequestSender implements Runnable {
        private final CountDownLatch startLatch;
        private final CountDownLatch endLatch;

        public RequestSender(CountDownLatch startLatch, CountDownLatch endLatch) {
            this.startLatch = startLatch;
            this.endLatch = endLatch;
        }

        @Override
        public void run() {
            try {
                // 等待开始信号
                startLatch.await();
                System.out.println(Thread.currentThread().getName() + " 开始发送请求");
                // 模拟发送请求
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " 请求发送完成");
                // 请求完成,计数减1
                endLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在上述代码中,startLatch用于控制所有请求发送线程同时开始发送请求,而endLatch用于等待所有请求发送完成。通过这种方式,可以方便地测量系统处理大量并发请求的性能。

控制并发访问资源的线程数量

有时候,我们需要控制同时访问某个资源的线程数量,以避免资源过载。CountDownLatch可以辅助实现这一功能。

import java.util.concurrent.CountDownLatch;

public class ResourceAccessControlExample {
    public static void main(String[] args) {
        int maxConcurrentAccess = 3;
        CountDownLatch accessLatch = new CountDownLatch(maxConcurrentAccess);

        for (int i = 0; i < 10; i++) {
            new Thread(new ResourceAccessor(accessLatch)).start();
        }
    }

    static class ResourceAccessor implements Runnable {
        private final CountDownLatch accessLatch;

        public ResourceAccessor(CountDownLatch accessLatch) {
            this.accessLatch = accessLatch;
        }

        @Override
        public void run() {
            try {
                // 尝试获取访问许可
                accessLatch.await();
                System.out.println(Thread.currentThread().getName() + " 获得访问资源的许可");
                // 模拟访问资源
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " 访问资源完成");
                // 释放访问许可
                accessLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在这个例子中,accessLatch的初始计数为maxConcurrentAccess,代表最多允许maxConcurrentAccess个线程同时访问资源。每个线程在访问资源前调用await()方法获取许可,访问完成后调用countDown()方法释放许可,从而实现对并发访问资源线程数量的控制。

与其他并发工具的对比

与CyclicBarrier的对比

CountDownLatchCyclicBarrier都是用于线程同步的工具类,但它们在使用场景和功能上有一些区别。

  • 可重用性
    • CountDownLatch的计数只能从初始值递减到0,一旦计数为0,CountDownLatch就不能再被重置使用。例如在前面的数据分析示例中,数据读取完成后,CountDownLatch就完成了使命,无法再次用于相同的操作。
    • CyclicBarrier可以循环使用。当所有线程到达屏障点后,CyclicBarrier可以被重置,然后再次使用。例如在一个多阶段的游戏任务中,每个阶段都需要所有玩家准备好后同时开始,CyclicBarrier就非常适合这种场景。
  • 线程角色
    • CountDownLatch主要用于一个或多个线程等待一组线程完成任务,通常是主线程等待子线程完成。比如在多线程任务协同的场景中,主线程等待子线程读取数据完成后进行汇总分析。
    • CyclicBarrier则是用于一组线程相互等待,所有线程都到达屏障点后才会继续执行。例如在一个并行计算的场景中,多个线程分别计算不同部分的数据,然后在某个点上需要等待所有线程计算完成后,再进行下一步的合并计算。

下面通过一个简单的代码示例来展示CyclicBarrier的使用:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有线程已到达屏障点,开始下一步操作");
            }
        });

        for (int i = 0; i < parties; i++) {
            new Thread(new WorkerWithBarrier(barrier, i)).start();
        }
    }

    static class WorkerWithBarrier implements Runnable {
        private final CyclicBarrier barrier;
        private final int workerIndex;

        public WorkerWithBarrier(CyclicBarrier barrier, int workerIndex) {
            this.barrier = barrier;
            this.workerIndex = workerIndex;
        }

        @Override
        public void run() {
            try {
                System.out.println("Worker " + workerIndex + " 开始工作");
                // 模拟工作
                Thread.sleep(2000);
                System.out.println("Worker " + workerIndex + " 到达屏障点");
                barrier.await();
                System.out.println("Worker " + workerIndex + " 继续执行下一步操作");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

在上述代码中,CyclicBarrierparties参数设置为3,代表有三个线程需要到达屏障点。当所有三个线程都调用await()方法到达屏障点后,会执行CyclicBarrier构造函数中传入的Runnable任务,然后所有线程继续执行后续操作。

与Semaphore的对比

SemaphoreCountDownLatch也有不同的应用场景。

  • 功能侧重点
    • Semaphore主要用于控制同时访问某个资源的线程数量,它维护了一组许可。例如在数据库连接池的场景中,Semaphore可以用来控制同时获取数据库连接的线程数量,避免过多线程同时获取连接导致数据库负载过高。
    • CountDownLatch更侧重于线程之间的任务协同,一个或多个线程等待其他一组线程完成任务。比如在一个分布式系统中,多个节点需要完成数据同步,主线程等待所有节点同步完成后再进行下一步操作,这种场景使用CountDownLatch更为合适。
  • 计数操作
    • Semaphore的许可可以被线程获取和释放,当许可数量为0时,尝试获取许可的线程会被阻塞,直到有其他线程释放许可。例如在一个停车场管理系统中,Semaphore可以模拟停车场的停车位,当停车位满了(许可数量为0),新的车辆(线程)就需要等待。
    • CountDownLatch的计数只能递减,不能递增,一旦计数为0,等待的线程就会被释放,并且不能再次重置计数。

下面通过一个简单的代码示例来展示Semaphore的使用:

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        int availablePermits = 3;
        Semaphore semaphore = new Semaphore(availablePermits);

        for (int i = 0; i < 5; i++) {
            new Thread(new ResourceUser(semaphore)).start();
        }
    }

    static class ResourceUser implements Runnable {
        private final Semaphore semaphore;

        public ResourceUser(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                // 获取许可
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " 获得许可,开始使用资源");
                // 模拟使用资源
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " 使用资源完成,释放许可");
                // 释放许可
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在上述代码中,Semaphore的初始许可数量为3,代表最多允许3个线程同时使用资源。每个线程在使用资源前调用acquire()方法获取许可,使用完成后调用release()方法释放许可。

注意事项与常见问题

异常处理

在使用CountDownLatch时,await()方法可能会抛出InterruptedException。这通常是因为在等待过程中,线程被中断。例如,在主线程等待子线程完成任务的过程中,如果主线程被外部请求中断,就会抛出该异常。

import java.util.concurrent.CountDownLatch;

public class InterruptedExceptionExample {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);

        Thread waitingThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                    System.out.println("等待线程继续执行");
                } catch (InterruptedException e) {
                    System.out.println("等待线程被中断");
                    Thread.currentThread().interrupt();
                }
            }
        });

        waitingThread.start();

        // 模拟主线程执行其他任务
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 中断等待线程
        waitingThread.interrupt();
    }
}

在上述代码中,waitingThread调用await()方法等待CountDownLatch的计数为0。主线程在休眠1秒后中断waitingThreadwaitingThread捕获InterruptedException并处理,同时调用Thread.currentThread().interrupt()重新设置中断状态,以便上层调用者能够知晓线程被中断的情况。

计数设置不当

如果CountDownLatch的初始计数设置不当,可能会导致程序出现逻辑错误。例如,如果计数设置过小,可能会使等待的线程过早被释放,从而导致任务未完成就继续执行后续操作;如果计数设置过大,可能会使等待的线程永远无法被释放。

在实际应用中,需要根据具体的业务逻辑准确设置CountDownLatch的初始计数。例如在多线程任务协同的场景中,要确保计数与需要完成的子任务数量一致。

避免死锁

虽然CountDownLatch本身不会直接导致死锁,但在复杂的多线程场景中,如果使用不当,可能会与其他同步机制一起导致死锁。例如,如果一个线程在等待CountDownLatch的同时,又持有其他线程需要的锁,而这些线程又在等待CountDownLatch的计数为0,就可能会形成死锁。

为了避免死锁,在设计多线程程序时,需要仔细规划线程的执行顺序和同步机制的使用,确保不会出现循环等待的情况。同时,要注意对共享资源的访问控制,避免多个线程同时对同一资源进行竞争操作。

优化与扩展

提高性能

在高并发场景下,CountDownLatch的性能可能会成为瓶颈。为了提高性能,可以考虑以下几点:

  • 减少不必要的等待:尽量优化子任务的执行逻辑,减少子任务的执行时间,从而减少主线程等待的时间。例如在数据读取的场景中,可以优化数据库查询语句,提高数据读取速度。
  • 并行化处理:对于可以并行执行的子任务,尽量充分利用多核CPU的优势,增加并行度。例如在数据分析的场景中,如果数据量较大,可以将数据进行分块,然后使用更多的线程并行处理这些数据块。

扩展功能

有时候,CountDownLatch的基本功能可能无法满足复杂的业务需求,需要对其进行扩展。例如,可以通过继承CountDownLatch类,添加一些自定义的功能。

import java.util.concurrent.CountDownLatch;

public class ExtendedCountDownLatch extends CountDownLatch {
    private int completedTaskCount = 0;

    public ExtendedCountDownLatch(int count) {
        super(count);
    }

    @Override
    public void countDown() {
        super.countDown();
        completedTaskCount++;
        System.out.println("当前已完成任务数量:" + completedTaskCount);
    }

    public int getCompletedTaskCount() {
        return completedTaskCount;
    }
}

在上述代码中,ExtendedCountDownLatch继承自CountDownLatch,并添加了一个completedTaskCount变量来记录已完成的任务数量,同时重写了countDown()方法,在每次计数减1时更新已完成任务数量,并输出当前已完成任务数量。通过这种方式,可以扩展CountDownLatch的功能,以满足特定的业务需求。

实际项目中的应用案例

分布式系统中的数据同步

在分布式系统中,数据同步是一个常见的需求。例如,在一个分布式数据库系统中,有多个节点需要同步数据。可以使用CountDownLatch来确保所有节点的数据同步完成后,再进行下一步操作。

假设每个节点都有一个数据同步任务,当所有节点的同步任务完成后,系统需要进行一致性检查。代码示例如下:

import java.util.concurrent.CountDownLatch;

public class DistributedDataSyncExample {
    public static void main(String[] args) {
        int nodeCount = 5;
        CountDownLatch latch = new CountDownLatch(nodeCount);

        for (int i = 0; i < nodeCount; i++) {
            new Thread(new DataSyncTask(latch, i)).start();
        }

        try {
            // 等待所有节点数据同步完成
            latch.await();
            System.out.println("所有节点数据同步完成,开始一致性检查");
            // 进行一致性检查
            performConsistencyCheck();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class DataSyncTask implements Runnable {
        private final CountDownLatch latch;
        private final int nodeIndex;

        public DataSyncTask(CountDownLatch latch, int nodeIndex) {
            this.latch = latch;
            this.nodeIndex = nodeIndex;
        }

        @Override
        public void run() {
            try {
                System.out.println("节点 " + nodeIndex + " 开始数据同步");
                // 模拟数据同步操作
                Thread.sleep((long) (Math.random() * 3000));
                System.out.println("节点 " + nodeIndex + " 数据同步完成");
                // 数据同步完成,计数减1
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static void performConsistencyCheck() {
        System.out.println("一致性检查中...");
        // 模拟一致性检查操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("一致性检查完成");
    }
}

在上述代码中,每个DataSyncTask线程模拟一个节点的数据同步任务,完成后调用countDown()方法。主线程等待所有节点同步完成后,进行一致性检查。

微服务架构中的服务调用

在微服务架构中,一个业务请求可能需要调用多个微服务。有时候,需要等待所有相关微服务的响应后,再进行统一的处理。例如,在一个电商系统中,一个订单查询请求可能需要调用商品服务、库存服务和用户服务。

import java.util.concurrent.CountDownLatch;

public class MicroserviceInvocationExample {
    public static void main(String[] args) {
        int serviceCount = 3;
        CountDownLatch latch = new CountDownLatch(serviceCount);

        // 调用商品服务
        new Thread(new MicroserviceInvoker(latch, "商品服务")).start();
        // 调用库存服务
        new Thread(new MicroserviceInvoker(latch, "库存服务")).start();
        // 调用用户服务
        new Thread(new MicroserviceInvoker(latch, "用户服务")).start();

        try {
            // 等待所有微服务响应
            latch.await();
            System.out.println("所有微服务响应已收到,开始处理订单查询结果");
            // 处理订单查询结果
            processOrderQueryResult();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class MicroserviceInvoker implements Runnable {
        private final CountDownLatch latch;
        private final String serviceName;

        public MicroserviceInvoker(CountDownLatch latch, String serviceName) {
            this.latch = latch;
            this.serviceName = serviceName;
        }

        @Override
        public void run() {
            try {
                System.out.println("开始调用 " + serviceName);
                // 模拟微服务调用
                Thread.sleep((long) (Math.random() * 3000));
                System.out.println(serviceName + " 响应已收到");
                // 微服务响应已收到,计数减1
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static void processOrderQueryResult() {
        System.out.println("处理订单查询结果中...");
        // 模拟处理订单查询结果操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("订单查询结果处理完成");
    }
}

在上述代码中,每个MicroserviceInvoker线程模拟调用一个微服务,完成后调用countDown()方法。主线程等待所有微服务响应后,处理订单查询结果。

通过以上对CountDownLatch的详细介绍,包括其基础概念、应用场景、与其他并发工具的对比、注意事项、优化扩展以及实际项目中的应用案例,相信你对CountDownLatch在Java并发编程中的使用有了更深入的理解和掌握。在实际开发中,可以根据具体的业务需求灵活运用CountDownLatch,提高程序的并发性能和可靠性。