MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java Stream 并行流的性能提升

2023-10-287.7k 阅读

Java Stream 并行流的性能提升

并行流基础概念

在 Java 8 引入 Stream API 后,并行流成为了开发者们提升数据处理性能的有力工具。并行流允许开发者以并行方式处理流中的元素,充分利用多核处理器的计算能力。简单来说,当我们将一个普通的顺序流转换为并行流时,流中的元素会被分成多个部分,每个部分由独立的线程进行处理,最后将各个部分的处理结果合并起来。

例如,我们有一个包含整数的列表,想要计算这些整数的总和:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // 顺序流计算总和
        int sequentialSum = numbers.stream()
                .mapToInt(Integer::intValue)
                .sum();

        // 并行流计算总和
        int parallelSum = numbers.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();

        System.out.println("顺序流计算总和: " + sequentialSum);
        System.out.println("并行流计算总和: " + parallelSum);
    }
}

在上述代码中,numbers.stream() 创建了一个顺序流,而 numbers.parallelStream() 创建了一个并行流。两者都使用 mapToInt 方法将 Integer 类型转换为 int 类型,最后通过 sum 方法计算总和。从功能上看,顺序流和并行流的计算结果是一样的,但在性能上可能存在差异。

并行流的工作原理

并行流的实现依赖于 Fork/Join 框架。Fork/Join 框架是 Java 7 引入的用于并行执行任务的框架,它采用分治算法。当使用并行流时,流的数据源会被分割成多个子任务,这些子任务被提交到 Fork/Join 线程池中并行执行。

具体来说,并行流的工作过程如下:

  1. 数据分割:流的数据源(如集合)被分割成多个子部分。分割的方式取决于数据源的类型,例如对于 ArrayList,它会按照一定的规则平均分割。
  2. 任务创建:每个子部分对应一个任务,这些任务继承自 RecursiveTask(用于有返回值的任务)或 RecursiveAction(用于无返回值的任务)。任务在执行时会对自己负责的子部分数据进行处理。
  3. 任务执行:任务被提交到 Fork/Join 线程池,线程池中的线程会从任务队列中取出任务并执行。当一个任务执行过程中发现自己的子任务还可以继续分割时,会再次分割并提交新的子任务,这个过程就是“Fork”。当所有子任务执行完毕后,会将结果合并起来,这个过程就是“Join”。

例如,假设有一个包含 100 个元素的列表,并行流可能会将其分割成 4 个子列表,每个子列表由一个独立的线程处理。线程处理完自己的子列表后,将结果返回,最后由主线程将这 4 个结果合并得到最终结果。

并行流性能提升场景分析

  1. 大数据集处理:当处理的数据量非常大时,并行流的性能优势尤为明显。例如,处理包含数百万条记录的数据库查询结果,或者处理大型文本文件中的数据。由于数据量巨大,顺序处理会花费很长时间,而并行流可以将数据分割成多个部分同时处理,大大缩短处理时间。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class BigDataParallelStream {
    public static void main(String[] args) {
        List<Integer> bigData = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 10000000; i++) {
            bigData.add(random.nextInt(100));
        }

        long startTimeSequential = System.currentTimeMillis();
        int sequentialSum = bigData.stream()
                .mapToInt(Integer::intValue)
                .sum();
        long endTimeSequential = System.currentTimeMillis();
        System.out.println("顺序流计算总和: " + sequentialSum + ",耗时: " + (endTimeSequential - startTimeSequential) + " 毫秒");

        long startTimeParallel = System.currentTimeMillis();
        int parallelSum = bigData.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();
        long endTimeParallel = System.currentTimeMillis();
        System.out.println("并行流计算总和: " + parallelSum + ",耗时: " + (endTimeParallel - startTimeParallel) + " 毫秒");
    }
}

在上述代码中,我们生成了一个包含 1000 万个随机整数的列表。通过对比顺序流和并行流计算总和的时间,会发现并行流在这种大数据集情况下的性能提升非常显著。

  1. 计算密集型操作:如果流中的操作是计算密集型的,例如复杂的数学计算、加密解密等,并行流也能有效提升性能。因为这些操作需要消耗大量的 CPU 时间,并行处理可以充分利用多核处理器的资源。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class ComputeIntensiveParallelStream {
    public static double complexCalculation(int num) {
        // 模拟复杂计算
        double result = 0;
        for (int i = 0; i < 1000000; i++) {
            result += Math.sin(num * i) * Math.cos(num + i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            data.add(random.nextInt(100));
        }

        long startTimeSequential = System.currentTimeMillis();
        List<Double> sequentialResults = data.stream()
                .map(ComputeIntensiveParallelStream::complexCalculation)
                .collect(Collectors.toList());
        long endTimeSequential = System.currentTimeMillis();
        System.out.println("顺序流处理耗时: " + (endTimeSequential - startTimeSequential) + " 毫秒");

        long startTimeParallel = System.currentTimeMillis();
        List<Double> parallelResults = data.parallelStream()
                .map(ComputeIntensiveParallelStream::complexCalculation)
                .collect(Collectors.toList());
        long endTimeParallel = System.currentTimeMillis();
        System.out.println("并行流处理耗时: " + (endTimeParallel - startTimeParallel) + " 毫秒");
    }
}

在这个例子中,complexCalculation 方法模拟了一个复杂的数学计算。通过对比顺序流和并行流处理包含 1000 个元素列表的时间,可以看到并行流在计算密集型操作上的性能优势。

并行流性能的影响因素

  1. 数据源类型:不同的数据源类型对并行流的性能有影响。例如,ArrayList 由于其连续的内存存储结构,在并行处理时分割数据和访问数据都比较高效。而 LinkedList 由于其链式存储结构,在分割数据时可能需要更多的操作,性能相对较差。
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class DataSourceParallelStream {
    public static void main(String[] args) {
        List<Integer> arrayList = new ArrayList<>();
        List<Integer> linkedList = new LinkedList<>();
        Random random = new Random();
        for (int i = 0; i < 1000000; i++) {
            int num = random.nextInt(100);
            arrayList.add(num);
            linkedList.add(num);
        }

        long startTimeArrayListParallel = System.currentTimeMillis();
        int arrayListParallelSum = arrayList.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();
        long endTimeArrayListParallel = System.currentTimeMillis();
        System.out.println("ArrayList 并行流计算总和耗时: " + (endTimeArrayListParallel - startTimeArrayListParallel) + " 毫秒");

        long startTimeLinkedListParallel = System.currentTimeMillis();
        int linkedListParallelSum = linkedList.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();
        long endTimeLinkedListParallel = System.currentTimeMillis();
        System.out.println("LinkedList 并行流计算总和耗时: " + (endTimeLinkedListParallel - startTimeLinkedListParallel) + " 毫秒");
    }
}

运行上述代码会发现,在并行流计算总和时,ArrayList 的耗时通常会比 LinkedList 短。

  1. 操作类型:流中的操作类型也会影响并行流的性能。例如,reducecollect 等操作在并行执行时需要进行结果的合并,合并操作的复杂度会影响整体性能。如果合并操作比较简单,如求和、求最大值等,并行流的性能提升会比较明显;但如果合并操作非常复杂,可能会抵消并行处理带来的优势。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class OperationTypeParallelStream {
    public static class ComplexObject {
        private int value;

        public ComplexObject(int value) {
            this.value = value;
        }

        public void merge(ComplexObject other) {
            // 模拟复杂的合并操作
            this.value = this.value * other.value + this.value - other.value;
        }
    }

    public static ComplexObject complexReduce(List<ComplexObject> list) {
        return list.parallelStream()
                .reduce(new ComplexObject(1), (a, b) -> {
                    a.merge(b);
                    return a;
                }, (a, b) -> {
                    a.merge(b);
                    return a;
                });
    }

    public static void main(String[] args) {
        List<ComplexObject> data = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            data.add(new ComplexObject(random.nextInt(10)));
        }

        long startTime = System.currentTimeMillis();
        ComplexObject result = complexReduce(data);
        long endTime = System.currentTimeMillis();
        System.out.println("并行流复杂 reduce 操作耗时: " + (endTime - startTime) + " 毫秒");
    }
}

在这个例子中,ComplexObjectmerge 方法模拟了一个复杂的合并操作。通过对包含 1000 个 ComplexObject 的列表进行并行 reduce 操作,可以看到由于合并操作的复杂性,并行流的性能提升可能并不理想。

  1. 线程开销:虽然并行流利用多线程提高了处理速度,但线程的创建、调度和销毁也会带来一定的开销。如果数据量较小或者操作本身比较简单,线程开销可能会超过并行处理带来的性能提升,导致并行流的性能反而不如顺序流。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class ThreadOverheadParallelStream {
    public static void main(String[] args) {
        List<Integer> smallData = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            smallData.add(random.nextInt(10));
        }

        long startTimeSequential = System.currentTimeMillis();
        int sequentialSum = smallData.stream()
                .mapToInt(Integer::intValue)
                .sum();
        long endTimeSequential = System.currentTimeMillis();
        System.out.println("顺序流计算总和: " + sequentialSum + ",耗时: " + (endTimeSequential - startTimeSequential) + " 毫秒");

        long startTimeParallel = System.currentTimeMillis();
        int parallelSum = smallData.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();
        long endTimeParallel = System.currentTimeMillis();
        System.out.println("并行流计算总和: " + parallelSum + ",耗时: " + (endTimeParallel - startTimeParallel) + " 毫秒");
    }
}

在这个包含 10 个元素的小数据集例子中,通常会发现并行流的耗时比顺序流长,这就是因为线程开销的影响。

并行流性能优化策略

  1. 合理选择数据源:尽量使用适合并行处理的数据源,如 ArrayListHashSet 等。如果必须使用 LinkedList 等不适合并行处理的数据源,可以考虑在并行处理前将其转换为适合的数据源类型。
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

public class DataSourceConversion {
    public static void main(String[] args) {
        LinkedList<Integer> linkedList = new LinkedList<>();
        linkedList.add(1);
        linkedList.add(2);
        linkedList.add(3);

        // 将 LinkedList 转换为 ArrayList 后进行并行处理
        List<Integer> arrayList = linkedList.stream()
                .collect(Collectors.toList());
        int parallelSum = arrayList.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();
        System.out.println("转换后并行流计算总和: " + parallelSum);
    }
}
  1. 优化操作:简化流中的合并操作,避免复杂的计算和状态共享。如果合并操作不可避免地复杂,可以考虑自定义并行算法,以减少并行执行时的性能损耗。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OptimizeOperation {
    public static class ComplexObject {
        private int value;

        public ComplexObject(int value) {
            this.value = value;
        }

        public void merge(ComplexObject other) {
            // 简化合并操作
            this.value += other.value;
        }
    }

    public static class ComplexReduceTask extends RecursiveTask<ComplexObject> {
        private static final int THRESHOLD = 100;
        private List<ComplexObject> data;
        private int start;
        private int end;

        public ComplexReduceTask(List<ComplexObject> data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }

        @Override
        protected ComplexObject compute() {
            if (end - start <= THRESHOLD) {
                ComplexObject result = new ComplexObject(0);
                for (int i = start; i < end; i++) {
                    result.merge(data.get(i));
                }
                return result;
            } else {
                int mid = (start + end) / 2;
                ComplexReduceTask leftTask = new ComplexReduceTask(data, start, mid);
                ComplexReduceTask rightTask = new ComplexReduceTask(data, mid, end);

                leftTask.fork();
                ComplexObject rightResult = rightTask.compute();
                ComplexObject leftResult = leftTask.join();

                leftResult.merge(rightResult);
                return leftResult;
            }
        }
    }

    public static ComplexObject optimizedComplexReduce(List<ComplexObject> list) {
        int size = list.size();
        ComplexReduceTask task = new ComplexReduceTask(list, 0, size);
        return task.compute();
    }

    public static void main(String[] args) {
        List<ComplexObject> data = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            data.add(new ComplexObject(random.nextInt(10)));
        }

        long startTime = System.currentTimeMillis();
        ComplexObject result = optimizedComplexReduce(data);
        long endTime = System.currentTimeMillis();
        System.out.println("优化后并行处理耗时: " + (endTime - startTime) + " 毫秒");
    }
}

在这个例子中,我们首先简化了 ComplexObjectmerge 操作,然后通过自定义 RecursiveTask 实现了一个优化的并行 reduce 操作,提高了性能。

  1. 控制并行度:可以通过设置系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 来控制并行流的并行度,即同时执行的线程数。合理设置并行度可以避免线程过多导致的资源竞争和性能下降。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class ControlParallelism {
    public static void main(String[] args) {
        // 设置并行度为 4
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

        List<Integer> data = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1000000; i++) {
            data.add(random.nextInt(100));
        }

        long startTime = System.currentTimeMillis();
        int parallelSum = data.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();
        long endTime = System.currentTimeMillis();
        System.out.println("设置并行度后并行流计算总和耗时: " + (endTime - startTime) + " 毫秒");
    }
}

在这个例子中,我们将并行度设置为 4,通过对比不同并行度下并行流的处理时间,可以找到一个适合当前任务和硬件环境的并行度,从而优化性能。

  1. 避免状态共享:在并行流处理过程中,尽量避免共享可变状态。因为多个线程同时访问和修改共享状态可能会导致数据竞争和不一致问题,同时也会降低并行流的性能。如果需要共享状态,可以考虑使用线程安全的数据结构,如 ConcurrentHashMap 等。
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class AvoidStateSharing {
    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            data.add(random.nextInt(10));
        }

        // 错误示例:共享可变状态
        Map<Integer, Integer> countMap = new HashMap<>();
        data.parallelStream()
                .forEach(num -> {
                    if (countMap.containsKey(num)) {
                        countMap.put(num, countMap.get(num) + 1);
                    } else {
                        countMap.put(num, 1);
                    }
                });
        System.out.println("错误的共享状态结果: " + countMap);

        // 正确示例:使用线程安全的数据结构
        Map<Integer, Integer> concurrentCountMap = new ConcurrentHashMap<>();
        data.parallelStream()
                .forEach(num -> concurrentCountMap.merge(num, 1, Integer::sum));
        System.out.println("正确的线程安全结果: " + concurrentCountMap);
    }
}

在上述代码中,第一个 forEach 操作试图在并行流中共享一个普通的 HashMap,这会导致数据竞争和结果不一致。而使用 ConcurrentHashMap 并通过 merge 方法可以安全地在并行流中统计每个数字出现的次数。

通过深入理解并行流的工作原理、性能影响因素,并采用合理的优化策略,开发者可以在 Java 编程中充分发挥并行流的性能优势,提高数据处理的效率。无论是大数据集处理还是计算密集型任务,并行流都为我们提供了一种高效的解决方案,但在使用过程中需要谨慎考虑各种因素,以确保获得最佳的性能提升。