MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java条件变量的应用与实例

2024-08-138.0k 阅读

Java条件变量的基本概念

在多线程编程的场景中,Java的条件变量(Condition Variables)是一种非常重要的同步工具,它基于Java的管程(Monitor)模型。管程作为一种同步机制,包含了一个锁以及一些条件变量。条件变量提供了一种线程间协调的方式,允许线程在满足特定条件时才继续执行,否则进入等待状态。

在Java中,java.util.concurrent.locks.Condition 接口就代表了条件变量。它是在Java 5.0引入的 java.util.concurrent.locks 包中的一部分,与传统的 Object 类的 wait()notify()notifyAll() 方法类似,但在功能上更为强大和灵活。

每个 Condition 实例都必须与一个 Lock 实例相关联,通过 LocknewCondition() 方法来创建 Condition 对象。例如:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    // 其他代码
}

这里,ReentrantLock 是一种可重入锁,通过它的 newCondition() 方法创建了一个 Condition 实例。

条件变量的核心方法

await() 方法

await() 方法是 Condition 接口中最重要的方法之一。当一个线程调用 await() 方法时,它会释放与该 Condition 关联的 Lock,并进入等待状态,直到其他线程调用该 Conditionsignal()signalAll() 方法唤醒它,或者在等待过程中线程被中断。

当线程被唤醒后,它会尝试重新获取与该 Condition 关联的 Lock。只有获取到锁后,线程才会从 await() 方法返回并继续执行后续代码。

以下是一个简单的示例,展示了 await() 方法的使用:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitExample {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void awaitMethod() {
        lock.lock();
        try {
            while (!ready) {
                condition.await();
            }
            System.out.println("线程继续执行");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public void signalMethod() {
        lock.lock();
        try {
            ready = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

在这个例子中,awaitMethod() 方法中的线程在 readyfalse 时调用 condition.await() 进入等待状态,并释放锁。signalMethod() 方法会将 ready 设置为 true 并调用 condition.signal() 唤醒等待的线程。

signal() 方法

signal() 方法用于唤醒一个在该 Condition 上等待的线程。如果有多个线程在等待,那么会随机选择一个线程被唤醒。被唤醒的线程不会马上执行,而是要等到当前持有锁的线程释放锁后,它才有机会获取锁并继续执行。

在上述 AwaitExample 类中的 signalMethod() 方法里,condition.signal() 就使用了这个方法,当 ready 被设置为 true 后,唤醒一个在 condition 上等待的线程。

signalAll() 方法

signalAll() 方法会唤醒所有在该 Condition 上等待的线程。这些被唤醒的线程都会竞争获取与该 Condition 关联的 Lock。一旦某个线程获取到锁,它就会从 await() 方法返回并继续执行,其他未获取到锁的线程则继续等待获取锁。

例如,我们可以对上述代码稍作修改,让多个线程等待并通过 signalAll() 唤醒:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SignalAllExample {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void awaitMethod() {
        lock.lock();
        try {
            while (!ready) {
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + " 继续执行");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public void signalAllMethod() {
        lock.lock();
        try {
            ready = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

然后通过以下方式测试:

public class Main {
    public static void main(String[] args) {
        SignalAllExample example = new SignalAllExample();
        Thread thread1 = new Thread(() -> example.awaitMethod(), "线程1");
        Thread thread2 = new Thread(() -> example.awaitMethod(), "线程2");
        thread1.start();
        thread2.start();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        example.signalAllMethod();
    }
}

在这个例子中,thread1thread2 都会在 awaitMethod() 中等待,当 signalAllMethod() 调用 condition.signalAll() 后,两个线程都会被唤醒并竞争锁,获取到锁的线程会继续执行并输出相应信息。

Java条件变量的应用场景

生产者 - 消费者模型

生产者 - 消费者模型是多线程编程中经典的应用场景,它描述了多个生产者线程向一个共享缓冲区添加数据,同时多个消费者线程从该缓冲区中取出数据的过程。在这个模型中,条件变量可以很好地协调生产者和消费者线程的工作。

假设我们有一个固定大小的缓冲区,当缓冲区满时,生产者线程需要等待;当缓冲区空时,消费者线程需要等待。以下是一个基于条件变量实现的生产者 - 消费者模型示例:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer {
    private final int capacity;
    private int count = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public ProducerConsumer(int capacity) {
        this.capacity = capacity;
    }

    public void produce(int item) throws InterruptedException {
        lock.lock();
        try {
            while (count == capacity) {
                notFull.await();
            }
            count++;
            System.out.println(Thread.currentThread().getName() + " 生产了: " + item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            count--;
            int item = count;
            System.out.println(Thread.currentThread().getName() + " 消费了: " + item);
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

我们可以通过以下方式测试这个生产者 - 消费者模型:

public class Main {
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer(5);
        Thread producer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    pc.produce(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "生产者1");

        Thread consumer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    pc.consume();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "消费者1");

        producer1.start();
        consumer1.start();

        try {
            producer1.join();
            consumer1.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,ProducerConsumer 类中的 produce() 方法在缓冲区满时通过 notFull.await() 等待,当有空间时生产数据并通过 notEmpty.signal() 唤醒可能等待的消费者线程。consume() 方法在缓冲区空时通过 notEmpty.await() 等待,当有数据时消费数据并通过 notFull.signal() 唤醒可能等待的生产者线程。

线程间的顺序执行

在某些场景下,我们需要多个线程按照特定的顺序执行。例如,线程A先执行,然后线程B执行,最后线程C执行。条件变量可以有效地实现这种线程间的顺序控制。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class OrderedThreads {
    private final Lock lock = new ReentrantLock();
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();
    private boolean step1Done = false;
    private boolean step2Done = false;

    public void threadA() {
        lock.lock();
        try {
            System.out.println("线程A开始执行");
            // 模拟一些工作
            Thread.sleep(1000);
            step1Done = true;
            condition1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void threadB() {
        lock.lock();
        try {
            while (!step1Done) {
                condition1.await();
            }
            System.out.println("线程B开始执行");
            // 模拟一些工作
            Thread.sleep(1000);
            step2Done = true;
            condition2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void threadC() {
        lock.lock();
        try {
            while (!step2Done) {
                condition2.await();
            }
            System.out.println("线程C开始执行");
            // 模拟一些工作
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

通过以下方式测试:

public class Main {
    public static void main(String[] args) {
        OrderedThreads ot = new OrderedThreads();
        Thread a = new Thread(() -> ot.threadA(), "线程A");
        Thread b = new Thread(() -> ot.threadB(), "线程B");
        Thread c = new Thread(() -> ot.threadC(), "线程C");

        c.start();
        b.start();
        a.start();

        try {
            a.join();
            b.join();
            c.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,threadBstep1Donefalse 时通过 condition1.await() 等待,threadA 执行完相应工作后设置 step1Donetrue 并通过 condition1.signal() 唤醒 threadB。同理,threadC 等待 step2Donetrue 后被 threadB 唤醒执行。

资源池管理

在资源池管理场景中,例如数据库连接池、线程池等,条件变量可以用于协调资源的分配和回收。当资源池中的资源耗尽时,请求资源的线程需要等待;当有资源被释放回资源池时,等待的线程可以被唤醒获取资源。

以下是一个简单的模拟资源池的示例:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ResourcePool {
    private final int maxResources;
    private int availableResources;
    private final Lock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();

    public ResourcePool(int maxResources) {
        this.maxResources = maxResources;
        this.availableResources = maxResources;
    }

    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            while (availableResources == 0) {
                available.await();
            }
            availableResources--;
            System.out.println(Thread.currentThread().getName() + " 获取了资源");
        } finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            availableResources++;
            System.out.println(Thread.currentThread().getName() + " 释放了资源");
            available.signal();
        } finally {
            lock.unlock();
        }
    }
}

测试代码如下:

public class Main {
    public static void main(String[] args) {
        ResourcePool pool = new ResourcePool(3);
        Thread thread1 = new Thread(() -> {
            try {
                pool.acquire();
                // 使用资源
                Thread.sleep(2000);
                pool.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程1");

        Thread thread2 = new Thread(() -> {
            try {
                pool.acquire();
                // 使用资源
                Thread.sleep(2000);
                pool.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程2");

        Thread thread3 = new Thread(() -> {
            try {
                pool.acquire();
                // 使用资源
                Thread.sleep(2000);
                pool.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程3");

        Thread thread4 = new Thread(() -> {
            try {
                pool.acquire();
                // 使用资源
                Thread.sleep(2000);
                pool.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程4");

        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();

        try {
            thread1.join();
            thread2.join();
            thread3.join();
            thread4.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,ResourcePool 类管理了一定数量的资源。acquire() 方法在资源不足时通过 available.await() 等待,release() 方法在释放资源后通过 available.signal() 唤醒等待获取资源的线程。

条件变量与传统 wait()/notify() 方法的比较

功能差异

传统的 wait()notify()notifyAll() 方法是定义在 Object 类中的,它们与内置锁(也称为监视器锁)紧密绑定。每个对象都有一个内置锁,当一个线程调用对象的 wait() 方法时,它必须持有该对象的内置锁,并且会释放该锁进入等待状态。当其他线程调用 notify()notifyAll() 时,等待的线程会被唤醒并尝试重新获取内置锁。

Condition 接口提供了更丰富和灵活的功能。它与 Lock 接口配合使用,一个 Lock 可以创建多个 Condition 实例,每个 Condition 实例可以用于不同的条件等待和唤醒。例如,在生产者 - 消费者模型中,我们可以使用一个 Condition 表示缓冲区不满的条件,另一个 Condition 表示缓冲区不空的条件,这使得代码逻辑更加清晰。

线程安全与可重入性

内置锁是可重入的,即同一个线程可以多次获取同一个内置锁而不会造成死锁。ReentrantLock 同样是可重入的,并且基于 ReentrantLock 创建的 Condition 也继承了这种可重入性。

然而,使用传统的 wait()/notify() 方法时,由于与内置锁紧密耦合,在复杂的多线程场景中,可能会因为锁的获取和释放顺序不当导致线程安全问题。而 ConditionLock 配合使用,通过显式的锁操作,使得代码在控制线程同步方面更加可控,降低了出现线程安全问题的风险。

异常处理

当线程在 Condition 上调用 await() 方法时,它会抛出 InterruptedException,这使得线程在等待过程中可以被中断并进行相应的处理。而传统的 wait() 方法也会抛出 InterruptedException,但在实际应用中,由于内置锁的使用方式相对简单,可能在处理中断时不够灵活。

例如,在使用 Condition 的场景中,我们可以在 await() 方法的 catch 块中进行更细致的处理,如记录日志、重置线程状态等:

try {
    condition.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    // 记录日志或进行其他处理
    System.out.println("线程被中断");
}

相比之下,传统的 wait() 方法在处理中断时,可能只是简单地在 catch 块中重新抛出异常或者忽略异常,无法进行更丰富的操作。

应用场景适应性

在简单的多线程同步场景中,传统的 wait()/notify() 方法可能已经足够满足需求。例如,一个简单的生产者 - 消费者模型,只有一个缓冲区和简单的同步需求,使用内置锁和 wait()/notify() 方法可以快速实现。

然而,在复杂的多线程应用中,如涉及多个条件等待、更细粒度的锁控制以及灵活的线程间协调的场景,Condition 接口则展现出了更大的优势。例如,在一个大型的分布式系统中,多个服务之间的资源共享和同步可能涉及多种不同的条件,此时使用 Condition 可以更好地组织和管理线程间的协作。

条件变量使用中的注意事项

锁的获取与释放

在使用 Condition 时,必须先获取与之关联的 Lock。所有对 Condition 的操作,如 await()signal()signalAll(),都必须在持有该 Lock 的情况下进行。在操作完成后,一定要记得释放锁,通常使用 finally 块来确保锁的正确释放,以避免死锁或其他线程安全问题。

例如:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
    // 对Condition的操作
    condition.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} finally {
    lock.unlock();
}

避免虚假唤醒

虽然在大多数情况下,await() 方法会在接收到 signal()signalAll() 信号后被唤醒,但在某些特殊情况下,线程可能会被虚假唤醒,即没有收到信号也会从 await() 方法返回。为了避免这种情况,应该在 await() 方法调用时使用 while 循环检查条件,而不是简单的 if 语句。

例如:

lock.lock();
try {
    while (!conditionMet) {
        condition.await();
    }
    // 条件满足后的操作
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} finally {
    lock.unlock();
}

合理使用 signal() 和 signalAll()

signal() 方法只唤醒一个等待的线程,而 signalAll() 方法唤醒所有等待的线程。在选择使用哪个方法时,需要根据具体的应用场景来决定。如果只有一个线程需要被唤醒执行特定任务,使用 signal() 方法可以减少不必要的线程竞争,提高效率。但如果多个线程都需要被唤醒以继续执行不同的任务,那么就应该使用 signalAll() 方法。

例如,在生产者 - 消费者模型中,如果只有一个消费者线程需要被唤醒处理数据,使用 signal() 即可;但如果有多个消费者线程分别处理不同类型的数据,那么可能需要使用 signalAll() 唤醒所有消费者线程。

线程中断处理

如前所述,await() 方法会抛出 InterruptedException,在捕获该异常时,应该妥善处理线程的中断状态。通常建议在 catch 块中调用 Thread.currentThread().interrupt() 来重新设置中断状态,以便上层调用者能够知晓线程被中断的情况。

例如:

try {
    condition.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    // 其他处理
}

这样可以确保线程在被中断后,后续的代码能够正确处理中断状态,避免出现意外的行为。

性能考量

虽然 Condition 提供了强大的功能,但在高并发场景下,频繁地使用 await()signal()signalAll() 方法可能会带来一定的性能开销。因为这些操作涉及线程状态的切换和锁的竞争。为了优化性能,可以考虑减少不必要的等待和唤醒操作,合理设置等待条件,尽量减少线程在等待状态和运行状态之间的切换次数。

例如,在资源池管理中,可以对资源的获取和释放进行更精细的控制,只有在真正需要资源或有资源可用时才进行唤醒操作,避免无效的唤醒导致线程频繁竞争锁。

通过深入理解和正确使用Java条件变量,开发者可以在多线程编程中实现更高效、更灵活的线程间协作,解决各种复杂的同步问题。同时,注意使用过程中的各种细节和注意事项,能够避免常见的线程安全问题,提升程序的稳定性和性能。