Java并发编程的最佳实践
一、线程安全基础
在Java并发编程中,线程安全是一个核心概念。当多个线程访问一个类时,如果不考虑这些线程的调度和交替运行,并且在调用方代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的。
1.1 共享可变数据的问题
考虑以下简单的Java类:
public class Counter {
private int count = 0;
public void increment() {
count++;
}
public int getCount() {
return count;
}
}
如果多个线程同时调用 increment
方法,就会出现问题。因为 count++
操作不是原子性的,它实际上包含了读取、增加和写入三个步骤。在多线程环境下,可能会发生数据竞争。假设有两个线程A和B同时执行 increment
方法,线程A读取了 count
的值为10,此时线程B也读取了 count
的值10,然后线程A增加并写入 count
为11,接着线程B增加并写入 count
也为11,而不是预期的12。
1.2 解决共享可变数据问题 - 同步块
为了解决上述问题,可以使用 synchronized
关键字来创建同步块。
public class SafeCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
在这个例子中,increment
和 getCount
方法都被声明为 synchronized
。这意味着当一个线程进入 increment
方法时,其他线程就不能同时进入该方法,也不能进入 getCount
方法,从而保证了数据的一致性。
二、线程池与任务执行
2.1 线程池的概念
线程池是一种管理和复用线程的机制。创建和销毁线程是有一定开销的,线程池可以避免频繁地创建和销毁线程,提高系统性能。Java提供了 Executor
框架来管理线程池。
2.2 创建线程池
可以使用 ThreadPoolExecutor
类来创建线程池。以下是一个简单的示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小为5的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池,不再接受新任务
executorService.shutdown();
try {
// 等待所有任务执行完毕
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
在这个例子中,Executors.newFixedThreadPool(5)
创建了一个固定大小为5的线程池。这意味着线程池最多可以同时执行5个任务,其余任务会在队列中等待。
2.3 线程池参数详解
ThreadPoolExecutor
构造函数有多个参数,这些参数决定了线程池的行为。
- corePoolSize:线程池中的核心线程数,即使这些线程处于空闲状态,也不会被销毁。
- maximumPoolSize:线程池所能容纳的最大线程数。当任务队列已满且工作线程数小于最大线程数时,线程池会创建新的线程来处理任务。
- keepAliveTime:当线程数大于核心线程数时,多余的空闲线程的存活时间。即如果一个线程空闲了
keepAliveTime
时间,就会被销毁。 - unit:
keepAliveTime
的时间单位。 - workQueue:任务队列,用于存放等待执行的任务。常见的任务队列有
ArrayBlockingQueue
、LinkedBlockingQueue
等。
三、并发集合框架
3.1 为什么需要并发集合
传统的Java集合类,如 ArrayList
、HashMap
等,在多线程环境下使用时需要手动进行同步,否则会出现数据不一致的问题。Java并发集合框架提供了一系列线程安全的集合类,方便在多线程环境下使用。
3.2 常用并发集合类
- ConcurrentHashMap:这是一个线程安全的哈希表。与
Hashtable
不同,ConcurrentHashMap
采用了分段锁的机制,允许多个线程同时访问不同的段,提高了并发性能。
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1);
map.put("key2", 2);
System.out.println(map.get("key1"));
}
}
- CopyOnWriteArrayList:这是一个线程安全的
ArrayList
。它的特点是在进行写操作(如添加、删除元素)时,会创建一个原数组的副本,在副本上进行操作,操作完成后再将原数组指向新的副本。这样读操作(如获取元素)就不需要加锁,因为读操作始终是在原数组上进行,不会受到写操作的影响。
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("element1");
list.add("element2");
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
list.add("element3");
System.out.println(list.size());
}
}
- ConcurrentLinkedQueue:这是一个线程安全的无界队列,基于链表实现。它采用了非阻塞算法,允许多个线程同时进行入队和出队操作,性能较高。
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueExample {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.add("element1");
queue.add("element2");
System.out.println(queue.poll());
}
}
四、锁机制
4.1 内置锁(Monitor锁)
在Java中,每个对象都有一个内置锁(也称为Monitor锁)。当一个方法被声明为 synchronized
时,调用该方法的线程必须先获取该对象的内置锁,才能执行方法体。当方法执行完毕或者抛出异常时,内置锁会被自动释放。
4.2 ReentrantLock
ReentrantLock
是Java提供的一种可重入的互斥锁,它提供了比内置锁更灵活的锁控制。
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 1 has acquired the lock");
// 模拟一些工作
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 1 has released the lock");
}
});
Thread thread2 = new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 2 has acquired the lock");
// 模拟一些工作
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 2 has released the lock");
}
});
thread1.start();
thread2.start();
}
}
在这个例子中,ReentrantLock
确保了同一时间只有一个线程可以进入临界区。lock()
方法用于获取锁,unlock()
方法用于释放锁,并且为了保证锁一定被释放,unlock()
方法通常放在 finally
块中。
4.3 读写锁(ReadWriteLock)
ReadWriteLock
允许一个资源可以被多个读操作并发访问,或者被一个写操作独占访问。它维护了一对锁,一个读锁和一个写锁。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockExample {
private static ReadWriteLock lock = new ReentrantReadWriteLock();
private static int data = 0;
public static void main(String[] args) {
Thread readThread1 = new Thread(() -> {
lock.readLock().lock();
try {
System.out.println("Read Thread 1: Data is " + data);
} finally {
lock.readLock().unlock();
}
});
Thread readThread2 = new Thread(() -> {
lock.readLock().lock();
try {
System.out.println("Read Thread 2: Data is " + data);
} finally {
lock.readLock().unlock();
}
});
Thread writeThread = new Thread(() -> {
lock.writeLock().lock();
try {
data++;
System.out.println("Write Thread: Data has been incremented to " + data);
} finally {
lock.writeLock().unlock();
}
});
readThread1.start();
readThread2.start();
writeThread.start();
}
}
在这个例子中,读线程可以同时获取读锁,而写线程需要独占写锁,这样既保证了读操作的并发性能,又保证了写操作的原子性和数据一致性。
五、并发工具类
5.1 CountDownLatch
CountDownLatch
是一个同步辅助类,它允许一个或多个线程等待,直到其他线程完成一组操作。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
int numThreads = 5;
CountDownLatch latch = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
new Thread(() -> {
try {
// 模拟一些工作
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " has finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}
try {
latch.await();
System.out.println("All threads have finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在这个例子中,CountDownLatch
的初始值为5,每个线程完成工作后调用 countDown()
方法,主线程调用 await()
方法等待,直到所有线程都调用了 countDown()
,主线程才会继续执行。
5.2 CyclicBarrier
CyclicBarrier
也是一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达某个点(称为屏障点)。与 CountDownLatch
不同的是,CyclicBarrier
可以重复使用。
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int numThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
System.out.println("All threads have reached the barrier");
});
for (int i = 0; i < numThreads; i++) {
new Thread(() -> {
try {
// 模拟一些工作
Thread.sleep((long) (Math.random() * 2000));
System.out.println(Thread.currentThread().getName() + " is waiting at the barrier");
barrier.await();
System.out.println(Thread.currentThread().getName() + " has passed the barrier");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
在这个例子中,CyclicBarrier
的初始值为3,当三个线程都调用了 await()
方法时,会触发一个可选的 Runnable
任务,然后所有线程继续执行。
5.3 Semaphore
Semaphore
是一个计数信号量,它维护了一组许可证。线程可以通过调用 acquire()
方法获取许可证,如果没有可用的许可证,线程会被阻塞,直到有许可证可用。通过调用 release()
方法可以释放许可证。
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
int permits = 2;
Semaphore semaphore = new Semaphore(permits);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " has acquired a permit");
// 模拟一些工作
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println(Thread.currentThread().getName() + " has released a permit");
}
}).start();
}
}
}
在这个例子中,Semaphore
的初始许可证数量为2,最多允许两个线程同时获取许可证并执行临界区代码。
六、线程间通信
6.1 wait() 和 notify() 方法
在Java中,Object
类提供了 wait()
和 notify()
方法用于线程间通信。wait()
方法会使当前线程等待,直到其他线程调用 notify()
或 notifyAll()
方法唤醒它。
public class WaitNotifyExample {
private static final Object lock = new Object();
private static boolean flag = false;
public static void main(String[] args) {
Thread producer = new Thread(() -> {
synchronized (lock) {
while (flag) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Producer is producing data");
flag = true;
lock.notify();
}
});
Thread consumer = new Thread(() -> {
synchronized (lock) {
while (!flag) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Consumer is consuming data");
flag = false;
lock.notify();
}
});
producer.start();
consumer.start();
}
}
在这个例子中,生产者和消费者线程通过 lock
对象的 wait()
和 notify()
方法进行通信,避免了消费者在没有数据时消费,以及生产者在已有数据时继续生产的问题。
6.2 Condition接口
Condition
接口提供了比 wait()
和 notify()
更灵活的线程间通信方式。它是在 Lock
框架下使用的。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
private static final Lock lock = new ReentrantLock();
private static final Condition condition = lock.newCondition();
private static boolean flag = false;
public static void main(String[] args) {
Thread producer = new Thread(() -> {
lock.lock();
try {
while (flag) {
condition.await();
}
System.out.println("Producer is producing data");
flag = true;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
Thread consumer = new Thread(() -> {
lock.lock();
try {
while (!flag) {
condition.await();
}
System.out.println("Consumer is consuming data");
flag = false;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
producer.start();
consumer.start();
}
}
在这个例子中,Condition
与 Lock
配合使用,await()
方法相当于 wait()
,signal()
方法相当于 notify()
,但 Condition
可以创建多个实例,实现更细粒度的线程间通信控制。
七、原子操作
7.1 原子类的概念
在Java并发包中,java.util.concurrent.atomic
包提供了一系列原子类。这些类提供了原子性的操作,避免了使用锁带来的性能开销。例如,AtomicInteger
类可以对整数进行原子性的操作。
7.2 常用原子类示例
- AtomicInteger:
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerExample {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
System.out.println(atomicInteger.incrementAndGet());
}
}
在这个例子中,incrementAndGet()
方法是原子性的,它会先增加 AtomicInteger
的值,然后返回增加后的值。
2. AtomicReference:用于对对象引用进行原子性操作。
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceExample {
public static void main(String[] args) {
AtomicReference<String> atomicReference = new AtomicReference<>("initial value");
System.out.println(atomicReference.getAndSet("new value"));
}
}
这里,getAndSet()
方法会先返回当前的引用值,然后设置新的引用值,整个过程是原子性的。
通过合理使用原子类,可以在保证线程安全的同时,提高程序的性能,尤其是在高并发场景下。在实际的Java并发编程中,结合上述的各种技术和最佳实践,可以构建出高效、稳定且线程安全的应用程序。无论是小型的单服务器应用,还是大型的分布式系统,这些知识都能帮助开发者避免常见的并发问题,提升系统的整体质量。