MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池线程工厂的作用与定制

2021-04-023.1k 阅读

Java 线程池线程工厂的作用

在Java的多线程编程中,线程池是一种重要的工具,它可以管理和复用线程,提高应用程序的性能和资源利用率。而线程工厂(ThreadFactory)在这个过程中扮演着不可或缺的角色。

线程工厂的主要作用是创建线程。当线程池需要新的线程来执行任务时,它并不会直接使用 new Thread() 这样的方式来创建线程,而是通过线程工厂来创建。这使得线程的创建过程变得可定制化。通过自定义线程工厂,我们可以对新创建线程的属性进行统一的设置,例如线程名、线程优先级、是否为守护线程等。

假设我们有一个简单的线程池应用场景,需要执行一些简单的任务。如果没有线程工厂,我们创建线程的方式可能如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WithoutThreadFactory {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is running.");
            });
        }
        executorService.shutdown();
    }
}

在上述代码中,Executors.newFixedThreadPool(3) 创建了一个固定大小为3的线程池。当提交任务时,线程池会使用默认的方式创建线程,这些线程的名字通常是类似 pool-1-thread-1 这样的默认命名,缺乏业务可读性。

线程工厂的默认实现

Java中线程池相关的类(如 ThreadPoolExecutor)默认使用 Executors.defaultThreadFactory() 作为线程工厂。Executors 类提供了这个静态方法来获取默认的线程工厂。

下面我们来看一下 defaultThreadFactory 的大致实现逻辑(简化版):

public static ThreadFactory defaultThreadFactory() {
    return new ThreadFactory() {
        private final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    };
}

从上述代码可以看出,默认的线程工厂会创建属于当前线程组的线程,线程名按照 pool-<pool number>-thread-<thread number> 的格式生成,并且会将线程的守护属性设置为非守护,优先级设置为普通优先级。

定制线程工厂的需求场景

  1. 线程命名规范:在大型项目中,为了便于调试和监控,我们可能希望线程的命名遵循一定的业务规则。例如,在一个电商系统中,处理订单相关任务的线程可以命名为 order-task-thread-1,这样在日志或者监控工具中能够快速定位到线程所属的业务模块。
  2. 线程优先级设置:对于一些关键任务,如数据备份、系统监控等,我们可能希望将执行这些任务的线程设置为较高的优先级,以确保它们能够在系统资源有限的情况下优先执行。
  3. 守护线程设置:某些情况下,我们可能希望线程池中的线程为守护线程。守护线程的特点是当所有非守护线程结束时,守护线程会自动结束。例如,一些后台的日志记录线程、资源清理线程等,我们可以将它们设置为守护线程,这样当主线程和其他重要业务线程结束时,这些辅助线程也会随之结束,不会导致程序无法正常退出。

定制线程工厂的实现

  1. 简单定制线程名 我们可以通过实现 ThreadFactory 接口来创建一个简单的定制线程工厂,只对线程名进行定制。
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String prefix;

    public CustomThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, prefix + "-thread-" + threadNumber.getAndIncrement());
        System.out.println("Created thread: " + thread.getName());
        return thread;
    }
}

在上述代码中,CustomThreadFactory 类实现了 ThreadFactory 接口。构造函数接受一个前缀参数 prefix,在 newThread 方法中,使用这个前缀和一个自增的序号来为新创建的线程命名。

我们可以使用这个定制的线程工厂来创建线程池,如下所示:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class CustomThreadFactoryUsage {
    public static void main(String[] args) {
        ThreadFactory customThreadFactory = new CustomThreadFactory("my-task");
        ExecutorService executorService = Executors.newFixedThreadPool(3, customThreadFactory);
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is running.");
            });
        }
        executorService.shutdown();
    }
}

CustomThreadFactoryUsage 类中,我们创建了一个 CustomThreadFactory 实例,并将其传递给 Executors.newFixedThreadPool 方法来创建线程池。运行该程序,我们会看到线程名按照我们定制的格式输出。

  1. 定制线程优先级和守护线程属性 除了线程名,我们还可以进一步定制线程的优先级和守护线程属性。
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class AdvancedCustomThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String prefix;
    private final int priority;
    private final boolean isDaemon;

    public AdvancedCustomThreadFactory(String prefix, int priority, boolean isDaemon) {
        this.prefix = prefix;
        this.priority = priority;
        this.isDaemon = isDaemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, prefix + "-thread-" + threadNumber.getAndIncrement());
        thread.setPriority(priority);
        thread.setDaemon(isDaemon);
        System.out.println("Created thread: " + thread.getName() + ", priority: " + thread.getPriority() + ", isDaemon: " + thread.isDaemon());
        return thread;
    }
}

在上述代码中,AdvancedCustomThreadFactory 类的构造函数接受前缀、优先级和守护线程标志作为参数。在 newThread 方法中,不仅设置了线程名,还设置了线程的优先级和守护线程属性。

使用这个高级定制线程工厂的示例如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class AdvancedCustomThreadFactoryUsage {
    public static void main(String[] args) {
        ThreadFactory advancedCustomThreadFactory = new AdvancedCustomThreadFactory("critical-task", Thread.MAX_PRIORITY, true);
        ExecutorService executorService = Executors.newFixedThreadPool(3, advancedCustomThreadFactory);
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is running.");
            });
        }
        executorService.shutdown();
    }
}

AdvancedCustomThreadFactoryUsage 类中,我们创建了一个 AdvancedCustomThreadFactory 实例,设置线程前缀为 critical-task,优先级为最高优先级,并且设置为守护线程。运行该程序,我们可以看到线程按照我们定制的属性创建并运行。

在线程池构造函数中使用定制线程工厂

除了通过 Executors 工具类来创建线程池并使用定制线程工厂外,我们还可以直接通过 ThreadPoolExecutor 的构造函数来使用定制线程工厂。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorWithCustomFactory {
    public static void main(String[] args) {
        ThreadFactory customThreadFactory = new CustomThreadFactory("direct-pool");
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                5,
                10,
                TimeUnit.SECONDS,
                workQueue,
                customThreadFactory
        );
        for (int i = 0; i < 15; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is running.");
            });
        }
        executor.shutdown();
    }
}

在上述代码中,我们直接创建了一个 ThreadPoolExecutor 实例,在构造函数中传入了定制的线程工厂 customThreadFactory。这样创建的线程池中的线程将由我们定制的线程工厂来创建。

结合线程上下文传递信息

在多线程编程中,线程上下文(Thread Context)是一个重要的概念。有时候,我们可能希望在创建线程时,将一些上下文信息传递给新创建的线程。通过定制线程工厂,我们可以实现这一点。

例如,假设我们有一个Web应用,每个请求都需要一个唯一的请求ID,我们希望在处理这个请求的所有线程中都能够访问到这个请求ID。我们可以通过定制线程工厂来实现。

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadFactory;

public class ContextAwareThreadFactory implements ThreadFactory {
    private final ThreadFactory defaultFactory;
    private final Map<Thread, Integer> requestIdMap = new HashMap<>();

    public ContextAwareThreadFactory(ThreadFactory defaultFactory) {
        this.defaultFactory = defaultFactory;
    }

    public void setRequestId(Thread thread, int requestId) {
        requestIdMap.put(thread, requestId);
    }

    public int getRequestId(Thread thread) {
        return requestIdMap.getOrDefault(thread, -1);
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = defaultFactory.newThread(() -> {
            int requestId = getRequestId(Thread.currentThread());
            System.out.println("Thread " + Thread.currentThread().getName() + " with requestId: " + requestId + " is running.");
            r.run();
        });
        return thread;
    }
}

在上述代码中,ContextAwareThreadFactory 类接受一个默认的线程工厂作为参数,并维护一个 Map 来存储线程和对应的请求ID。在 newThread 方法中,它对传入的 Runnable 进行了包装,在运行 Runnable 之前,先获取请求ID并打印。

使用示例如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ContextAwareThreadFactoryUsage {
    public static void main(String[] args) {
        ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        ContextAwareThreadFactory contextAwareFactory = new ContextAwareThreadFactory(defaultFactory);
        ExecutorService executorService = Executors.newFixedThreadPool(3, contextAwareFactory);

        int requestId = 12345;
        for (int i = 0; i < 5; i++) {
            Thread thread = executorService.submit(() -> {
                // 模拟业务逻辑
            }).getThread();
            contextAwareFactory.setRequestId(thread, requestId);
        }
        executorService.shutdown();
    }
}

ContextAwareThreadFactoryUsage 类中,我们先创建了默认的线程工厂,然后将其传递给 ContextAwareThreadFactory 来创建一个上下文感知的线程工厂。在提交任务后,我们获取线程并设置请求ID,这样在线程执行时就能够访问到这个上下文信息。

异常处理与线程工厂

在多线程编程中,线程执行过程中可能会抛出异常。如果没有适当的处理,这些异常可能会导致线程终止,影响整个应用程序的稳定性。通过定制线程工厂,我们可以对新创建线程的异常处理机制进行定制。

Java中的线程提供了 setUncaughtExceptionHandler 方法来设置未捕获异常处理器。我们可以在定制线程工厂的 newThread 方法中为线程设置这个处理器。

import java.util.concurrent.ThreadFactory;

public class ExceptionHandlingThreadFactory implements ThreadFactory {
    private final ThreadFactory defaultFactory;

    public ExceptionHandlingThreadFactory(ThreadFactory defaultFactory) {
        this.defaultFactory = defaultFactory;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = defaultFactory.newThread(r);
        thread.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("Thread " + t.getName() + " threw an exception: " + e.getMessage());
            e.printStackTrace();
        });
        return thread;
    }
}

在上述代码中,ExceptionHandlingThreadFactory 类接受一个默认的线程工厂作为参数。在 newThread 方法中,它先通过默认工厂创建线程,然后为该线程设置了一个未捕获异常处理器,当线程抛出未捕获异常时,会打印线程名和异常信息。

使用示例如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExceptionHandlingThreadFactoryUsage {
    public static void main(String[] args) {
        ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        ThreadFactory exceptionHandlingFactory = new ExceptionHandlingThreadFactory(defaultFactory);
        ExecutorService executorService = Executors.newFixedThreadPool(3, exceptionHandlingFactory);

        executorService.submit(() -> {
            throw new RuntimeException("Simulated exception");
        });
        executorService.shutdown();
    }
}

ExceptionHandlingThreadFactoryUsage 类中,我们创建了一个包含异常处理的线程工厂,并使用它来创建线程池。当提交的任务抛出异常时,会触发我们设置的未捕获异常处理器,打印出异常信息。

线程工厂与线程池的生命周期管理

线程工厂虽然主要负责线程的创建,但它与线程池的生命周期管理也存在一定的关联。例如,当线程池需要扩容时,会通过线程工厂创建新的线程。而当线程池进行缩容时,那些由线程工厂创建的线程可能会被销毁。

了解这种关联有助于我们更好地设计和管理线程池。比如,我们可以在定制线程工厂中添加一些统计信息,记录创建了多少线程,这些线程的存活时间等,从而更好地监控线程池的运行状态。

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class LifecycleAwareThreadFactory implements ThreadFactory {
    private final AtomicInteger threadCount = new AtomicInteger(0);
    private final AtomicInteger activeThreadCount = new AtomicInteger(0);
    private final ThreadFactory defaultFactory;

    public LifecycleAwareThreadFactory(ThreadFactory defaultFactory) {
        this.defaultFactory = defaultFactory;
    }

    public int getThreadCount() {
        return threadCount.get();
    }

    public int getActiveThreadCount() {
        return activeThreadCount.get();
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = defaultFactory.newThread(() -> {
            activeThreadCount.incrementAndGet();
            try {
                r.run();
            } finally {
                activeThreadCount.decrementAndGet();
            }
        });
        threadCount.incrementAndGet();
        return thread;
    }
}

在上述代码中,LifecycleAwareThreadFactory 类维护了两个计数器,一个用于记录总共创建的线程数,另一个用于记录当前活动的线程数。在 newThread 方法中,每当创建一个新线程时,总线程数计数器增加;在线程执行任务前后,分别增加和减少活动线程数计数器。

使用示例如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class LifecycleAwareThreadFactoryUsage {
    public static void main(String[] args) throws InterruptedException {
        ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        LifecycleAwareThreadFactory lifecycleAwareFactory = new LifecycleAwareThreadFactory(defaultFactory);
        ExecutorService executorService = Executors.newFixedThreadPool(3, lifecycleAwareFactory);

        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        TimeUnit.SECONDS.sleep(2);
        System.out.println("Total threads created: " + lifecycleAwareFactory.getThreadCount());
        System.out.println("Active threads: " + lifecycleAwareFactory.getActiveThreadCount());

        executorService.shutdown();
    }
}

LifecycleAwareThreadFactoryUsage 类中,我们创建了一个生命周期感知的线程工厂,并使用它来创建线程池。在提交任务并等待一段时间后,打印出总共创建的线程数和当前活动的线程数,以便更好地了解线程池的运行状态。

通过以上对Java线程池线程工厂的深入探讨,我们了解了其作用、默认实现以及各种定制场景和实现方式。合理使用和定制线程工厂能够让我们的多线程应用更加健壮、易于维护和监控。无论是简单的线程命名规范,还是复杂的线程上下文传递和异常处理,线程工厂都为我们提供了强大的定制能力。在实际的项目开发中,我们应该根据具体的业务需求,灵活运用线程工厂来优化多线程应用的性能和稳定性。