MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java并发编程的最佳实践

2024-04-134.3k 阅读

一、线程安全基础

在Java并发编程中,线程安全是一个核心概念。当多个线程访问一个类时,如果不考虑这些线程的调度和交替运行,并且在调用方代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的。

1.1 共享可变数据的问题

考虑以下简单的Java类:

public class Counter {
    private int count = 0;

    public void increment() {
        count++;
    }

    public int getCount() {
        return count;
    }
}

如果多个线程同时调用 increment 方法,就会出现问题。因为 count++ 操作不是原子性的,它实际上包含了读取、增加和写入三个步骤。在多线程环境下,可能会发生数据竞争。假设有两个线程A和B同时执行 increment 方法,线程A读取了 count 的值为10,此时线程B也读取了 count 的值10,然后线程A增加并写入 count 为11,接着线程B增加并写入 count 也为11,而不是预期的12。

1.2 解决共享可变数据问题 - 同步块

为了解决上述问题,可以使用 synchronized 关键字来创建同步块。

public class SafeCounter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

在这个例子中,incrementgetCount 方法都被声明为 synchronized。这意味着当一个线程进入 increment 方法时,其他线程就不能同时进入该方法,也不能进入 getCount 方法,从而保证了数据的一致性。

二、线程池与任务执行

2.1 线程池的概念

线程池是一种管理和复用线程的机制。创建和销毁线程是有一定开销的,线程池可以避免频繁地创建和销毁线程,提高系统性能。Java提供了 Executor 框架来管理线程池。

2.2 创建线程池

可以使用 ThreadPoolExecutor 类来创建线程池。以下是一个简单的示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小为5的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池,不再接受新任务
        executorService.shutdown();
        try {
            // 等待所有任务执行完毕
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个例子中,Executors.newFixedThreadPool(5) 创建了一个固定大小为5的线程池。这意味着线程池最多可以同时执行5个任务,其余任务会在队列中等待。

2.3 线程池参数详解

ThreadPoolExecutor 构造函数有多个参数,这些参数决定了线程池的行为。

  • corePoolSize:线程池中的核心线程数,即使这些线程处于空闲状态,也不会被销毁。
  • maximumPoolSize:线程池所能容纳的最大线程数。当任务队列已满且工作线程数小于最大线程数时,线程池会创建新的线程来处理任务。
  • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程的存活时间。即如果一个线程空闲了 keepAliveTime 时间,就会被销毁。
  • unitkeepAliveTime 的时间单位。
  • workQueue:任务队列,用于存放等待执行的任务。常见的任务队列有 ArrayBlockingQueueLinkedBlockingQueue 等。

三、并发集合框架

3.1 为什么需要并发集合

传统的Java集合类,如 ArrayListHashMap 等,在多线程环境下使用时需要手动进行同步,否则会出现数据不一致的问题。Java并发集合框架提供了一系列线程安全的集合类,方便在多线程环境下使用。

3.2 常用并发集合类

  1. ConcurrentHashMap:这是一个线程安全的哈希表。与 Hashtable 不同,ConcurrentHashMap 采用了分段锁的机制,允许多个线程同时访问不同的段,提高了并发性能。
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("key1", 1);
        map.put("key2", 2);
        System.out.println(map.get("key1"));
    }
}
  1. CopyOnWriteArrayList:这是一个线程安全的 ArrayList。它的特点是在进行写操作(如添加、删除元素)时,会创建一个原数组的副本,在副本上进行操作,操作完成后再将原数组指向新的副本。这样读操作(如获取元素)就不需要加锁,因为读操作始终是在原数组上进行,不会受到写操作的影响。
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("element1");
        list.add("element2");

        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }

        list.add("element3");
        System.out.println(list.size());
    }
}
  1. ConcurrentLinkedQueue:这是一个线程安全的无界队列,基于链表实现。它采用了非阻塞算法,允许多个线程同时进行入队和出队操作,性能较高。
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueExample {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("element1");
        queue.add("element2");
        System.out.println(queue.poll());
    }
}

四、锁机制

4.1 内置锁(Monitor锁)

在Java中,每个对象都有一个内置锁(也称为Monitor锁)。当一个方法被声明为 synchronized 时,调用该方法的线程必须先获取该对象的内置锁,才能执行方法体。当方法执行完毕或者抛出异常时,内置锁会被自动释放。

4.2 ReentrantLock

ReentrantLock 是Java提供的一种可重入的互斥锁,它提供了比内置锁更灵活的锁控制。

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 1 has acquired the lock");
                // 模拟一些工作
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("Thread 1 has released the lock");
            }
        });

        Thread thread2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 2 has acquired the lock");
                // 模拟一些工作
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("Thread 2 has released the lock");
            }
        });

        thread1.start();
        thread2.start();
    }
}

在这个例子中,ReentrantLock 确保了同一时间只有一个线程可以进入临界区。lock() 方法用于获取锁,unlock() 方法用于释放锁,并且为了保证锁一定被释放,unlock() 方法通常放在 finally 块中。

4.3 读写锁(ReadWriteLock)

ReadWriteLock 允许一个资源可以被多个读操作并发访问,或者被一个写操作独占访问。它维护了一对锁,一个读锁和一个写锁。

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {
    private static ReadWriteLock lock = new ReentrantReadWriteLock();
    private static int data = 0;

    public static void main(String[] args) {
        Thread readThread1 = new Thread(() -> {
            lock.readLock().lock();
            try {
                System.out.println("Read Thread 1: Data is " + data);
            } finally {
                lock.readLock().unlock();
            }
        });

        Thread readThread2 = new Thread(() -> {
            lock.readLock().lock();
            try {
                System.out.println("Read Thread 2: Data is " + data);
            } finally {
                lock.readLock().unlock();
            }
        });

        Thread writeThread = new Thread(() -> {
            lock.writeLock().lock();
            try {
                data++;
                System.out.println("Write Thread: Data has been incremented to " + data);
            } finally {
                lock.writeLock().unlock();
            }
        });

        readThread1.start();
        readThread2.start();
        writeThread.start();
    }
}

在这个例子中,读线程可以同时获取读锁,而写线程需要独占写锁,这样既保证了读操作的并发性能,又保证了写操作的原子性和数据一致性。

五、并发工具类

5.1 CountDownLatch

CountDownLatch 是一个同步辅助类,它允许一个或多个线程等待,直到其他线程完成一组操作。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) {
        int numThreads = 5;
        CountDownLatch latch = new CountDownLatch(numThreads);

        for (int i = 0; i < numThreads; i++) {
            new Thread(() -> {
                try {
                    // 模拟一些工作
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " has finished");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            }).start();
        }

        try {
            latch.await();
            System.out.println("All threads have finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,CountDownLatch 的初始值为5,每个线程完成工作后调用 countDown() 方法,主线程调用 await() 方法等待,直到所有线程都调用了 countDown(),主线程才会继续执行。

5.2 CyclicBarrier

CyclicBarrier 也是一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达某个点(称为屏障点)。与 CountDownLatch 不同的是,CyclicBarrier 可以重复使用。

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int numThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
            System.out.println("All threads have reached the barrier");
        });

        for (int i = 0; i < numThreads; i++) {
            new Thread(() -> {
                try {
                    // 模拟一些工作
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println(Thread.currentThread().getName() + " is waiting at the barrier");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + " has passed the barrier");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在这个例子中,CyclicBarrier 的初始值为3,当三个线程都调用了 await() 方法时,会触发一个可选的 Runnable 任务,然后所有线程继续执行。

5.3 Semaphore

Semaphore 是一个计数信号量,它维护了一组许可证。线程可以通过调用 acquire() 方法获取许可证,如果没有可用的许可证,线程会被阻塞,直到有许可证可用。通过调用 release() 方法可以释放许可证。

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        int permits = 2;
        Semaphore semaphore = new Semaphore(permits);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " has acquired a permit");
                    // 模拟一些工作
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName() + " has released a permit");
                }
            }).start();
        }
    }
}

在这个例子中,Semaphore 的初始许可证数量为2,最多允许两个线程同时获取许可证并执行临界区代码。

六、线程间通信

6.1 wait() 和 notify() 方法

在Java中,Object 类提供了 wait()notify() 方法用于线程间通信。wait() 方法会使当前线程等待,直到其他线程调用 notify()notifyAll() 方法唤醒它。

public class WaitNotifyExample {
    private static final Object lock = new Object();
    private static boolean flag = false;

    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            synchronized (lock) {
                while (flag) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("Producer is producing data");
                flag = true;
                lock.notify();
            }
        });

        Thread consumer = new Thread(() -> {
            synchronized (lock) {
                while (!flag) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("Consumer is consuming data");
                flag = false;
                lock.notify();
            }
        });

        producer.start();
        consumer.start();
    }
}

在这个例子中,生产者和消费者线程通过 lock 对象的 wait()notify() 方法进行通信,避免了消费者在没有数据时消费,以及生产者在已有数据时继续生产的问题。

6.2 Condition接口

Condition 接口提供了比 wait()notify() 更灵活的线程间通信方式。它是在 Lock 框架下使用的。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    private static final Lock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();
    private static boolean flag = false;

    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            lock.lock();
            try {
                while (flag) {
                    condition.await();
                }
                System.out.println("Producer is producing data");
                flag = true;
                condition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        Thread consumer = new Thread(() -> {
            lock.lock();
            try {
                while (!flag) {
                    condition.await();
                }
                System.out.println("Consumer is consuming data");
                flag = false;
                condition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        producer.start();
        consumer.start();
    }
}

在这个例子中,ConditionLock 配合使用,await() 方法相当于 wait()signal() 方法相当于 notify(),但 Condition 可以创建多个实例,实现更细粒度的线程间通信控制。

七、原子操作

7.1 原子类的概念

在Java并发包中,java.util.concurrent.atomic 包提供了一系列原子类。这些类提供了原子性的操作,避免了使用锁带来的性能开销。例如,AtomicInteger 类可以对整数进行原子性的操作。

7.2 常用原子类示例

  1. AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerExample {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        System.out.println(atomicInteger.incrementAndGet());
    }
}

在这个例子中,incrementAndGet() 方法是原子性的,它会先增加 AtomicInteger 的值,然后返回增加后的值。 2. AtomicReference:用于对对象引用进行原子性操作。

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceExample {
    public static void main(String[] args) {
        AtomicReference<String> atomicReference = new AtomicReference<>("initial value");
        System.out.println(atomicReference.getAndSet("new value"));
    }
}

这里,getAndSet() 方法会先返回当前的引用值,然后设置新的引用值,整个过程是原子性的。

通过合理使用原子类,可以在保证线程安全的同时,提高程序的性能,尤其是在高并发场景下。在实际的Java并发编程中,结合上述的各种技术和最佳实践,可以构建出高效、稳定且线程安全的应用程序。无论是小型的单服务器应用,还是大型的分布式系统,这些知识都能帮助开发者避免常见的并发问题,提升系统的整体质量。