Java并行流的基本用法
Java并行流基础概念
在Java 8引入流(Stream)之后,并行流作为其重要特性,极大地提升了数据处理的效率。流提供了一种声明式的处理数据集合的方式,而并行流则利用多核处理器的优势,将数据处理任务并行化。
流与并行流的关系
流是Java 8中对集合数据处理的新抽象,它允许以一种类似SQL查询的方式对集合元素进行筛选、映射、归约等操作。例如,假设有一个整数列表,我们想筛选出所有偶数并计算它们的平方和,使用流可以这样写:
import java.util.Arrays;
import java.util.List;
public class StreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int sum = numbers.stream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n * n)
.sum();
System.out.println(sum);
}
}
在这个例子中,stream()
方法将列表转换为流,filter
方法筛选出偶数,mapToInt
方法将每个偶数映射为其平方,最后sum
方法计算总和。
并行流则是流的并行版本,它会将数据分块处理,利用多核CPU的多个线程并行执行操作。我们只需要将上述代码中的stream()
替换为parallelStream()
,就可以实现并行处理:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int sum = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n * n)
.sum();
System.out.println(sum);
}
}
虽然代码看起来相似,但并行流会在后台将数据分割成多个部分,由不同的线程同时处理,从而提高处理速度,特别是在处理大数据集时效果更为显著。
并行流的内部实现机制
并行流的实现依赖于Fork/Join
框架。Fork/Join
框架是Java 7引入的用于并行执行任务的框架,它采用分治算法(Divide and Conquer)。当使用并行流时,数据会被分割成多个子任务,每个子任务由不同的线程独立执行。例如,对于一个包含1000个元素的列表,并行流可能会将其分成10个包含100个元素的子列表,然后每个子列表由一个线程处理。处理完子任务后,结果会被合并起来得到最终结果。
具体来说,并行流的操作分为三个阶段:分解(Partitioning)、处理(Processing)和合并(Merging)。在分解阶段,流会将数据源分割成多个子部分。在处理阶段,每个子部分由不同的线程并行处理。最后在合并阶段,各个子任务的结果会被合并成最终结果。以刚才计算偶数平方和的例子来说,并行流会先将列表分割成多个子列表,每个子列表筛选出偶数并计算平方和,最后将这些子结果合并得到总的平方和。
创建并行流
在Java中,有多种方式可以创建并行流。
通过集合创建并行流
集合类如List
、Set
等都提供了parallelStream()
方法来直接创建并行流。例如:
import java.util.ArrayList;
import java.util.List;
public class CreateParallelStreamFromList {
public static void main(String[] args) {
List<String> words = new ArrayList<>();
words.add("apple");
words.add("banana");
words.add("cherry");
words.parallelStream()
.map(String::toUpperCase)
.forEach(System.out::println);
}
}
在这个例子中,words
列表通过parallelStream()
方法创建了并行流,然后将每个单词转换为大写并打印出来。
同样,Set
也可以创建并行流:
import java.util.HashSet;
import java.util.Set;
public class CreateParallelStreamFromSet {
public static void main(String[] args) {
Set<Integer> numbers = new HashSet<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.parallelStream()
.map(n -> n * 2)
.forEach(System.out::println);
}
}
这里的HashSet
通过parallelStream()
方法创建并行流,并将每个元素乘以2后打印。
通过数组创建并行流
Arrays
类提供了静态方法来为数组创建并行流。例如:
import java.util.Arrays;
public class CreateParallelStreamFromArray {
public static void main(String[] args) {
int[] numbers = {1, 2, 3, 4, 5};
Arrays.stream(numbers)
.parallel()
.map(n -> n * n)
.forEach(System.out::println);
}
}
在这个例子中,先通过Arrays.stream(numbers)
将数组转换为流,然后调用parallel()
方法将其转换为并行流,最后对每个元素求平方并打印。
通过其他方式创建并行流
除了集合和数组,还可以通过IntStream
、LongStream
、DoubleStream
等原始类型流的parallel()
方法创建并行流。例如:
import java.util.stream.IntStream;
public class CreateParallelStreamFromIntStream {
public static void main(String[] args) {
IntStream.range(1, 10)
.parallel()
.filter(n -> n % 2 == 0)
.forEach(System.out::println);
}
}
这里通过IntStream.range(1, 10)
创建一个包含1到9的整数流,然后调用parallel()
方法将其转换为并行流,筛选出偶数并打印。
并行流的操作
并行流的操作和普通流一样,分为中间操作和终端操作。
中间操作
中间操作会返回一个新的流,并且可以链式调用。常见的中间操作包括filter
、map
、flatMap
、distinct
、sorted
等。
filter
操作:用于筛选流中的元素,保留满足条件的元素。例如:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamFilter {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.parallelStream()
.filter(n -> n % 2 == 0)
.forEach(System.out::println);
}
}
在这个例子中,filter(n -> n % 2 == 0)
筛选出列表中的偶数并打印。
map
操作:用于将流中的每个元素映射为一个新的元素。例如:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamMap {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.parallelStream()
.map(n -> n * 2)
.forEach(System.out::println);
}
}
这里map(n -> n * 2)
将列表中的每个元素乘以2并打印。
flatMap
操作:与map
类似,但flatMap
会将映射后的流扁平化。例如,假设有一个字符串列表,每个字符串包含多个单词,我们想将所有单词提取出来并转换为大写:
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelStreamFlatMap {
public static void main(String[] args) {
List<String> sentences = Arrays.asList("hello world", "java is great");
List<String> words = sentences.parallelStream()
.flatMap(s -> Arrays.stream(s.split(" ")))
.map(String::toUpperCase)
.collect(Collectors.toList());
System.out.println(words);
}
}
在这个例子中,flatMap(s -> Arrays.stream(s.split(" ")))
将每个句子分割成单词并扁平化,然后map(String::toUpperCase)
将每个单词转换为大写,最后通过collect(Collectors.toList())
收集结果。
distinct
操作:用于去除流中的重复元素。例如:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamDistinct {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 2, 3, 3, 3);
numbers.parallelStream()
.distinct()
.forEach(System.out::println);
}
}
这里distinct()
方法去除列表中的重复元素并打印。
sorted
操作:用于对流中的元素进行排序。例如:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamSorted {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
numbers.parallelStream()
.sorted()
.forEach(System.out::println);
}
}
sorted()
方法将列表中的元素按升序排序并打印。如果需要自定义排序,可以传入一个Comparator
。
终端操作
终端操作会执行流的处理流水线,并返回一个结果或副作用。常见的终端操作包括forEach
、collect
、reduce
、count
、min
、max
等。
forEach
操作:用于对流中的每个元素执行一个动作。例如:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamForEach {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.parallelStream()
.forEach(System.out::println);
}
}
这里forEach(System.out::println)
会打印列表中的每个元素。需要注意的是,由于并行流是并行处理的,打印顺序可能与列表顺序不一致。
collect
操作:用于将流中的元素收集到一个集合或其他数据结构中。例如:
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelStreamCollect {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
List<Integer> squaredNumbers = numbers.parallelStream()
.map(n -> n * n)
.collect(Collectors.toList());
System.out.println(squaredNumbers);
}
}
在这个例子中,collect(Collectors.toList())
将平方后的元素收集到一个新的列表中。除了toList()
,还可以使用toSet()
、toMap()
等方法收集到不同的数据结构。
reduce
操作:用于将流中的元素归约为一个值。例如,计算列表中所有元素的和:
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
public class ParallelStreamReduce {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
Optional<Integer> sum = numbers.parallelStream()
.reduce((a, b) -> a + b);
sum.ifPresent(System.out::println);
}
}
这里reduce((a, b) -> a + b)
将列表中的元素两两相加,最终得到总和。Optional
类型用于处理可能为空的结果。如果流为空,reduce
操作返回Optional.empty()
。我们可以通过ifPresent
方法来处理结果。
count
操作:用于计算流中元素的数量。例如:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamCount {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
long count = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.count();
System.out.println(count);
}
}
这里count()
方法计算筛选出的偶数的数量并打印。
min
和max
操作:分别用于找出流中的最小和最大元素。例如:
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
public class ParallelStreamMinMax {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
Optional<Integer> min = numbers.parallelStream()
.min(Integer::compareTo);
Optional<Integer> max = numbers.parallelStream()
.max(Integer::compareTo);
min.ifPresent(m -> System.out.println("Min: " + m));
max.ifPresent(m -> System.out.println("Max: " + m));
}
}
这里min(Integer::compareTo)
和max(Integer::compareTo)
分别找出列表中的最小和最大元素,并通过ifPresent
方法打印。
并行流的性能与注意事项
虽然并行流可以显著提高数据处理效率,但在使用时也需要注意一些问题。
性能考量
-
数据规模:并行流在处理大数据集时效果最为显著。对于小规模数据,并行流的线程创建、任务分配和结果合并等开销可能会超过并行处理带来的性能提升。例如,处理一个只有10个元素的列表,使用并行流可能反而比普通流慢。
-
操作复杂性:如果流的操作非常简单,如只进行简单的筛选或映射,并行流的优势可能不明显。但对于复杂的操作,如复杂的过滤条件、昂贵的映射计算等,并行流可以通过并行处理提高效率。
-
硬件环境:并行流依赖多核CPU,在多核处理器上才能发挥其优势。如果运行环境是单核CPU,并行流可能不会带来性能提升,甚至可能因为线程调度等开销而变慢。
注意事项
- 线程安全:在并行流中,由于多个线程同时处理数据,需要确保操作是线程安全的。例如,在
forEach
操作中,如果对共享变量进行修改,可能会导致数据竞争问题。考虑以下代码:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamThreadSafety {
private static int count = 0;
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.parallelStream()
.forEach(n -> count++);
System.out.println(count);
}
}
在这个例子中,count++
操作不是线程安全的,多个线程同时对count
进行自增操作可能会导致结果不准确。为了解决这个问题,可以使用AtomicInteger
等线程安全的类:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class ParallelStreamThreadSafetyFixed {
private static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.parallelStream()
.forEach(n -> count.incrementAndGet());
System.out.println(count.get());
}
}
-
顺序一致性:并行流的处理顺序是不确定的,因为不同线程可能以不同顺序处理数据块。如果结果依赖于元素的顺序,使用并行流可能会得到错误的结果。例如,在对一个列表进行排序并打印时,如果使用并行流,打印顺序可能与排序后的顺序不一致。在这种情况下,需要考虑使用普通流或确保并行流的操作不依赖于顺序。
-
资源消耗:并行流会创建多个线程,过多的线程可能会消耗大量系统资源,导致系统性能下降。因此,在使用并行流时,需要根据系统资源情况合理调整并行度。可以通过
parallelStream(int parallelism)
方法指定并行度,其中parallelism
表示并行处理的线程数。例如:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamParallelism {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.parallelStream(2)
.map(n -> n * 2)
.forEach(System.out::println);
}
}
在这个例子中,parallelStream(2)
指定并行度为2,即使用2个线程并行处理。
并行流与普通流的选择
在实际应用中,需要根据具体情况选择使用并行流还是普通流。
根据数据规模选择
如果数据集较小,普通流通常就足够了,因为其简单且没有并行处理的开销。例如,处理一个只有几十或几百个元素的列表,普通流的性能可能更好。但当数据集非常大,如包含数百万或更多元素时,并行流可以充分利用多核处理器的优势,显著提高处理速度。
根据操作类型选择
对于简单的操作,如简单的筛选或映射,普通流可能更合适,因为并行流的开销可能超过其带来的性能提升。然而,对于复杂的操作,如需要大量计算的映射或复杂的归约操作,并行流可能会带来更好的性能。
根据顺序依赖选择
如果操作结果依赖于元素的顺序,如对列表进行排序后再进行其他操作,普通流是更好的选择,因为并行流可能会打乱元素顺序。但如果顺序不重要,并行流可以提供更高的效率。
并行流在实际项目中的应用案例
- 数据分析:在数据分析场景中,经常需要处理大量的数据。例如,分析销售数据,需要从数百万条销售记录中筛选出特定地区、特定时间段的记录,并计算销售额总和。使用并行流可以快速处理这些数据,提高分析效率。
import java.util.ArrayList;
import java.util.List;
class SaleRecord {
private String region;
private String date;
private double amount;
public SaleRecord(String region, String date, double amount) {
this.region = region;
this.date = date;
this.amount = amount;
}
public String getRegion() {
return region;
}
public String getDate() {
return date;
}
public double getAmount() {
return amount;
}
}
public class SalesAnalysis {
public static void main(String[] args) {
List<SaleRecord> records = new ArrayList<>();
// 假设这里添加了大量销售记录
double totalAmount = records.parallelStream()
.filter(record -> "North".equals(record.getRegion()) && "2023-01".equals(record.getDate()))
.mapToDouble(SaleRecord::getAmount)
.sum();
System.out.println("Total amount in North region in January 2023: " + totalAmount);
}
}
- 图像渲染:在图像处理中,对图像的每个像素进行处理可以并行化。例如,对图像进行灰度化处理,每个像素的灰度计算可以独立进行,使用并行流可以加速处理过程。
import java.awt.image.BufferedImage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ImageProcessing {
public static BufferedImage grayscale(BufferedImage image) {
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
int width = image.getWidth();
int height = image.getHeight();
for (int y = 0; y < height; y++) {
final int finalY = y;
executorService.submit(() -> {
for (int x = 0; x < width; x++) {
int argb = image.getRGB(x, finalY);
int alpha = (argb >> 24) & 0xff;
int red = (argb >> 16) & 0xff;
int green = (argb >> 8) & 0xff;
int blue = argb & 0xff;
int gray = (int) (0.299 * red + 0.587 * green + 0.114 * blue);
argb = (alpha << 24) | (gray << 16) | (gray << 8) | gray;
image.setRGB(x, finalY, argb);
}
});
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
return image;
}
}
虽然这里没有直接使用并行流,但类似的并行处理思想可以应用到并行流中,将对像素的操作并行化,提高图像处理效率。
- 文本处理:在文本处理中,如统计文档中每个单词的出现次数,处理大文档时并行流可以加快处理速度。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
public class WordCount {
public static void main(String[] args) {
try (BufferedReader br = new BufferedReader(new FileReader("large_document.txt"))) {
Map<String, Integer> wordCount = br.lines()
.parallel()
.flatMap(line -> java.util.Arrays.stream(line.split("\\W+")))
.filter(word ->!word.isEmpty())
.collect(Collectors.groupingBy(String::toLowerCase, Collectors.summingInt(word -> 1)));
System.out.println(wordCount);
} catch (IOException e) {
e.printStackTrace();
}
}
}
在这个例子中,从大文档中读取每一行,并行处理每一行,将其分割成单词,统计每个单词的出现次数。
总结
Java并行流是一项强大的功能,它利用多核处理器的优势,大大提高了数据处理的效率。通过合理使用并行流,我们可以在处理大数据集和复杂操作时显著提升程序性能。然而,在使用并行流时,需要注意线程安全、顺序一致性和资源消耗等问题。同时,要根据数据规模、操作类型和顺序依赖等因素,合理选择并行流或普通流。在实际项目中,并行流在数据分析、图像渲染、文本处理等多个领域都有广泛的应用,可以为开发者带来高效的数据处理解决方案。希望通过本文的介绍,读者能对Java并行流的基本用法有更深入的理解,并在实际编程中灵活运用。