MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java信号量的使用与示例

2022-03-172.6k 阅读

Java信号量的基本概念

什么是信号量

信号量(Semaphore)是一种用于控制对共享资源访问的同步工具,它通过一个计数器来管理对资源的访问许可。在Java中,java.util.concurrent.Semaphore类提供了信号量的实现。信号量的计数器初始值表示可用资源的数量,当一个线程获取信号量时,计数器会减一;当一个线程释放信号量时,计数器会加一。如果计数器的值为零,意味着没有可用资源,此时尝试获取信号量的线程将被阻塞,直到有其他线程释放信号量。

信号量的作用

信号量主要用于解决多线程环境下对共享资源的并发访问控制问题。例如,在一个数据库连接池的场景中,连接池中有一定数量的数据库连接,多个线程可能同时需要获取连接进行数据库操作。使用信号量可以有效地控制同时获取连接的线程数量,避免过多线程同时访问连接池导致资源耗尽或性能问题。

信号量还可以用于实现线程间的同步。比如,在生产者 - 消费者模型中,信号量可以用来控制生产者向缓冲区中放入数据的时机以及消费者从缓冲区中取出数据的时机,确保缓冲区不会出现数据丢失或越界访问的情况。

Java信号量的使用

构造函数

Semaphore类提供了多个构造函数,最常用的有以下两种:

  1. Semaphore(int permits):创建具有给定许可数的信号量,并设置为非公平信号量。这里的permits就是初始的资源数量。
  2. Semaphore(int permits, boolean fair):创建具有给定许可数的信号量,并根据fair参数设置是否为公平信号量。如果fairtrue,则表示该信号量是公平的,即等待时间最长的线程将优先获得许可;如果为false,则为非公平信号量,在许可可用时,线程可以抢占许可。

获取许可

线程通过调用Semaphoreacquire()方法来获取许可。如果当前有可用许可(即计数器大于零),则获取许可并将计数器减一;如果没有可用许可,则线程将被阻塞,直到有其他线程释放许可。acquire()方法有以下几种重载形式:

  1. void acquire():获取一个许可,若当前无可用许可,则线程一直阻塞。
  2. void acquire(int permits):获取指定数量的许可,若当前没有足够的可用许可,则线程一直阻塞。
  3. boolean acquire(long timeout, TimeUnit unit):在指定的时间内尝试获取一个许可,如果在指定时间内获取到许可,则返回true,否则返回false
  4. boolean acquire(int permits, long timeout, TimeUnit unit):在指定的时间内尝试获取指定数量的许可,如果在指定时间内获取到足够的许可,则返回true,否则返回false

释放许可

线程使用完共享资源后,需要调用Semaphorerelease()方法来释放许可,使计数器加一,从而让其他等待的线程有机会获取许可。release()方法也有几种重载形式:

  1. void release():释放一个许可。
  2. void release(int permits):释放指定数量的许可。

查询信号量状态

Semaphore类还提供了一些方法用于查询信号量的状态:

  1. int availablePermits():返回当前可用的许可数。
  2. int getQueueLength():返回等待获取许可的线程估计数。
  3. boolean hasQueuedThreads():查询是否有线程正在等待获取许可。

代码示例

简单的信号量示例

下面通过一个简单的示例来演示信号量的基本使用。假设有一个停车场,停车场有固定数量的停车位,多个车辆线程尝试进入停车场停车,停车完毕后离开停车场释放停车位。

import java.util.concurrent.Semaphore;

public class ParkingLot {
    private static final int PARKING_SPACES = 5;
    private final Semaphore semaphore;

    public ParkingLot() {
        semaphore = new Semaphore(PARKING_SPACES);
    }

    public void park(String carName) {
        try {
            semaphore.acquire();
            System.out.println(carName + " 进入停车场,当前停车位:" + semaphore.availablePermits());
            // 模拟停车操作
            Thread.sleep(2000);
            System.out.println(carName + " 离开停车场,当前停车位:" + semaphore.availablePermits());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        ParkingLot parkingLot = new ParkingLot();
        String[] cars = {"Car1", "Car2", "Car3", "Car4", "Car5", "Car6"};
        for (String car : cars) {
            new Thread(() -> parkingLot.park(car)).start();
        }
    }
}

在这个示例中,ParkingLot类表示停车场,构造函数中创建了一个初始许可数为5的信号量,代表停车场有5个停车位。park方法模拟车辆停车和离开的过程,车辆线程首先调用acquire方法获取许可(停车位),如果获取成功则进入停车场停车,停车完毕后调用release方法释放许可(停车位)。在main方法中,创建了6个车辆线程尝试进入停车场,由于停车位只有5个,所以会有一个线程暂时等待,直到有其他车辆离开释放停车位。

生产者 - 消费者模型中的信号量应用

接下来看一个在生产者 - 消费者模型中使用信号量的示例。假设有一个缓冲区,生产者线程向缓冲区中放入数据,消费者线程从缓冲区中取出数据。

import java.util.concurrent.Semaphore;

public class ProducerConsumer {
    private static final int BUFFER_SIZE = 5;
    private final Semaphore empty = new Semaphore(BUFFER_SIZE);
    private final Semaphore full = new Semaphore(0);
    private final Object[] buffer = new Object[BUFFER_SIZE];
    private int in = 0;
    private int out = 0;

    public void produce(Object item) {
        try {
            empty.acquire();
            buffer[in] = item;
            System.out.println("生产者放入:" + item);
            in = (in + 1) % BUFFER_SIZE;
            full.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public Object consume() {
        try {
            full.acquire();
            Object item = buffer[out];
            System.out.println("消费者取出:" + item);
            out = (out + 1) % BUFFER_SIZE;
            empty.release();
            return item;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                pc.produce("Data" + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                pc.consume();
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producer.start();
        consumer.start();
    }
}

在这个示例中,ProducerConsumer类表示生产者 - 消费者模型。empty信号量表示缓冲区中的空闲位置,初始许可数为缓冲区大小BUFFER_SIZEfull信号量表示缓冲区中的已占用位置,初始许可数为0。生产者线程在放入数据前先获取empty信号量的许可,放入数据后释放full信号量的许可;消费者线程在取出数据前先获取full信号量的许可,取出数据后释放empty信号量的许可。通过这种方式,有效地控制了生产者和消费者对缓冲区的访问,避免了数据竞争和缓冲区溢出等问题。

信号量在多线程并发控制中的应用

再来看一个在多线程并发控制中使用信号量的示例,假设我们有一个任务队列,多个线程需要从任务队列中获取任务并执行,但同时只允许一定数量的线程执行任务。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class TaskExecutor {
    private static final int MAX_ACTIVE_THREADS = 3;
    private final Semaphore semaphore;
    private final BlockingQueue<Runnable> taskQueue;

    public TaskExecutor() {
        semaphore = new Semaphore(MAX_ACTIVE_THREADS);
        taskQueue = new LinkedBlockingQueue<>();
        for (int i = 0; i < MAX_ACTIVE_THREADS; i++) {
            new Thread(() -> {
                while (true) {
                    try {
                        Runnable task = taskQueue.take();
                        semaphore.acquire();
                        try {
                            task.run();
                        } finally {
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }).start();
        }
    }

    public void submitTask(Runnable task) {
        taskQueue.add(task);
    }

    public static void main(String[] args) {
        TaskExecutor executor = new TaskExecutor();
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submitTask(() -> {
                System.out.println("任务 " + taskNumber + " 开始执行");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskNumber + " 执行完毕");
            });
        }
    }
}

在这个示例中,TaskExecutor类负责管理任务的提交和执行。构造函数中创建了一个许可数为MAX_ACTIVE_THREADS(这里为3)的信号量,并启动了MAX_ACTIVE_THREADS个线程来执行任务。submitTask方法用于将任务添加到任务队列中。每个执行线程从任务队列中取出任务,在执行任务前先获取信号量的许可,执行完毕后释放许可。这样就保证了同时最多只有MAX_ACTIVE_THREADS个线程在执行任务,有效地控制了并发度,避免系统资源过度消耗。

公平与非公平信号量的对比

下面通过一个示例来对比公平信号量和非公平信号量的行为。

import java.util.concurrent.Semaphore;

public class FairnessComparison {
    private static final int PERMITS = 1;
    private static final int THREADS = 10;

    public static void main(String[] args) {
        // 公平信号量
        Semaphore fairSemaphore = new Semaphore(PERMITS, true);
        System.out.println("公平信号量:");
        testSemaphore(fairSemaphore);

        // 非公平信号量
        Semaphore unfairSemaphore = new Semaphore(PERMITS, false);
        System.out.println("\n非公平信号量:");
        testSemaphore(unfairSemaphore);
    }

    private static void testSemaphore(Semaphore semaphore) {
        for (int i = 0; i < THREADS; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " 获取到许可");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 释放许可");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, "Thread-" + i).start();
        }
    }
}

在这个示例中,testSemaphore方法用于测试给定信号量的行为。创建了10个线程尝试获取许可,每个线程获取许可后睡眠1秒再释放许可。对于公平信号量,线程将按照等待顺序获取许可;而对于非公平信号量,在许可可用时,线程可能会抢占许可,导致等待时间长的线程不一定优先获取许可。通过观察输出结果,可以明显看到公平信号量和非公平信号量在获取许可顺序上的差异。

信号量与其他同步工具的比较

与锁(Lock)的比较

  1. 功能侧重:锁主要用于保证同一时间只有一个线程能够访问临界区,侧重于互斥访问。而信号量不仅可以实现互斥(当许可数为1时),还可以控制同时访问资源的线程数量,更加灵活。
  2. 资源管理:锁通常用于保护一段代码块或资源,没有直接的资源计数概念。信号量通过许可数来明确表示可用资源的数量,在资源管理方面更加直观,适用于管理有限数量的共享资源。
  3. 公平性:锁可以通过ReentrantLock的构造函数设置公平性,信号量同样可以在构造时设置公平性。但公平性的实现机制略有不同,公平锁会按照线程请求的顺序来分配锁,公平信号量会按照线程等待的顺序来分配许可。

与条件变量(Condition)的比较

  1. 同步机制:条件变量通常与锁配合使用,用于线程间的复杂同步,例如线程A等待某个条件满足后才能继续执行,线程B在满足条件时通知线程A。信号量也可以实现线程间的同步,但它更侧重于资源的访问控制,通过许可的获取和释放来协调线程的执行。
  2. 应用场景:条件变量适用于需要根据特定条件进行线程间协作的场景,比如生产者 - 消费者模型中,消费者线程等待缓冲区有数据时才消费。信号量则更适合控制对共享资源的并发访问数量,如数据库连接池的管理。

信号量使用中的注意事项

死锁风险

在使用信号量时,如果获取和释放许可的顺序不当,可能会导致死锁。例如,线程A获取了信号量S1,然后试图获取信号量S2,而线程B获取了信号量S2,然后试图获取信号量S1,此时两个线程都在等待对方释放信号量,就会产生死锁。为了避免死锁,在多信号量的场景下,应该按照固定的顺序获取信号量。

许可数量的设置

许可数量的设置需要根据实际应用场景来合理调整。如果许可数量设置过大,可能无法达到控制并发访问的目的;如果设置过小,可能会导致线程等待时间过长,影响系统性能。例如,在数据库连接池场景中,需要根据数据库的负载能力、应用程序的并发请求数量等因素来确定合适的连接数(即信号量的许可数)。

异常处理

在获取和释放许可的过程中,应该正确处理可能抛出的异常。特别是在acquire方法中,可能会抛出InterruptedException,此时应该合理处理中断,例如恢复中断状态或进行必要的清理操作,以确保程序的健壮性。

信号量在实际项目中的应用案例

数据库连接池

在许多企业级应用中,数据库连接是一种宝贵的资源。使用信号量可以有效地管理数据库连接池中的连接数量。当一个线程需要获取数据库连接时,先从信号量获取许可,如果获取成功则从连接池中取出一个连接;使用完毕后释放连接并归还许可。通过这种方式,可以避免过多线程同时请求数据库连接导致数据库过载。

资源池管理

除了数据库连接池,信号量还可以用于管理其他类型的资源池,如线程池、文件句柄池等。在这些场景中,信号量可以控制同时使用资源的线程数量,确保资源的合理分配和高效利用。

分布式系统中的资源控制

在分布式系统中,不同节点可能需要共享一些远程资源。信号量可以在分布式环境下用于控制对这些共享资源的访问。例如,在分布式缓存系统中,使用信号量可以控制同时访问缓存的客户端数量,避免缓存被过度访问导致性能问题。

通过以上对Java信号量的详细介绍、代码示例、与其他同步工具的比较以及使用注意事项和实际应用案例的讲解,相信读者对Java信号量有了全面而深入的理解,能够在实际项目中灵活运用信号量来解决多线程并发访问控制的问题。