MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java Stream 并行流的多 CPU 利用策略

2023-11-021.2k 阅读

Java Stream 并行流的基本概念

在 Java 8 引入 Stream API 后,开发者能够以一种更简洁、声明式的方式处理集合数据。Stream 可以看作是一系列支持顺序和并行聚合操作的元素。并行流(Parallel Stream)则是 Stream API 中利用多核 CPU 特性的关键,它允许将流操作并行化,从而提升处理大数据集时的性能。

Java 并行流基于 Fork/Join 框架实现。Fork/Join 框架是 Java 7 引入的用于并行执行任务的框架,它的核心思想是将一个大任务分割成多个小任务(fork),然后并行处理这些小任务,最后将结果合并(join)。并行流在内部利用 Fork/Join 框架来并行化流操作。

例如,我们有一个简单的整数列表,希望对其每个元素进行平方操作并求和。使用顺序流:

import java.util.Arrays;
import java.util.List;

public class SequentialStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        int sum = numbers.stream()
                .map(n -> n * n)
                .reduce(0, Integer::sum);
        System.out.println("顺序流结果: " + sum);
    }
}

而使用并行流,只需将 stream() 替换为 parallelStream()

import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        int sum = numbers.parallelStream()
                .map(n -> n * n)
                .reduce(0, Integer::sum);
        System.out.println("并行流结果: " + sum);
    }
}

虽然在这个简单示例中并行流的优势不明显,但当处理大数据集时,并行流能够显著提升性能。

并行流的多 CPU 利用原理

并行流利用多 CPU 核心的关键在于数据的分割和任务的并行执行。当调用 parallelStream() 时,Java 会将数据源(如集合)分割成多个子部分,每个子部分由一个独立的线程处理。这些线程并行执行流操作,最后将结果合并。

在 Fork/Join 框架中,任务被抽象为 ForkJoinTask。对于并行流操作,ForkJoinTask 会递归地将任务分割成更小的子任务,直到子任务足够小可以直接处理。例如,对于一个包含大量元素的列表,并行流可能会将其分割成多个子列表,每个子列表由一个 ForkJoinTask 处理。

map 操作举例,假设我们有一个包含 1000 个元素的列表并行执行 map 操作。并行流会将列表分割成多个子列表(比如 4 个子列表,假设 CPU 有 4 个核心),每个子列表由一个线程并行处理 map 操作,将子列表中的元素进行映射转换。处理完成后,这些子列表的结果会被合并起来。

影响并行流多 CPU 利用的因素

数据源特性

数据源的可分割性对并行流的性能影响很大。像 ArrayList 这样的随机访问列表,由于其内部结构特点,很容易被分割成多个子部分,非常适合并行流操作。而 LinkedList 由于其链式结构,分割成本较高,并行流操作的性能提升可能不明显。

例如,我们分别对 ArrayListLinkedList 进行并行求和操作:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ListTypeParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> arrayList = new ArrayList<>();
        List<Integer> linkedList = new LinkedList<>();
        for (int i = 0; i < 1000000; i++) {
            arrayList.add(i);
            linkedList.add(i);
        }

        long startTimeArrayList = System.currentTimeMillis();
        int sumArrayList = arrayList.parallelStream().mapToInt(Integer::intValue).sum();
        long endTimeArrayList = System.currentTimeMillis();

        long startTimeLinkedList = System.currentTimeMillis();
        int sumLinkedList = linkedList.parallelStream().mapToInt(Integer::intValue).sum();
        long endTimeLinkedList = System.currentTimeMillis();

        System.out.println("ArrayList 并行求和时间: " + (endTimeArrayList - startTimeArrayList) + " 毫秒");
        System.out.println("LinkedList 并行求和时间: " + (endTimeLinkedList - startTimeLinkedList) + " 毫秒");
    }
}

运行上述代码,通常会发现 ArrayList 的并行流操作时间比 LinkedList 短很多。

操作类型

流操作分为中间操作(如 mapfilter)和终端操作(如 reduceforEach)。不同类型的操作对并行流的性能影响不同。

中间操作通常是轻量级的,如 map 操作只是对元素进行简单的转换,适合并行化。而终端操作的并行化成本可能较高,尤其是涉及到全局状态的操作。例如 reduce 操作,它需要将各个子任务的结果合并起来,这涉及到额外的同步和合并开销。

filterreduce 操作组合为例:

import java.util.Arrays;
import java.util.List;

public class OperationTypeParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        long startTime = System.currentTimeMillis();
        int result = numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .reduce(0, Integer::sum);
        long endTime = System.currentTimeMillis();

        System.out.println("并行操作结果: " + result);
        System.out.println("操作时间: " + (endTime - startTime) + " 毫秒");
    }
}

在这个例子中,filter 操作可以并行化处理每个元素,而 reduce 操作需要合并各个子任务的结果。

任务粒度

任务粒度指的是并行流中每个子任务处理的数据量大小。如果任务粒度太小,并行化带来的开销(如任务创建、线程调度等)可能会超过并行执行带来的性能提升。相反,如果任务粒度太大,并行性可能无法充分发挥,因为单个任务处理时间过长,其他 CPU 核心可能处于空闲状态。

例如,假设我们有一个包含大量小任务的场景,如对每个字符进行简单的转换操作:

import java.util.ArrayList;
import java.util.List;

public class TaskGranularityParallelStreamExample {
    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            words.add("a" + i);
        }

        long startTime = System.currentTimeMillis();
        words.parallelStream()
                .map(word -> word.replace('a', 'b'))
                .forEach(System.out::println);
        long endTime = System.currentTimeMillis();

        System.out.println("操作时间: " + (endTime - startTime) + " 毫秒");
    }
}

这里每个字符转换任务粒度较小,并行化开销可能较大。可以尝试将多个字符组合成一个较大的任务块来提升性能。

优化并行流多 CPU 利用的策略

选择合适的数据源

在可能的情况下,优先选择适合并行处理的数据源,如 ArrayListHashSet。如果必须使用 LinkedList,可以考虑先将其转换为 ArrayList 再进行并行流操作。

例如:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class OptimizeDataSourceExample {
    public static void main(String[] args) {
        LinkedList<Integer> linkedList = new LinkedList<>();
        for (int i = 0; i < 1000000; i++) {
            linkedList.add(i);
        }

        List<Integer> arrayList = new ArrayList<>(linkedList);

        long startTimeArrayList = System.currentTimeMillis();
        int sumArrayList = arrayList.parallelStream().mapToInt(Integer::intValue).sum();
        long endTimeArrayList = System.currentTimeMillis();

        long startTimeLinkedList = System.currentTimeMillis();
        int sumLinkedList = linkedList.parallelStream().mapToInt(Integer::intValue).sum();
        long endTimeLinkedList = System.currentTimeMillis();

        System.out.println("ArrayList 并行求和时间: " + (endTimeArrayList - startTimeArrayList) + " 毫秒");
        System.out.println("LinkedList 并行求和时间: " + (endTimeLinkedList - startTimeLinkedList) + " 毫秒");
    }
}

通过将 LinkedList 转换为 ArrayList,可以提升并行流操作的性能。

优化操作顺序

合理安排流操作的顺序可以提升并行流性能。将计算量小、可并行性高的操作放在前面,计算量大、涉及全局状态的操作放在后面。

例如,在进行 filterreduce 操作时,如果 filter 能够大幅减少数据量,就应该先执行 filter 操作:

import java.util.Arrays;
import java.util.List;

public class OptimizeOperationOrderExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        long startTime1 = System.currentTimeMillis();
        int result1 = numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .reduce(0, Integer::sum);
        long endTime1 = System.currentTimeMillis();

        long startTime2 = System.currentTimeMillis();
        int result2 = numbers.parallelStream()
                .reduce(0, (a, b) -> a + (b % 2 == 0? b : 0));
        long endTime2 = System.currentTimeMillis();

        System.out.println("先 filter 后 reduce 时间: " + (endTime1 - startTime1) + " 毫秒");
        System.out.println("先 reduce 后 filter 时间: " + (endTime2 - startTime2) + " 毫秒");
    }
}

通常情况下,先执行 filter 操作会使后续 reduce 操作的数据量减少,从而提升性能。

调整任务粒度

通过调整任务粒度,可以平衡并行化开销和并行执行带来的性能提升。可以使用 Collectors.groupingBy 等方法将小任务合并成较大的任务块。

例如,对于字符转换任务,可以将多个字符组合成一个字符串块进行处理:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OptimizeTaskGranularityExample {
    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            words.add("a" + i);
        }

        long startTime = System.currentTimeMillis();
        Map<Integer, List<String>> groupedWords = words.parallelStream()
                .collect(Collectors.groupingBy(s -> s.length()));

        groupedWords.forEach((length, group) -> {
            group.parallelStream()
                   .map(word -> word.replace('a', 'b'))
                   .forEach(System.out::println);
        });
        long endTime = System.currentTimeMillis();

        System.out.println("操作时间: " + (endTime - startTime) + " 毫秒");
    }
}

通过 Collectors.groupingBy 将字符串按长度分组,每个组作为一个较大的任务块并行处理,提升了任务粒度,从而可能提升性能。

合理设置并行度

并行度指的是并行流中同时执行的任务数量,通常与 CPU 核心数相关。Java 并行流默认会根据 CPU 核心数自动设置并行度,但在某些情况下,手动调整并行度可能会提升性能。

可以通过 parallelStream(int parallelism) 方法来设置并行度。例如:

import java.util.Arrays;
import java.util.List;

public class SetParallelismExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        long startTime1 = System.currentTimeMillis();
        int result1 = numbers.parallelStream().reduce(0, Integer::sum);
        long endTime1 = System.currentTimeMillis();

        long startTime2 = System.currentTimeMillis();
        int result2 = numbers.parallelStream(2).reduce(0, Integer::sum);
        long endTime2 = System.currentTimeMillis();

        System.out.println("默认并行度时间: " + (endTime1 - startTime1) + " 毫秒");
        System.out.println("并行度为 2 时间: " + (endTime2 - startTime2) + " 毫秒");
    }
}

在这个例子中,手动设置并行度为 2,并与默认并行度进行比较。根据具体的任务和系统环境,选择合适的并行度可以优化性能。

并行流多 CPU 利用的注意事项

线程安全问题

并行流操作涉及多线程执行,因此要特别注意线程安全。如果流操作中涉及到共享可变状态,可能会导致数据竞争和不一致问题。

例如,下面的代码试图在并行流中更新一个共享变量:

import java.util.Arrays;
import java.util.List;

public class ThreadSafetyParallelStreamExample {
    private static int sharedVariable = 0;

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        numbers.parallelStream()
               .forEach(n -> sharedVariable += n);
        System.out.println("共享变量值: " + sharedVariable);
    }
}

这段代码由于多个线程同时访问和修改 sharedVariable,会导致结果不可预测。要解决这个问题,可以使用线程安全的类,如 AtomicInteger

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadSafetyParallelStreamFixedExample {
    private static AtomicInteger sharedVariable = new AtomicInteger(0);

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        numbers.parallelStream()
               .forEach(n -> sharedVariable.addAndGet(n));
        System.out.println("共享变量值: " + sharedVariable.get());
    }
}

副作用问题

并行流操作应该避免产生副作用。副作用指的是流操作除了返回结果外,还对外部状态产生影响。例如,在 forEach 操作中修改外部集合:

import java.util.ArrayList;
import java.util.List;

public class SideEffectParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);

        List<Integer> resultList = new ArrayList<>();
        numbers.parallelStream()
               .forEach(resultList::add);
        System.out.println("结果列表: " + resultList);
    }
}

这段代码在并行流 forEach 操作中向 resultList 添加元素,由于并行执行的不确定性,可能会导致 resultList 中元素顺序混乱或数据丢失。更好的做法是使用 collect 操作来收集结果:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SideEffectParallelStreamFixedExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);

        List<Integer> resultList = numbers.parallelStream()
               .collect(Collectors.toList());
        System.out.println("结果列表: " + resultList);
    }
}

通过 collect 操作,结果会按照并行流的规约逻辑正确收集,避免了副作用问题。

性能监控与调优

在实际应用中,需要对并行流性能进行监控和调优。可以使用 Java 自带的性能分析工具,如 VisualVM,来分析并行流操作的性能瓶颈。

例如,通过 VisualVM 可以查看并行流操作过程中线程的使用情况、CPU 利用率等指标。根据这些指标,进一步调整数据源、操作顺序、任务粒度和并行度等参数,以达到最佳性能。

总结

Java Stream 并行流为利用多 CPU 核心提升数据处理性能提供了强大的工具。理解并行流的多 CPU 利用原理,分析影响其性能的因素,并采取合适的优化策略和注意事项,能够使开发者在处理大数据集时充分发挥多核 CPU 的优势,提升应用程序的性能和响应速度。在实际开发中,需要根据具体的业务场景和数据特点,灵活运用并行流技术,以实现高效的数据处理。同时,注意线程安全、避免副作用以及进行性能监控与调优,确保并行流操作的正确性和高效性。