MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java同步器的使用与实现

2022-06-137.4k 阅读

Java同步器概述

在多线程编程的场景中,为了确保数据的一致性和线程操作的正确性,常常需要使用同步机制。Java提供了多种同步器,这些同步器在不同的应用场景下发挥着重要作用。

1. 什么是同步器

同步器本质上是一种工具,用于协调多个线程对共享资源的访问。它可以控制线程的执行顺序,避免竞争条件和数据不一致的问题。例如,当多个线程同时访问一个共享变量时,同步器可以确保同一时间只有一个线程能够修改该变量,从而保证数据的完整性。

2. 同步器的分类

Java中的同步器主要分为以下几类:

  • 锁(Lock):如ReentrantLock,它提供了比synchronized关键字更灵活的锁机制。
  • 信号量(Semaphore):用于控制同时访问特定资源的线程数量。
  • 倒计时器(CountDownLatch):允许一个或多个线程等待,直到其他一组线程完成一系列操作。
  • 循环栅栏(CyclicBarrier):使一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,所有被阻塞的线程才会继续执行。

锁(Lock)

1. ReentrantLock的使用

ReentrantLock是Java提供的一种可重入锁。所谓可重入,是指同一个线程可以多次获取同一个锁,而不会造成死锁。

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private static ReentrantLock lock = new ReentrantLock();
    private static int count = 0;

    public static void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                increment();
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                increment();
            }
        });

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Final count: " + count);
    }
}

在上述代码中,ReentrantLock被用于保护对count变量的操作。lock()方法用于获取锁,unlock()方法用于释放锁。使用try - finally块确保无论在try块中发生什么异常,锁都会被正确释放。

2. ReentrantLock的实现原理

ReentrantLock内部使用了AQS(AbstractQueuedSynchronizer)框架来实现同步。AQS是一个用于构建锁和同步器的框架,它通过一个FIFO队列来管理等待获取锁的线程。

  • 获取锁:当线程调用lock()方法时,首先尝试获取锁。如果锁当前未被占用,该线程获取锁并将锁的持有计数加1。如果锁已被占用,线程会被封装成一个节点加入到AQS队列中,进入等待状态。
  • 释放锁:当线程调用unlock()方法时,将锁的持有计数减1。如果持有计数变为0,表明该线程已经完全释放了锁,AQS会从队列中唤醒一个等待线程。

信号量(Semaphore)

1. Semaphore的使用

Semaphore可以控制同时访问某个资源的线程数量。例如,在一个数据库连接池中,我们可以使用Semaphore来限制同时获取连接的线程数量,以避免过多的线程竞争连接资源。

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private static Semaphore semaphore = new Semaphore(3); // 允许同时3个线程访问

    public static void accessResource() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " acquired the semaphore.");
            // 模拟资源访问
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " released the semaphore.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> accessResource());
            thread.start();
        }
    }
}

在上述代码中,Semaphore被初始化为允许3个线程同时访问。acquire()方法用于获取信号量,如果当前信号量不足,线程会进入等待状态。release()方法用于释放信号量,唤醒等待队列中的线程。

2. Semaphore的实现原理

Semaphore同样基于AQS框架实现。它维护了一个信号量计数,该计数表示当前可用的许可数量。当线程调用acquire()方法时,会尝试减少信号量计数。如果计数大于0,则成功获取许可并将计数减1;如果计数为0,则线程会被加入到AQS队列中等待。当线程调用release()方法时,会增加信号量计数,并唤醒等待队列中的线程。

倒计时器(CountDownLatch)

1. CountDownLatch的使用

CountDownLatch允许一个或多个线程等待,直到其他一组线程完成一系列操作。例如,在一个多线程的数据处理任务中,主线程可能需要等待所有子线程完成数据处理后,再进行结果汇总。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    private static CountDownLatch latch = new CountDownLatch(3);

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            System.out.println("Thread 1 started.");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread 1 finished.");
            latch.countDown();
        });

        Thread thread2 = new Thread(() -> {
            System.out.println("Thread 2 started.");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread 2 finished.");
            latch.countDown();
        });

        Thread thread3 = new Thread(() -> {
            System.out.println("Thread 3 started.");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread 3 finished.");
            latch.countDown();
        });

        thread1.start();
        thread2.start();
        thread3.start();

        try {
            latch.await();
            System.out.println("All threads have finished. Main thread can proceed.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,CountDownLatch被初始化为3。每个子线程在完成任务后调用countDown()方法,将计数减1。主线程调用await()方法等待,直到计数变为0,即所有子线程都完成任务。

2. CountDownLatch的实现原理

CountDownLatch内部使用AQS的共享模式来实现。它维护了一个初始计数,该计数会随着countDown()方法的调用而减少。当线程调用await()方法时,如果计数为0,线程立即返回;否则,线程会被加入到AQS队列中等待,直到计数变为0。

循环栅栏(CyclicBarrier)

1. CyclicBarrier的使用

CyclicBarrier使一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,所有被阻塞的线程才会继续执行。并且CyclicBarrier可以被重用。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    private static CyclicBarrier barrier = new CyclicBarrier(3, () -> {
        System.out.println("All threads have reached the barrier.");
    });

    public static void performTask() {
        try {
            System.out.println(Thread.currentThread().getName() + " is waiting at the barrier.");
            barrier.await();
            System.out.println(Thread.currentThread().getName() + " has passed the barrier.");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(() -> performTask());
            thread.start();
        }
    }
}

在上述代码中,CyclicBarrier被初始化为3个线程。当每个线程调用await()方法时,会等待其他线程到达屏障。当所有3个线程都到达屏障时,会执行传入的Runnable任务,然后所有线程继续执行。

2. CyclicBarrier的实现原理

CyclicBarrier内部使用了一个计数器来记录到达屏障的线程数量。每次有线程调用await()方法时,计数器减1。当计数器变为0时,表明所有线程都已到达屏障,此时会唤醒所有等待的线程,并重置计数器,以便下次使用。如果在等待过程中,有线程中断或屏障被破坏,会抛出相应的异常。

同步器的选择与优化

1. 同步器的选择

在实际应用中,选择合适的同步器至关重要。以下是一些选择同步器的建议:

  • 锁(Lock):当需要更细粒度的控制,如可中断的锁获取、公平锁等功能时,优先选择ReentrantLock。如果只是简单的同步需求,synchronized关键字也是一个不错的选择,因为它语法简洁。
  • 信号量(Semaphore):适用于控制对共享资源的并发访问数量,如数据库连接池、线程池等场景。
  • 倒计时器(CountDownLatch):用于一个或多个线程等待其他一组线程完成任务的场景,例如等待所有数据处理线程完成后进行结果汇总。
  • 循环栅栏(CyclicBarrier):当一组线程需要在某个同步点进行等待,并且该同步点需要重复使用时,使用CyclicBarrier

2. 同步器的优化

为了提高多线程程序的性能,对同步器的优化必不可少:

  • 减少锁的粒度:尽量只在必要的代码块上加锁,避免在大段代码上使用锁,从而减少线程等待时间。
  • 使用读写锁:如果对共享资源的操作主要是读操作,可以使用ReadWriteLock,允许多个线程同时读,提高并发性能。
  • 避免死锁:在使用多个锁时,要按照一定的顺序获取锁,避免形成死锁。例如,所有线程都按照相同的顺序获取锁。

高级同步器特性

1. 公平性与非公平性

  • 公平锁:在公平锁模式下,等待时间最长的线程会优先获取锁。例如,ReentrantLock可以通过构造函数传入true来创建公平锁。公平锁保证了线程获取锁的顺序性,但由于每次锁的获取都需要检查等待队列,会增加额外的开销,性能相对较低。
ReentrantLock fairLock = new ReentrantLock(true);
  • 非公平锁:非公平锁在锁可用时,不考虑等待队列中的线程顺序,直接让请求锁的线程获取锁。ReentrantLock默认是非公平锁,非公平锁在高并发场景下性能更好,因为它减少了线程切换的开销。

2. 条件变量(Condition)

ReentrantLock提供了Condition接口,用于实现更灵活的线程间通信。Condition类似于传统的Objectwait()notify()方法,但功能更强大。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 1 is waiting on condition.");
                condition.await();
                System.out.println("Thread 1 has been signaled.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        Thread thread2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 2 is signaling.");
                condition.signal();
            } finally {
                lock.unlock();
            }
        });

        thread1.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread2.start();
    }
}

在上述代码中,thread1调用condition.await()方法进入等待状态,thread2调用condition.signal()方法唤醒thread1Condition可以创建多个,实现更复杂的线程间通信逻辑。

与其他并发工具的结合使用

1. 与线程池结合

同步器常常与线程池一起使用,以提高并发处理能力。例如,在使用ThreadPoolExecutor时,可以使用Semaphore来限制任务提交的速度,避免线程池过载。

import java.util.concurrent.*;

public class ThreadPoolWithSemaphore {
    private static Semaphore semaphore = new Semaphore(5); // 限制同时提交5个任务
    private static ExecutorService executorService = Executors.newFixedThreadPool(10);

    public static void submitTask() {
        try {
            semaphore.acquire();
            executorService.submit(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " is running task.");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            submitTask();
        }

        executorService.shutdown();
    }
}

在上述代码中,Semaphore限制了同时提交到线程池的任务数量,避免线程池瞬间接收过多任务而导致性能问题。

2. 与并发集合结合

在使用并发集合如ConcurrentHashMap时,同步器可以进一步保证数据的一致性。例如,在对ConcurrentHashMap进行复杂操作时,可以使用ReentrantLock来确保操作的原子性。

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrentHashMapWithLock {
    private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    private static ReentrantLock lock = new ReentrantLock();

    public static void updateMap(String key, int value) {
        lock.lock();
        try {
            Integer oldValue = map.get(key);
            if (oldValue != null) {
                map.put(key, oldValue + value);
            } else {
                map.put(key, value);
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> updateMap("key1", 10));
        Thread thread2 = new Thread(() -> updateMap("key1", 20));

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Final value for key1: " + map.get("key1"));
    }
}

在上述代码中,ReentrantLock保证了对ConcurrentHashMap的更新操作是原子的,避免了并发更新可能导致的数据不一致问题。

通过深入了解和正确使用Java的同步器,开发人员可以编写出高效、安全的多线程程序,充分利用多核处理器的性能,提升应用程序的整体质量。在实际应用中,需要根据具体的业务场景和性能需求,合理选择和优化同步器的使用。