MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java ThreadPoolExecutor 重入锁的使用场景

2023-08-013.6k 阅读

Java ThreadPoolExecutor 概述

在Java并发编程领域,ThreadPoolExecutor是一个至关重要的类,它位于java.util.concurrent包中。ThreadPoolExecutor提供了一个灵活的线程池实现,允许开发者控制线程的创建、复用以及任务的执行。

线程池的核心优势在于它能够管理一组线程,这些线程可以被重复使用来执行多个任务。通过复用线程,避免了频繁创建和销毁线程带来的开销,提高了应用程序的性能和资源利用率。

ThreadPoolExecutor类提供了丰富的构造函数,允许开发者根据需求定制线程池的各种参数,例如核心线程数、最大线程数、线程存活时间等。下面是一个基本的ThreadPoolExecutor构造函数示例:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize:核心线程数,线程池在正常情况下保持的线程数量。即使这些线程处于空闲状态,它们也不会被销毁,除非设置了allowCoreThreadTimeOuttrue
  • maximumPoolSize:线程池允许的最大线程数。当任务队列已满且活跃线程数小于最大线程数时,线程池会创建新的线程来执行任务。
  • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程等待新任务的最长时间。超过这个时间,多余的线程将被销毁。
  • unitkeepAliveTime的时间单位,例如TimeUnit.SECONDS表示秒。
  • workQueue:用于存储等待执行任务的阻塞队列。常见的阻塞队列有ArrayBlockingQueueLinkedBlockingQueue等。

重入锁(ReentrantLock)概述

重入锁是Java并发包java.util.concurrent.locks中的一个重要工具,它实现了Lock接口。与内置的synchronized关键字相比,ReentrantLock提供了更灵活和强大的锁机制。

“重入”意味着同一个线程可以多次获取同一个锁,而不会造成死锁。每次线程获取锁时,锁的持有计数会增加,每次释放锁时,持有计数会减少。当持有计数变为0时,锁被完全释放。

以下是ReentrantLock的基本使用示例:

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 1 acquired the lock.");
                // 模拟一些工作
                for (int i = 0; i < 5; i++) {
                    System.out.println("Thread 1 working: " + i);
                }
            } finally {
                lock.unlock();
                System.out.println("Thread 1 released the lock.");
            }
        });

        Thread thread2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 2 acquired the lock.");
                // 模拟一些工作
                for (int i = 0; i < 5; i++) {
                    System.out.println("Thread 2 working: " + i);
                }
            } finally {
                lock.unlock();
                System.out.println("Thread 2 released the lock.");
            }
        });

        thread1.start();
        thread2.start();
    }
}

在上述代码中,ReentrantLock用于保护共享资源,确保同一时间只有一个线程能够访问。lock()方法用于获取锁,unlock()方法用于释放锁。使用try - finally块来确保在任何情况下锁都能被正确释放,避免死锁。

Java ThreadPoolExecutor 与重入锁结合的使用场景

  1. 资源保护 在多线程环境下,线程池中的线程可能会访问共享资源,如数据库连接、文件系统等。为了确保这些共享资源的一致性和完整性,需要使用锁机制。重入锁可以有效地防止多个线程同时访问共享资源,避免数据竞争和不一致问题。

假设我们有一个线程池用于处理数据库查询任务,并且这些任务需要共享一个数据库连接对象。代码示例如下:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class DatabaseQueryThreadPool {
    private static final String URL = "jdbc:mysql://localhost:3306/mydb";
    private static final String USER = "root";
    private static final String PASSWORD = "password";
    private static Connection connection;
    private static ReentrantLock lock = new ReentrantLock();

    static {
        try {
            connection = DriverManager.getConnection(URL, USER, PASSWORD);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                lock.lock();
                try {
                    String query = "SELECT * FROM users WHERE id =?";
                    PreparedStatement statement = connection.prepareStatement(query);
                    statement.setInt(1, 1);
                    ResultSet resultSet = statement.executeQuery();
                    while (resultSet.next()) {
                        System.out.println("Thread " + Thread.currentThread().getName() + " retrieved user: " + resultSet.getString("name"));
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
        }

        executor.shutdown();
    }
}

在上述代码中,ReentrantLock用于保护数据库连接对象connection。每个任务在执行数据库查询前获取锁,确保同一时间只有一个线程可以使用该连接,从而避免了数据库连接资源的竞争问题。

  1. 任务顺序控制 有时候,线程池中的任务需要按照特定的顺序执行,或者某些任务需要等待其他任务完成后才能执行。重入锁可以通过条件变量(Condition)来实现任务之间的顺序控制和同步。

假设我们有一个线程池,其中包含两个类型的任务:初始化任务和后续处理任务。后续处理任务需要等待初始化任务完成后才能执行。代码示例如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Condition;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TaskOrderThreadPool {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static boolean initialized = false;

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        executor.submit(() -> {
            lock.lock();
            try {
                System.out.println("Initialization task started.");
                // 模拟初始化工作
                Thread.sleep(2000);
                initialized = true;
                System.out.println("Initialization task completed.");
                condition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                lock.lock();
                try {
                    while (!initialized) {
                        condition.await();
                    }
                    System.out.println("Subsequent task " + i + " started.");
                    // 模拟后续处理工作
                    Thread.sleep(1000);
                    System.out.println("Subsequent task " + i + " completed.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
        }

        executor.shutdown();
    }
}

在上述代码中,初始化任务在完成初始化工作后,通过condition.signalAll()唤醒所有等待的后续处理任务。后续处理任务在开始执行前,通过condition.await()等待初始化任务完成。这种方式确保了任务的顺序执行。

  1. 避免死锁 在复杂的多线程应用中,死锁是一个常见的问题。死锁通常发生在多个线程相互等待对方释放资源的情况下。重入锁的可重入特性以及灵活的锁获取和释放机制可以帮助避免死锁。

假设我们有一个线程池,其中的任务需要获取多个锁来完成操作。如果不使用合适的锁机制,可能会导致死锁。下面是一个可能出现死锁的示例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DeadlockExample {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        executor.submit(() -> {
            synchronized (lock1) {
                System.out.println("Thread 1 acquired lock1.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock2) {
                    System.out.println("Thread 1 acquired lock2.");
                }
            }
        });

        executor.submit(() -> {
            synchronized (lock2) {
                System.out.println("Thread 2 acquired lock2.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock1) {
                    System.out.println("Thread 2 acquired lock1.");
                }
            }
        });

        executor.shutdown();
    }
}

在上述代码中,如果Thread 1先获取lock1Thread 2先获取lock2,然后双方都尝试获取对方已持有的锁,就会发生死锁。

使用ReentrantLock可以通过控制锁的获取顺序来避免死锁。下面是使用ReentrantLock避免死锁的示例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AvoidDeadlockExample {
    private static ReentrantLock lock1 = new ReentrantLock();
    private static ReentrantLock lock2 = new ReentrantLock();

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        executor.submit(() -> {
            lock1.lock();
            try {
                System.out.println("Thread 1 acquired lock1.");
                Thread.sleep(1000);
                lock2.lock();
                try {
                    System.out.println("Thread 1 acquired lock2.");
                } finally {
                    lock2.unlock();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock1.unlock();
            }
        });

        executor.submit(() -> {
            lock1.lock();
            try {
                System.out.println("Thread 2 acquired lock1.");
                Thread.sleep(1000);
                lock2.lock();
                try {
                    System.out.println("Thread 2 acquired lock2.");
                } finally {
                    lock2.unlock();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock1.unlock();
            }
        });

        executor.shutdown();
    }
}

在上述代码中,两个线程都按照相同的顺序获取lock1lock2,从而避免了死锁的发生。

  1. 线程池动态调整与同步 在一些场景下,需要根据任务的执行情况动态调整线程池的参数,如核心线程数、最大线程数等。重入锁可以用于同步线程池的动态调整操作,确保在调整过程中线程池的状态一致性。

假设我们有一个线程池,在某些任务执行完成后,需要根据任务的执行结果动态增加核心线程数。代码示例如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadPoolDynamicAdjustment {
    private static ThreadPoolExecutor executor;
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                boolean success = false;
                // 模拟任务执行
                try {
                    Thread.sleep(2000);
                    success = true;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if (success) {
                    lock.lock();
                    try {
                        int currentCorePoolSize = executor.getCorePoolSize();
                        executor.setCorePoolSize(currentCorePoolSize + 1);
                        System.out.println("Core pool size increased to: " + executor.getCorePoolSize());
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }

        executor.shutdown();
    }
}

在上述代码中,ReentrantLock用于保护线程池核心线程数的调整操作。当任务执行成功后,获取锁并增加核心线程数,确保在多线程环境下动态调整操作的线程安全。

  1. 线程池中的资源复用与锁控制 线程池的一个重要优势是资源复用,例如复用线程来执行不同的任务。然而,在复用资源的过程中,可能会涉及到资源的状态管理和同步问题。重入锁可以用于控制资源的复用过程,确保资源在不同任务之间的正确使用。

假设我们有一个线程池,其中的线程需要复用一个共享的缓存对象。缓存对象在读取和写入操作时需要进行同步控制。代码示例如下:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ResourceReuseInThreadPool {
    private static Map<String, Object> cache = new HashMap<>();
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        executor.submit(() -> {
            lock.lock();
            try {
                cache.put("key1", "value1");
                System.out.println("Thread " + Thread.currentThread().getName() + " added to cache.");
            } finally {
                lock.unlock();
            }
        });

        executor.submit(() -> {
            lock.lock();
            try {
                Object value = cache.get("key1");
                System.out.println("Thread " + Thread.currentThread().getName() + " retrieved from cache: " + value);
            } finally {
                lock.unlock();
            }
        });

        executor.shutdown();
    }
}

在上述代码中,ReentrantLock用于保护缓存对象cache。不同的任务在对缓存进行读写操作时,通过获取锁来确保缓存操作的线程安全,从而实现了资源的复用与同步控制。

  1. 并发任务的原子性操作 在多线程环境下,线程池中的任务可能需要执行一些原子性操作,即这些操作要么全部执行成功,要么全部不执行。重入锁可以用于实现这种原子性操作。

假设我们有一个线程池,其中的任务需要对一个共享的计数器进行增加和减少操作,并且这两个操作需要作为一个原子操作执行。代码示例如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicOperationInThreadPool {
    private static int counter = 0;
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        executor.submit(() -> {
            lock.lock();
            try {
                counter++;
                System.out.println("Thread " + Thread.currentThread().getName() + " incremented counter: " + counter);
                counter--;
                System.out.println("Thread " + Thread.currentThread().getName() + " decremented counter: " + counter);
            } finally {
                lock.unlock();
            }
        });

        executor.submit(() -> {
            lock.lock();
            try {
                counter++;
                System.out.println("Thread " + Thread.currentThread().getName() + " incremented counter: " + counter);
                counter--;
                System.out.println("Thread " + Thread.currentThread().getName() + " decremented counter: " + counter);
            } finally {
                lock.unlock();
            }
        });

        executor.shutdown();
    }
}

在上述代码中,ReentrantLock确保了计数器的增加和减少操作作为一个原子操作执行,避免了多线程环境下可能出现的竞争条件。

  1. 线程池任务调度与锁协同 在一些复杂的应用场景中,线程池中的任务调度需要与锁机制协同工作。例如,某些高优先级任务需要优先执行,并且在执行过程中需要独占某些资源。重入锁可以用于实现这种任务调度与资源控制的协同。

假设我们有一个线程池,其中包含高优先级和低优先级任务。高优先级任务需要在执行时独占一个共享资源。代码示例如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TaskSchedulingWithLock {
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<>((r1, r2) -> {
            if (r1 instanceof HighPriorityTask && r2 instanceof LowPriorityTask) {
                return -1;
            } else if (r1 instanceof LowPriorityTask && r2 instanceof HighPriorityTask) {
                return 1;
            }
            return 0;
        });

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        executor.submit(new HighPriorityTask());
        executor.submit(new LowPriorityTask());
        executor.submit(new HighPriorityTask());

        executor.shutdown();
    }

    static class HighPriorityTask implements Runnable {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("High priority task " + Thread.currentThread().getName() + " started.");
                // 模拟高优先级任务工作
                Thread.sleep(2000);
                System.out.println("High priority task " + Thread.currentThread().getName() + " completed.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    static class LowPriorityTask implements Runnable {
        @Override
        public void run() {
            System.out.println("Low priority task " + Thread.currentThread().getName() + " started.");
            // 模拟低优先级任务工作
            Thread.sleep(1000);
            System.out.println("Low priority task " + Thread.currentThread().getName() + " completed.");
        }
    }
}

在上述代码中,PriorityBlockingQueue用于根据任务优先级进行任务调度,ReentrantLock确保高优先级任务在执行时独占共享资源,避免低优先级任务干扰。

  1. 多阶段任务的同步与锁管理 在一些应用中,线程池中的任务可能分为多个阶段,不同阶段之间需要进行同步。重入锁可以用于管理多阶段任务的同步过程,确保每个阶段按顺序执行。

假设我们有一个线程池,其中的任务分为初始化阶段、处理阶段和清理阶段。每个任务在进入处理阶段前需要等待初始化阶段完成,在进入清理阶段前需要等待处理阶段完成。代码示例如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Condition;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MultiStageTaskSynchronization {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition initCondition = lock.newCondition();
    private static Condition processCondition = lock.newCondition();
    private static boolean initialized = false;
    private static boolean processed = false;

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        executor.submit(() -> {
            lock.lock();
            try {
                System.out.println("Initialization stage started.");
                // 模拟初始化工作
                Thread.sleep(2000);
                initialized = true;
                System.out.println("Initialization stage completed.");
                initCondition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        executor.submit(() -> {
            lock.lock();
            try {
                while (!initialized) {
                    initCondition.await();
                }
                System.out.println("Processing stage started.");
                // 模拟处理工作
                Thread.sleep(2000);
                processed = true;
                System.out.println("Processing stage completed.");
                processCondition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        executor.submit(() -> {
            lock.lock();
            try {
                while (!processed) {
                    processCondition.await();
                }
                System.out.println("Cleanup stage started.");
                // 模拟清理工作
                Thread.sleep(1000);
                System.out.println("Cleanup stage completed.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        executor.shutdown();
    }
}

在上述代码中,ReentrantLockCondition结合使用,实现了多阶段任务之间的同步,确保任务按顺序执行。

通过以上多种使用场景的介绍和代码示例,我们可以看到Java ThreadPoolExecutor与重入锁结合在多线程编程中能够解决很多复杂的问题,有效地提高程序的性能、可靠性和可维护性。在实际应用中,开发者需要根据具体的需求和场景,合理选择和使用这些工具,以实现高效的并发编程。