MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java并行流的基本用法

2024-07-104.7k 阅读

Java并行流基础概念

在Java 8引入流(Stream)之后,并行流作为其重要特性,极大地提升了数据处理的效率。流提供了一种声明式的处理数据集合的方式,而并行流则利用多核处理器的优势,将数据处理任务并行化。

流与并行流的关系

流是Java 8中对集合数据处理的新抽象,它允许以一种类似SQL查询的方式对集合元素进行筛选、映射、归约等操作。例如,假设有一个整数列表,我们想筛选出所有偶数并计算它们的平方和,使用流可以这样写:

import java.util.Arrays;
import java.util.List;

public class StreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        int sum = numbers.stream()
                .filter(n -> n % 2 == 0)
                .mapToInt(n -> n * n)
                .sum();
        System.out.println(sum);
    }
}

在这个例子中,stream()方法将列表转换为流,filter方法筛选出偶数,mapToInt方法将每个偶数映射为其平方,最后sum方法计算总和。

并行流则是流的并行版本,它会将数据分块处理,利用多核CPU的多个线程并行执行操作。我们只需要将上述代码中的stream()替换为parallelStream(),就可以实现并行处理:

import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        int sum = numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .mapToInt(n -> n * n)
                .sum();
        System.out.println(sum);
    }
}

虽然代码看起来相似,但并行流会在后台将数据分割成多个部分,由不同的线程同时处理,从而提高处理速度,特别是在处理大数据集时效果更为显著。

并行流的内部实现机制

并行流的实现依赖于Fork/Join框架。Fork/Join框架是Java 7引入的用于并行执行任务的框架,它采用分治算法(Divide and Conquer)。当使用并行流时,数据会被分割成多个子任务,每个子任务由不同的线程独立执行。例如,对于一个包含1000个元素的列表,并行流可能会将其分成10个包含100个元素的子列表,然后每个子列表由一个线程处理。处理完子任务后,结果会被合并起来得到最终结果。

具体来说,并行流的操作分为三个阶段:分解(Partitioning)、处理(Processing)和合并(Merging)。在分解阶段,流会将数据源分割成多个子部分。在处理阶段,每个子部分由不同的线程并行处理。最后在合并阶段,各个子任务的结果会被合并成最终结果。以刚才计算偶数平方和的例子来说,并行流会先将列表分割成多个子列表,每个子列表筛选出偶数并计算平方和,最后将这些子结果合并得到总的平方和。

创建并行流

在Java中,有多种方式可以创建并行流。

通过集合创建并行流

集合类如ListSet等都提供了parallelStream()方法来直接创建并行流。例如:

import java.util.ArrayList;
import java.util.List;

public class CreateParallelStreamFromList {
    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        words.add("apple");
        words.add("banana");
        words.add("cherry");

        words.parallelStream()
               .map(String::toUpperCase)
               .forEach(System.out::println);
    }
}

在这个例子中,words列表通过parallelStream()方法创建了并行流,然后将每个单词转换为大写并打印出来。

同样,Set也可以创建并行流:

import java.util.HashSet;
import java.util.Set;

public class CreateParallelStreamFromSet {
    public static void main(String[] args) {
        Set<Integer> numbers = new HashSet<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);

        numbers.parallelStream()
               .map(n -> n * 2)
               .forEach(System.out::println);
    }
}

这里的HashSet通过parallelStream()方法创建并行流,并将每个元素乘以2后打印。

通过数组创建并行流

Arrays类提供了静态方法来为数组创建并行流。例如:

import java.util.Arrays;

public class CreateParallelStreamFromArray {
    public static void main(String[] args) {
        int[] numbers = {1, 2, 3, 4, 5};
        Arrays.stream(numbers)
               .parallel()
               .map(n -> n * n)
               .forEach(System.out::println);
    }
}

在这个例子中,先通过Arrays.stream(numbers)将数组转换为流,然后调用parallel()方法将其转换为并行流,最后对每个元素求平方并打印。

通过其他方式创建并行流

除了集合和数组,还可以通过IntStreamLongStreamDoubleStream等原始类型流的parallel()方法创建并行流。例如:

import java.util.stream.IntStream;

public class CreateParallelStreamFromIntStream {
    public static void main(String[] args) {
        IntStream.range(1, 10)
               .parallel()
               .filter(n -> n % 2 == 0)
               .forEach(System.out::println);
    }
}

这里通过IntStream.range(1, 10)创建一个包含1到9的整数流,然后调用parallel()方法将其转换为并行流,筛选出偶数并打印。

并行流的操作

并行流的操作和普通流一样,分为中间操作和终端操作。

中间操作

中间操作会返回一个新的流,并且可以链式调用。常见的中间操作包括filtermapflatMapdistinctsorted等。

  1. filter操作:用于筛选流中的元素,保留满足条件的元素。例如:
import java.util.Arrays;
import java.util.List;

public class ParallelStreamFilter {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        numbers.parallelStream()
               .filter(n -> n % 2 == 0)
               .forEach(System.out::println);
    }
}

在这个例子中,filter(n -> n % 2 == 0)筛选出列表中的偶数并打印。

  1. map操作:用于将流中的每个元素映射为一个新的元素。例如:
import java.util.Arrays;
import java.util.List;

public class ParallelStreamMap {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        numbers.parallelStream()
               .map(n -> n * 2)
               .forEach(System.out::println);
    }
}

这里map(n -> n * 2)将列表中的每个元素乘以2并打印。

  1. flatMap操作:与map类似,但flatMap会将映射后的流扁平化。例如,假设有一个字符串列表,每个字符串包含多个单词,我们想将所有单词提取出来并转换为大写:
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamFlatMap {
    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("hello world", "java is great");
        List<String> words = sentences.parallelStream()
               .flatMap(s -> Arrays.stream(s.split(" ")))
               .map(String::toUpperCase)
               .collect(Collectors.toList());
        System.out.println(words);
    }
}

在这个例子中,flatMap(s -> Arrays.stream(s.split(" ")))将每个句子分割成单词并扁平化,然后map(String::toUpperCase)将每个单词转换为大写,最后通过collect(Collectors.toList())收集结果。

  1. distinct操作:用于去除流中的重复元素。例如:
import java.util.Arrays;
import java.util.List;

public class ParallelStreamDistinct {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 2, 3, 3, 3);
        numbers.parallelStream()
               .distinct()
               .forEach(System.out::println);
    }
}

这里distinct()方法去除列表中的重复元素并打印。

  1. sorted操作:用于对流中的元素进行排序。例如:
import java.util.Arrays;
import java.util.List;

public class ParallelStreamSorted {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
        numbers.parallelStream()
               .sorted()
               .forEach(System.out::println);
    }
}

sorted()方法将列表中的元素按升序排序并打印。如果需要自定义排序,可以传入一个Comparator

终端操作

终端操作会执行流的处理流水线,并返回一个结果或副作用。常见的终端操作包括forEachcollectreducecountminmax等。

  1. forEach操作:用于对流中的每个元素执行一个动作。例如:
import java.util.Arrays;
import java.util.List;

public class ParallelStreamForEach {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        numbers.parallelStream()
               .forEach(System.out::println);
    }
}

这里forEach(System.out::println)会打印列表中的每个元素。需要注意的是,由于并行流是并行处理的,打印顺序可能与列表顺序不一致。

  1. collect操作:用于将流中的元素收集到一个集合或其他数据结构中。例如:
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamCollect {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<Integer> squaredNumbers = numbers.parallelStream()
               .map(n -> n * n)
               .collect(Collectors.toList());
        System.out.println(squaredNumbers);
    }
}

在这个例子中,collect(Collectors.toList())将平方后的元素收集到一个新的列表中。除了toList(),还可以使用toSet()toMap()等方法收集到不同的数据结构。

  1. reduce操作:用于将流中的元素归约为一个值。例如,计算列表中所有元素的和:
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ParallelStreamReduce {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        Optional<Integer> sum = numbers.parallelStream()
               .reduce((a, b) -> a + b);
        sum.ifPresent(System.out::println);
    }
}

这里reduce((a, b) -> a + b)将列表中的元素两两相加,最终得到总和。Optional类型用于处理可能为空的结果。如果流为空,reduce操作返回Optional.empty()。我们可以通过ifPresent方法来处理结果。

  1. count操作:用于计算流中元素的数量。例如:
import java.util.Arrays;
import java.util.List;

public class ParallelStreamCount {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        long count = numbers.parallelStream()
               .filter(n -> n % 2 == 0)
               .count();
        System.out.println(count);
    }
}

这里count()方法计算筛选出的偶数的数量并打印。

  1. minmax操作:分别用于找出流中的最小和最大元素。例如:
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ParallelStreamMinMax {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        Optional<Integer> min = numbers.parallelStream()
               .min(Integer::compareTo);
        Optional<Integer> max = numbers.parallelStream()
               .max(Integer::compareTo);
        min.ifPresent(m -> System.out.println("Min: " + m));
        max.ifPresent(m -> System.out.println("Max: " + m));
    }
}

这里min(Integer::compareTo)max(Integer::compareTo)分别找出列表中的最小和最大元素,并通过ifPresent方法打印。

并行流的性能与注意事项

虽然并行流可以显著提高数据处理效率,但在使用时也需要注意一些问题。

性能考量

  1. 数据规模:并行流在处理大数据集时效果最为显著。对于小规模数据,并行流的线程创建、任务分配和结果合并等开销可能会超过并行处理带来的性能提升。例如,处理一个只有10个元素的列表,使用并行流可能反而比普通流慢。

  2. 操作复杂性:如果流的操作非常简单,如只进行简单的筛选或映射,并行流的优势可能不明显。但对于复杂的操作,如复杂的过滤条件、昂贵的映射计算等,并行流可以通过并行处理提高效率。

  3. 硬件环境:并行流依赖多核CPU,在多核处理器上才能发挥其优势。如果运行环境是单核CPU,并行流可能不会带来性能提升,甚至可能因为线程调度等开销而变慢。

注意事项

  1. 线程安全:在并行流中,由于多个线程同时处理数据,需要确保操作是线程安全的。例如,在forEach操作中,如果对共享变量进行修改,可能会导致数据竞争问题。考虑以下代码:
import java.util.Arrays;
import java.util.List;

public class ParallelStreamThreadSafety {
    private static int count = 0;

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        numbers.parallelStream()
               .forEach(n -> count++);
        System.out.println(count);
    }
}

在这个例子中,count++操作不是线程安全的,多个线程同时对count进行自增操作可能会导致结果不准确。为了解决这个问题,可以使用AtomicInteger等线程安全的类:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelStreamThreadSafetyFixed {
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        numbers.parallelStream()
               .forEach(n -> count.incrementAndGet());
        System.out.println(count.get());
    }
}
  1. 顺序一致性:并行流的处理顺序是不确定的,因为不同线程可能以不同顺序处理数据块。如果结果依赖于元素的顺序,使用并行流可能会得到错误的结果。例如,在对一个列表进行排序并打印时,如果使用并行流,打印顺序可能与排序后的顺序不一致。在这种情况下,需要考虑使用普通流或确保并行流的操作不依赖于顺序。

  2. 资源消耗:并行流会创建多个线程,过多的线程可能会消耗大量系统资源,导致系统性能下降。因此,在使用并行流时,需要根据系统资源情况合理调整并行度。可以通过parallelStream(int parallelism)方法指定并行度,其中parallelism表示并行处理的线程数。例如:

import java.util.Arrays;
import java.util.List;

public class ParallelStreamParallelism {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        numbers.parallelStream(2)
               .map(n -> n * 2)
               .forEach(System.out::println);
    }
}

在这个例子中,parallelStream(2)指定并行度为2,即使用2个线程并行处理。

并行流与普通流的选择

在实际应用中,需要根据具体情况选择使用并行流还是普通流。

根据数据规模选择

如果数据集较小,普通流通常就足够了,因为其简单且没有并行处理的开销。例如,处理一个只有几十或几百个元素的列表,普通流的性能可能更好。但当数据集非常大,如包含数百万或更多元素时,并行流可以充分利用多核处理器的优势,显著提高处理速度。

根据操作类型选择

对于简单的操作,如简单的筛选或映射,普通流可能更合适,因为并行流的开销可能超过其带来的性能提升。然而,对于复杂的操作,如需要大量计算的映射或复杂的归约操作,并行流可能会带来更好的性能。

根据顺序依赖选择

如果操作结果依赖于元素的顺序,如对列表进行排序后再进行其他操作,普通流是更好的选择,因为并行流可能会打乱元素顺序。但如果顺序不重要,并行流可以提供更高的效率。

并行流在实际项目中的应用案例

  1. 数据分析:在数据分析场景中,经常需要处理大量的数据。例如,分析销售数据,需要从数百万条销售记录中筛选出特定地区、特定时间段的记录,并计算销售额总和。使用并行流可以快速处理这些数据,提高分析效率。
import java.util.ArrayList;
import java.util.List;

class SaleRecord {
    private String region;
    private String date;
    private double amount;

    public SaleRecord(String region, String date, double amount) {
        this.region = region;
        this.date = date;
        this.amount = amount;
    }

    public String getRegion() {
        return region;
    }

    public String getDate() {
        return date;
    }

    public double getAmount() {
        return amount;
    }
}

public class SalesAnalysis {
    public static void main(String[] args) {
        List<SaleRecord> records = new ArrayList<>();
        // 假设这里添加了大量销售记录
        double totalAmount = records.parallelStream()
               .filter(record -> "North".equals(record.getRegion()) && "2023-01".equals(record.getDate()))
               .mapToDouble(SaleRecord::getAmount)
               .sum();
        System.out.println("Total amount in North region in January 2023: " + totalAmount);
    }
}
  1. 图像渲染:在图像处理中,对图像的每个像素进行处理可以并行化。例如,对图像进行灰度化处理,每个像素的灰度计算可以独立进行,使用并行流可以加速处理过程。
import java.awt.image.BufferedImage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ImageProcessing {
    public static BufferedImage grayscale(BufferedImage image) {
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        int width = image.getWidth();
        int height = image.getHeight();
        for (int y = 0; y < height; y++) {
            final int finalY = y;
            executorService.submit(() -> {
                for (int x = 0; x < width; x++) {
                    int argb = image.getRGB(x, finalY);
                    int alpha = (argb >> 24) & 0xff;
                    int red = (argb >> 16) & 0xff;
                    int green = (argb >> 8) & 0xff;
                    int blue = argb & 0xff;
                    int gray = (int) (0.299 * red + 0.587 * green + 0.114 * blue);
                    argb = (alpha << 24) | (gray << 16) | (gray << 8) | gray;
                    image.setRGB(x, finalY, argb);
                }
            });
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {
        }
        return image;
    }
}

虽然这里没有直接使用并行流,但类似的并行处理思想可以应用到并行流中,将对像素的操作并行化,提高图像处理效率。

  1. 文本处理:在文本处理中,如统计文档中每个单词的出现次数,处理大文档时并行流可以加快处理速度。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCount {
    public static void main(String[] args) {
        try (BufferedReader br = new BufferedReader(new FileReader("large_document.txt"))) {
            Map<String, Integer> wordCount = br.lines()
                   .parallel()
                   .flatMap(line -> java.util.Arrays.stream(line.split("\\W+")))
                   .filter(word ->!word.isEmpty())
                   .collect(Collectors.groupingBy(String::toLowerCase, Collectors.summingInt(word -> 1)));
            System.out.println(wordCount);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,从大文档中读取每一行,并行处理每一行,将其分割成单词,统计每个单词的出现次数。

总结

Java并行流是一项强大的功能,它利用多核处理器的优势,大大提高了数据处理的效率。通过合理使用并行流,我们可以在处理大数据集和复杂操作时显著提升程序性能。然而,在使用并行流时,需要注意线程安全、顺序一致性和资源消耗等问题。同时,要根据数据规模、操作类型和顺序依赖等因素,合理选择并行流或普通流。在实际项目中,并行流在数据分析、图像渲染、文本处理等多个领域都有广泛的应用,可以为开发者带来高效的数据处理解决方案。希望通过本文的介绍,读者能对Java并行流的基本用法有更深入的理解,并在实际编程中灵活运用。