MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 重写线程池的 afterExecute 方法

2023-07-284.0k 阅读

Java 线程池的基本概念

在深入探讨重写 afterExecute 方法之前,我们先来回顾一下 Java 线程池的基本概念。线程池是一种管理和复用线程的机制,它避免了频繁创建和销毁线程带来的开销,从而提高系统性能和资源利用率。

线程池的创建与核心组件

在 Java 中,线程池主要通过 ThreadPoolExecutor 类来实现。ThreadPoolExecutor 有几个关键参数:

  1. 核心线程数(corePoolSize):线程池中保持活动的最小线程数,即使这些线程处于空闲状态,也不会被销毁。
  2. 最大线程数(maximumPoolSize):线程池中允许存在的最大线程数。当任务队列已满且活动线程数小于最大线程数时,会创建新的线程来处理任务。
  3. 存活时间(keepAliveTime):当线程数大于核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。
  4. 任务队列(workQueue):用于存放等待执行的任务。常见的任务队列有 ArrayBlockingQueueLinkedBlockingQueue 等。

以下是一个简单创建线程池的示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建任务队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        // 提交任务
        for (int i = 0; i < 15; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,我们创建了一个核心线程数为 2,最大线程数为 4,任务队列容量为 10 的线程池。然后提交了 15 个任务,这些任务会根据线程池的规则依次执行。

线程池的工作流程

当一个任务提交到线程池时,线程池会按照以下流程处理:

  1. 如果当前活动线程数小于核心线程数,会创建新的线程来执行任务。
  2. 如果当前活动线程数等于核心线程数,任务会被放入任务队列中等待执行。
  3. 如果任务队列已满,且当前活动线程数小于最大线程数,会创建新的线程来执行任务。
  4. 如果任务队列已满,且当前活动线程数达到最大线程数,线程池会根据饱和策略来处理任务。常见的饱和策略有 AbortPolicy(抛出异常)、CallerRunsPolicy(在调用者线程中执行任务)、DiscardPolicy(丢弃任务)和 DiscardOldestPolicy(丢弃队列中最老的任务)。

afterExecute 方法的作用与默认实现

ThreadPoolExecutor 类提供了 afterExecute 方法,该方法在任务执行完成后被调用。它主要用于在任务执行结束后进行一些清理工作、记录日志、统计任务执行时间等操作。

afterExecute 方法的签名

afterExecute 方法的签名如下:

protected void afterExecute(Runnable r, Throwable t) { }
  • 参数 r:表示执行完成的任务。
  • 参数 t:表示任务执行过程中抛出的异常,如果任务正常完成,该参数为 null

默认实现

ThreadPoolExecutor 类中,afterExecute 方法的默认实现为空:

protected void afterExecute(Runnable r, Throwable t) { }

这意味着如果我们不重写该方法,任务执行完成后不会有任何额外的操作。

重写 afterExecute 方法的场景与需求

在实际应用中,我们经常需要在任务执行完成后进行一些特定的操作。以下是一些常见的场景:

  1. 日志记录:记录任务的执行结果、执行时间等信息,以便于排查问题和分析系统性能。
  2. 资源清理:如果任务在执行过程中占用了一些资源(如文件句柄、数据库连接等),在任务执行完成后需要及时释放这些资源。
  3. 任务统计:统计任务的执行次数、成功率、失败率等指标,以便于对系统的整体运行状况进行监控和分析。

日志记录场景示例

假设我们有一个电商系统,订单处理任务在执行完成后,我们需要记录订单的处理结果和处理时间。我们可以通过重写 afterExecute 方法来实现这一需求。

资源清理场景示例

在一个文件处理系统中,每个任务负责读取和处理一个文件。在任务执行完成后,需要关闭文件句柄以释放资源。同样,我们可以利用重写 afterExecute 方法来完成这一操作。

任务统计场景示例

在一个分布式计算系统中,我们需要统计每个任务的执行时间、成功次数和失败次数,以便于评估任务的性能和稳定性。重写 afterExecute 方法可以方便地实现这些统计功能。

重写 afterExecute 方法的实现步骤

继承 ThreadPoolExecutor

要重写 afterExecute 方法,我们需要创建一个继承自 ThreadPoolExecutor 的子类。例如:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
}

在上述代码中,我们创建了一个 CustomThreadPoolExecutor 类,继承自 ThreadPoolExecutor,并在构造函数中调用了父类的构造函数。

重写 afterExecute 方法

在子类中,我们可以重写 afterExecute 方法来实现我们的自定义逻辑。以下是一个简单的示例,用于记录任务的执行时间和执行结果:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long endTime = System.currentTimeMillis();
        if (t == null) {
            System.out.println("Task " + r + " completed successfully. Execution time: " + (endTime - startTime.get(r)) + " ms");
        } else {
            System.out.println("Task " + r + " failed. Execution time: " + (endTime - startTime.get(r)) + " ms");
            t.printStackTrace();
        }
        startTime.remove(r);
    }

    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime.set(System.currentTimeMillis());
    }
}

在上述代码中:

  1. 我们重写了 afterExecute 方法,在方法中获取任务执行结束的时间,并根据 Throwable 参数判断任务是否执行成功。如果任务成功,打印成功信息和执行时间;如果任务失败,打印失败信息、执行时间并打印异常堆栈信息。
  2. 我们还定义了一个 ThreadLocal<Long> 类型的变量 startTime,用于存储每个任务的开始时间。在 beforeExecute 方法中,我们记录任务的开始时间。

使用自定义线程池

接下来,我们可以使用自定义的线程池来提交任务,示例代码如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CustomThreadPoolUsage {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                if (taskNumber % 2 == 0) {
                    System.out.println("Task " + taskNumber + " is running and will succeed.");
                } else {
                    throw new RuntimeException("Task " + taskNumber + " is running and will fail.");
                }
            });
        }

        executor.shutdown();
    }
}

在上述代码中,我们创建了一个 CustomThreadPoolExecutor 实例,并提交了 5 个任务。其中,偶数编号的任务会正常执行,奇数编号的任务会抛出异常。运行该代码后,我们可以在控制台看到任务的执行结果和执行时间的记录。

重写 afterExecute 方法的注意事项

异常处理

在重写 afterExecute 方法时,要注意处理可能抛出的异常。由于 afterExecute 方法是在任务执行完成后调用,如果在该方法中抛出异常,可能会影响线程池的正常运行。因此,建议在 afterExecute 方法中进行异常捕获和处理,避免异常向上传播。 例如:

@Override
protected void afterExecute(Runnable r, Throwable t) {
    try {
        // 自定义逻辑
        long endTime = System.currentTimeMillis();
        if (t == null) {
            System.out.println("Task " + r + " completed successfully. Execution time: " + (endTime - startTime.get(r)) + " ms");
        } else {
            System.out.println("Task " + r + " failed. Execution time: " + (endTime - startTime.get(r)) + " ms");
            t.printStackTrace();
        }
        startTime.remove(r);
    } catch (Exception e) {
        System.err.println("Error in afterExecute method: " + e.getMessage());
        e.printStackTrace();
    }
}

线程安全

由于 afterExecute 方法可能会被多个线程同时调用,因此在实现自定义逻辑时要注意线程安全。例如,如果在 afterExecute 方法中访问和修改共享资源,需要使用适当的同步机制(如 synchronized 关键字、ReentrantLock 等)来保证数据的一致性。 假设我们要统计任务的成功次数和失败次数,示例代码如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    private int successCount = 0;
    private int failureCount = 0;
    private final ReentrantLock lock = new ReentrantLock();

    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        lock.lock();
        try {
            if (t == null) {
                successCount++;
            } else {
                failureCount++;
            }
            System.out.println("Success count: " + successCount + ", Failure count: " + failureCount);
        } finally {
            lock.unlock();
        }
    }
}

在上述代码中,我们使用 ReentrantLock 来保证 successCountfailureCount 的线程安全。

对性能的影响

虽然重写 afterExecute 方法可以实现很多有用的功能,但也要注意其对性能的影响。如果在 afterExecute 方法中执行了大量复杂的操作,可能会增加任务执行的总时间,降低线程池的整体性能。因此,在实现自定义逻辑时,要尽量保证操作的简洁性和高效性。

结合实际项目的案例分析

电商订单处理系统

在一个电商订单处理系统中,订单处理任务可能涉及到库存扣减、支付处理、物流信息更新等多个操作。每个订单处理任务执行完成后,我们需要记录订单的处理结果、处理时间以及相关的业务数据(如订单金额、商品数量等)。

  1. 定义订单任务类
public class OrderTask implements Runnable {
    private final long orderId;
    private final double orderAmount;
    private final int productQuantity;

    public OrderTask(long orderId, double orderAmount, int productQuantity) {
        this.orderId = orderId;
        this.orderAmount = orderAmount;
        this.productQuantity = productQuantity;
    }

    @Override
    public void run() {
        // 模拟订单处理逻辑
        try {
            Thread.sleep(2000);
            System.out.println("Order " + orderId + " processed successfully.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  1. 自定义线程池并重写 afterExecute 方法
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OrderProcessingThreadPool extends ThreadPoolExecutor {
    public OrderProcessingThreadPool(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if (r instanceof OrderTask) {
            OrderTask orderTask = (OrderTask) r;
            long endTime = System.currentTimeMillis();
            if (t == null) {
                System.out.println("Order " + orderTask.orderId + " processed successfully. Execution time: " + (endTime - startTime.get(orderTask)) + " ms");
                System.out.println("Order amount: " + orderTask.orderAmount + ", Product quantity: " + orderTask.productQuantity);
            } else {
                System.out.println("Order " + orderTask.orderId + " processing failed. Execution time: " + (endTime - startTime.get(orderTask)) + " ms");
                t.printStackTrace();
            }
            startTime.remove(orderTask);
        }
    }

    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime.set(System.currentTimeMillis());
    }
}
  1. 使用自定义线程池处理订单任务
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OrderProcessingSystem {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        OrderProcessingThreadPool executor = new OrderProcessingThreadPool(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        for (int i = 0; i < 5; i++) {
            long orderId = 1000 + i;
            double orderAmount = 100.0 + i * 10;
            int productQuantity = 1 + i;
            executor.submit(new OrderTask(orderId, orderAmount, productQuantity));
        }

        executor.shutdown();
    }
}

通过上述代码,我们可以在订单处理任务执行完成后,详细记录订单的处理情况和相关业务数据,方便后续的业务分析和问题排查。

分布式数据处理系统

在一个分布式数据处理系统中,每个任务负责处理一部分数据,如数据清洗、数据分析等。任务执行完成后,我们需要统计每个任务处理的数据量、处理时间以及任务的成功率,以便于评估系统的整体性能和稳定性。

  1. 定义数据处理任务类
public class DataProcessingTask implements Runnable {
    private final int dataSize;

    public DataProcessingTask(int dataSize) {
        this.dataSize = dataSize;
    }

    @Override
    public void run() {
        // 模拟数据处理逻辑
        try {
            Thread.sleep(1000);
            if (Math.random() < 0.8) {
                System.out.println("Data processing task with data size " + dataSize + " completed successfully.");
            } else {
                throw new RuntimeException("Data processing task with data size " + dataSize + " failed.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  1. 自定义线程池并重写 afterExecute 方法
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DataProcessingThreadPool extends ThreadPoolExecutor {
    private int totalDataProcessed = 0;
    private int successCount = 0;
    private int failureCount = 0;
    private final ReentrantLock lock = new ReentrantLock();

    public DataProcessingThreadPool(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if (r instanceof DataProcessingTask) {
            DataProcessingTask dataTask = (DataProcessingTask) r;
            long endTime = System.currentTimeMillis();
            lock.lock();
            try {
                totalDataProcessed += dataTask.dataSize;
                if (t == null) {
                    successCount++;
                } else {
                    failureCount++;
                }
                System.out.println("Data size processed: " + dataTask.dataSize + ", Execution time: " + (endTime - startTime.get(dataTask)) + " ms");
                System.out.println("Total data processed: " + totalDataProcessed + ", Success count: " + successCount + ", Failure count: " + failureCount);
            } finally {
                lock.unlock();
            }
            startTime.remove(dataTask);
        }
    }

    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime.set(System.currentTimeMillis());
    }
}
  1. 使用自定义线程池处理数据处理任务
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DataProcessingSystem {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        DataProcessingThreadPool executor = new DataProcessingThreadPool(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue
        );

        for (int i = 0; i < 5; i++) {
            int dataSize = 100 + i * 10;
            executor.submit(new DataProcessingTask(dataSize));
        }

        executor.shutdown();
    }
}

通过上述代码,我们可以实时统计分布式数据处理系统中每个任务的处理情况以及系统整体的性能指标,为系统的优化和维护提供有力支持。

与其他线程池扩展点的结合使用

beforeExecute 方法

beforeExecute 方法在任务执行之前被调用,它和 afterExecute 方法可以配合使用。例如,我们可以在 beforeExecute 方法中记录任务开始时间,在 afterExecute 方法中计算任务执行时间。前面的示例中已经展示了这种配合使用的方式。

terminated 方法

terminated 方法在线程池关闭且所有任务都执行完成后被调用。我们可以在这个方法中进行一些全局的清理工作、汇总统计信息等。 例如,在电商订单处理系统中,当所有订单任务执行完成后,我们可以在 terminated 方法中生成一份订单处理报告,包括总的订单处理数量、成功数量、失败数量等信息。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OrderProcessingThreadPool extends ThreadPoolExecutor {
    private int totalOrders = 0;
    private int successOrders = 0;
    private int failureOrders = 0;

    public OrderProcessingThreadPool(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        totalOrders++;
        if (t == null) {
            successOrders++;
        } else {
            failureOrders++;
        }
    }

    @Override
    protected void terminated() {
        System.out.println("All orders processed.");
        System.out.println("Total orders: " + totalOrders);
        System.out.println("Success orders: " + successOrders);
        System.out.println("Failure orders: " + failureOrders);
    }
}

通过结合 afterExecuteterminated 方法,我们可以实现从单个任务到整个线程池生命周期的全面监控和管理。

总结与展望

重写 afterExecute 方法是 Java 线程池扩展的重要手段之一,它为我们在任务执行完成后进行各种自定义操作提供了便利。通过合理地重写 afterExecute 方法,并结合其他线程池扩展点(如 beforeExecuteterminated 等),我们可以构建出功能强大、灵活可定制的线程池,满足不同应用场景的需求。

在未来的开发中,随着系统规模的不断扩大和业务逻辑的日益复杂,对线程池的精细化管理和监控将变得更加重要。重写 afterExecute 方法以及其他线程池扩展机制将继续发挥关键作用,帮助我们打造高性能、高可靠性的应用系统。同时,随着 Java 技术的不断发展,我们也期待线程池相关的 API 和功能能够进一步完善,为开发者提供更多便利和优化空间。