MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池的任务提交

2021-07-057.9k 阅读

Java 线程池的任务提交基础概念

在Java多线程编程中,线程池是一种非常重要的工具,它可以有效地管理和复用线程,提高程序的性能和资源利用率。而任务提交则是线程池使用过程中的关键操作,即将需要执行的任务交给线程池来处理。

线程池的核心是一个线程队列和一组工作线程。当我们提交任务时,任务首先被放入任务队列中,然后工作线程从任务队列中取出任务并执行。这种机制实现了任务的异步执行,减少了线程创建和销毁的开销。

Java提供了java.util.concurrent.Executor框架来支持线程池的使用。其中,ExecutorService接口扩展了Executor接口,提供了更多控制线程池生命周期和任务提交的方法。ThreadPoolExecutor类是ExecutorService接口的一个实现类,它提供了灵活的线程池配置和任务提交策略。

任务提交的基本方法

ExecutorService接口中,定义了两种主要的任务提交方法:execute(Runnable task)submit(Callable<T> task)

  1. execute(Runnable task)方法:该方法用于提交一个Runnable任务。Runnable接口只有一个run()方法,该方法没有返回值。当任务执行完毕后,我们无法获取任务的执行结果。示例代码如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecuteExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 提交Runnable任务
        executorService.execute(() -> {
            System.out.println("Task executed by thread: " + Thread.currentThread().getName());
        });

        // 关闭线程池
        executorService.shutdown();
    }
}

在上述代码中,我们创建了一个固定大小为2的线程池,并使用execute方法提交了一个Runnable任务。任务在一个线程池中线程上执行,并打印出执行任务的线程名称。

  1. submit(Callable<T> task)方法:该方法用于提交一个Callable任务。Callable接口有一个call()方法,该方法可以返回一个值。submit方法返回一个Future<T>对象,通过这个对象我们可以获取任务的执行结果,还可以取消任务的执行。示例代码如下:
import java.util.concurrent.*;

public class SubmitExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 提交Callable任务
        Future<Integer> future = executorService.submit(() -> {
            System.out.println("Task executed by thread: " + Thread.currentThread().getName());
            return 42;
        });

        try {
            // 获取任务执行结果
            Integer result = future.get();
            System.out.println("Task result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在上述代码中,我们提交了一个Callable任务,该任务返回一个整数值42。通过Future对象的get()方法,我们可以获取任务的执行结果。

任务提交与线程池状态

线程池有几种不同的状态,如RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED。任务提交的行为会受到线程池状态的影响。

  1. RUNNING状态:在线程池处于RUNNING状态时,可以正常提交任务。新提交的任务会被放入任务队列或直接分配给空闲线程执行。

  2. SHUTDOWN状态:当调用ExecutorServiceshutdown()方法后,线程池进入SHUTDOWN状态。在这个状态下,不再接受新的任务提交,但会继续执行已提交到任务队列中的任务。示例代码如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 提交任务
        executorService.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
        });

        // 关闭线程池
        executorService.shutdown();

        // 尝试提交新任务,会被拒绝
        executorService.execute(() -> {
            System.out.println("Task 2 (should not execute)");
        });

        // 等待已提交任务执行完毕
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    }
}

在上述代码中,调用shutdown()方法后,再提交新任务会被拒绝,线程池会继续执行已提交的任务,直到所有任务完成或超时。

  1. STOP状态:当调用ExecutorServiceshutdownNow()方法后,线程池进入STOP状态。在这个状态下,不再接受新的任务提交,并且会尝试停止正在执行的任务,清空任务队列。

  2. TIDYINGTERMINATED状态:当所有任务执行完毕,且线程池中的工作线程都已关闭时,线程池进入TIDYING状态,然后很快进入TERMINATED状态。在这两个状态下,无法再提交任务。

任务提交策略

ThreadPoolExecutor类提供了几种任务提交策略,这些策略决定了当任务队列已满且线程池中的线程都在忙碌时,新提交的任务如何处理。

  1. AbortPolicy(默认策略):当任务无法执行时,直接抛出RejectedExecutionException异常。示例代码如下:
import java.util.concurrent.*;

public class AbortPolicyExample {
    public static void main(String[] args) {
        // 创建一个固定大小为2,任务队列容量为1的线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());

        // 提交任务
        executor.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(() -> {
            System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(() -> {
            System.out.println("Task 3 (should throw exception)");
        });

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,由于任务队列已满且线程池中的线程都在忙碌,提交第三个任务时会抛出RejectedExecutionException异常。

  1. CallerRunsPolicy:当任务无法执行时,由提交任务的线程来执行该任务。这样可以降低新任务的提交速度,示例代码如下:
import java.util.concurrent.*;

public class CallerRunsPolicyExample {
    public static void main(String[] args) {
        // 创建一个固定大小为2,任务队列容量为1的线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 提交任务
        executor.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(() -> {
            System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(() -> {
            System.out.println("Task 3 executed by main thread: " + Thread.currentThread().getName());
        });

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,当任务队列已满且线程池中的线程都在忙碌时,第三个任务由主线程执行。

  1. DiscardPolicy:当任务无法执行时,直接丢弃该任务,不抛出任何异常。示例代码如下:
import java.util.concurrent.*;

public class DiscardPolicyExample {
    public static void main(String[] args) {
        // 创建一个固定大小为2,任务队列容量为1的线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());

        // 提交任务
        executor.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(() -> {
            System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(() -> {
            System.out.println("Task 3 (should be discarded)");
        });

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,当任务队列已满且线程池中的线程都在忙碌时,第三个任务被直接丢弃,不会执行也不会抛出异常。

  1. DiscardOldestPolicy:当任务无法执行时,丢弃任务队列中最老的任务,然后尝试提交新任务。示例代码如下:
import java.util.concurrent.*;

public class DiscardOldestPolicyExample {
    public static void main(String[] args) {
        // 创建一个固定大小为2,任务队列容量为1的线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        // 提交任务
        executor.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executor.execute(() -> {
            System.out.println("Task 2 (should be discarded)");
        });
        executor.execute(() -> {
            System.out.println("Task 3 executed by thread: " + Thread.currentThread().getName());
        });

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,当任务队列已满且线程池中的线程都在忙碌时,第二个任务被丢弃,第三个任务被提交执行。

批量任务提交

在实际应用中,我们经常需要提交一批任务并等待所有任务执行完毕。Java提供了invokeAll方法来实现这一功能。invokeAll方法接受一个Collection集合,集合中包含多个Callable任务。该方法会等待所有任务执行完毕,并返回一个包含每个任务执行结果的List<Future<T>>。示例代码如下:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 创建多个Callable任务
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
            return 10;
        });
        tasks.add(() -> {
            System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
            return 20;
        });

        // 提交并等待所有任务执行完毕
        try {
            List<Future<Integer>> futures = executorService.invokeAll(tasks);
            for (Future<Integer> future : futures) {
                try {
                    System.out.println("Task result: " + future.get());
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在上述代码中,我们创建了两个Callable任务,并使用invokeAll方法提交这两个任务。线程池会并行执行这些任务,当所有任务执行完毕后,我们可以通过Future对象获取每个任务的执行结果。

异常处理与任务提交

当使用execute方法提交Runnable任务时,如果任务执行过程中抛出异常,默认情况下这个异常会被线程池捕获并打印到标准错误输出,但不会被上层调用者直接捕获。如果我们希望在任务执行出现异常时能够进行自定义处理,可以通过Thread.UncaughtExceptionHandler来实现。示例代码如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExceptionHandlingExecuteExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 设置线程的UncaughtExceptionHandler
        Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
            System.out.println("Uncaught exception in thread " + thread.getName() + ": " + exception.getMessage());
        });

        // 提交可能抛出异常的任务
        executorService.execute(() -> {
            throw new RuntimeException("Task execution failed");
        });

        // 关闭线程池
        executorService.shutdown();
    }
}

在上述代码中,我们通过Thread.setDefaultUncaughtExceptionHandler方法设置了全局的未捕获异常处理器,当任务执行抛出异常时,会调用这个处理器进行处理。

当使用submit方法提交Callable任务时,任务执行过程中抛出的异常会被封装在Future对象的get()方法中。我们可以通过try-catch块捕获ExecutionExceptionInterruptedException来处理异常。示例代码如下:

import java.util.concurrent.*;

public class ExceptionHandlingSubmitExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 提交可能抛出异常的Callable任务
        Future<Integer> future = executorService.submit(() -> {
            throw new RuntimeException("Task execution failed");
        });

        try {
            Integer result = future.get();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Exception caught: " + e.getMessage());
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在上述代码中,通过try-catch块捕获get()方法抛出的异常,从而处理任务执行过程中抛出的异常。

任务提交与性能优化

合理的任务提交策略和线程池配置对于提高程序性能至关重要。

  1. 线程池大小的选择:线程池大小的选择要根据任务的类型(CPU密集型或I/O密集型)来决定。对于CPU密集型任务,线程池大小一般设置为CPU核心数加1,以充分利用CPU资源并避免线程上下文切换的开销。对于I/O密集型任务,由于I/O操作会使线程阻塞,线程池大小可以适当增大,以提高系统的并发处理能力。例如,可以根据经验公式线程数 = CPU核心数 * (1 + 平均I/O等待时间 / 平均CPU计算时间)来估算线程池大小。

  2. 任务队列的选择:任务队列的类型和容量也会影响性能。如果任务执行时间较短且任务数量较多,可以选择无界队列(如LinkedBlockingQueue),这样可以避免任务被拒绝,但可能会导致内存消耗过大。如果任务执行时间较长且希望控制内存使用,可以选择有界队列(如ArrayBlockingQueue),并根据实际情况设置合适的队列容量。

  3. 任务提交频率:如果任务提交频率过高,可能会导致任务队列迅速填满,进而触发任务拒绝策略。因此,需要根据线程池的处理能力合理控制任务提交频率,可以通过流量控制等手段来实现。

  4. 使用合适的线程池类型:除了ThreadPoolExecutor,Java还提供了一些预定义的线程池,如Executors.newFixedThreadPool(固定大小线程池)、Executors.newCachedThreadPool(可缓存线程池)、Executors.newSingleThreadExecutor(单线程线程池)等。不同类型的线程池适用于不同的场景,选择合适的线程池类型可以提高性能。例如,newFixedThreadPool适用于需要控制并发线程数的场景,newCachedThreadPool适用于任务执行时间短且提交频率高的场景,newSingleThreadExecutor适用于需要顺序执行任务的场景。

总结任务提交在Java线程池中的要点

任务提交是Java线程池使用过程中的核心操作,涉及到任务提交方法的选择、线程池状态对任务提交的影响、任务提交策略的应用、批量任务提交、异常处理以及性能优化等多个方面。深入理解这些要点并根据实际应用场景合理配置和使用线程池,可以有效地提高程序的性能和稳定性,充分发挥多线程编程的优势。在实际开发中,需要根据具体的业务需求和系统资源情况,精心调整线程池的参数和任务提交策略,以达到最佳的运行效果。同时,要注意处理任务执行过程中可能出现的异常,确保程序的健壮性。通过合理运用线程池的任务提交机制,可以使Java应用程序在多线程环境下高效、稳定地运行。