Java Stream API的使用与优化
Java Stream API 基础概念
Stream API 是 Java 8 引入的重要特性,它提供了一种更高效、更简洁的方式来处理集合数据。Stream 可以看作是一个来自数据源的元素队列,支持顺序和并行的聚合操作。与传统的集合操作相比,Stream API 采用了函数式编程风格,更强调对数据的处理逻辑,而不是数据的存储和操作过程。
Stream 的数据源
Stream 的数据源可以是集合(Collection
)、数组、文件等。例如,从一个 List
创建 Stream:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
public class StreamExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.add(4);
numbers.add(5);
Stream<Integer> stream = numbers.stream();
}
}
这里通过 numbers.stream()
方法从 List
创建了一个 Stream。如果要创建并行流,可以使用 numbers.parallelStream()
。
Stream 的操作类型
Stream 的操作分为中间操作(Intermediate Operations)和终端操作(Terminal Operations)。
- 中间操作:返回一个新的 Stream,可以链式调用多个中间操作。例如
filter
、map
、sorted
等。这些操作是惰性求值的,只有在终端操作被调用时才会真正执行。 - 终端操作:执行 Stream 的计算,并返回一个结果或副作用。例如
forEach
、collect
、reduce
等。一旦终端操作执行,Stream 就会被消耗,不能再被使用。
常用的中间操作
Filter 操作
filter
操作用于根据给定的条件过滤 Stream 中的元素。它接受一个 Predicate
作为参数,返回满足条件的元素组成的新 Stream。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class FilterExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.add(4);
numbers.add(5);
List<Integer> evenNumbers = numbers.stream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList());
System.out.println(evenNumbers);
}
}
在上述代码中,filter(n -> n % 2 == 0)
过滤出了列表中的偶数,最终通过 collect
终端操作将结果收集到一个新的 List
中。
Map 操作
map
操作将 Stream 中的每个元素按照给定的函数进行转换,返回一个新的 Stream,其元素类型可能与原 Stream 不同。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class MapExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.add(4);
numbers.add(5);
List<Integer> squaredNumbers = numbers.stream()
.map(n -> n * n)
.collect(Collectors.toList());
System.out.println(squaredNumbers);
}
}
这里 map(n -> n * n)
将列表中的每个数平方,然后收集到新的列表中。
Sorted 操作
sorted
操作对 Stream 中的元素进行排序。如果 Stream 中的元素实现了 Comparable
接口,可以直接使用 sorted()
进行自然排序;也可以传入一个 Comparator
进行自定义排序。
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
public class SortedExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(3);
numbers.add(1);
numbers.add(4);
numbers.add(2);
numbers.add(5);
List<Integer> sortedNumbers = numbers.stream()
.sorted()
.collect(Collectors.toList());
System.out.println(sortedNumbers);
List<Integer> reverseSortedNumbers = numbers.stream()
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
System.out.println(reverseSortedNumbers);
}
}
上述代码展示了自然排序和逆序排序的两种方式。
常用的终端操作
ForEach 操作
forEach
操作对 Stream 中的每个元素执行给定的动作。它主要用于副作用操作,例如打印元素。
import java.util.ArrayList;
import java.util.List;
public class ForEachExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.add(4);
numbers.add(5);
numbers.stream().forEach(System.out::println);
}
}
这里通过 forEach
打印了列表中的每个元素。
Collect 操作
collect
操作将 Stream 中的元素收集到一个结果容器中,例如 List
、Set
、Map
等。Collectors
类提供了很多静态方法来支持不同类型的收集操作。
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CollectExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.add(4);
numbers.add(5);
List<Integer> evenNumbers = numbers.stream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList());
Map<Boolean, List<Integer>> partitionedNumbers = numbers.stream()
.collect(Collectors.partitioningBy(n -> n % 2 == 0));
System.out.println(evenNumbers);
System.out.println(partitionedNumbers);
}
}
代码中不仅收集了偶数到一个 List
中,还通过 partitioningBy
方法将数字按照奇偶性进行了分区,收集到一个 Map
中。
Reduce 操作
reduce
操作通过一个累积函数将 Stream 中的元素进行聚合,产生一个单一的结果。它有几种重载形式,最常见的形式接受一个初始值和一个 BinaryOperator
。
import java.util.ArrayList;
import java.util.List;
public class ReduceExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.add(4);
numbers.add(5);
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
System.out.println(sum);
}
}
这里 reduce(0, (a, b) -> a + b)
从初始值 0
开始,将列表中的数字依次相加,最终得到总和。
并行流与性能优化
并行流的原理
并行流利用多核 CPU 的优势,将 Stream 中的数据分成多个部分,并行地对这些部分进行操作,最后合并结果。Java 底层使用 Fork/Join
框架来实现并行流的并行处理。例如,在处理大数据集时,并行流可能会比顺序流快很多。
并行流的使用
将顺序流转换为并行流非常简单,只需要调用 parallel()
方法。同样,并行流也可以通过 sequential()
方法转换回顺序流。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
numbers.add(i);
}
long startTime = System.currentTimeMillis();
List<Integer> squaredParallel = numbers.parallelStream()
.map(n -> n * n)
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
System.out.println("Parallel time: " + (endTime - startTime));
startTime = System.currentTimeMillis();
List<Integer> squaredSequential = numbers.stream()
.map(n -> n * n)
.collect(Collectors.toList());
endTime = System.currentTimeMillis();
System.out.println("Sequential time: " + (endTime - startTime));
}
}
在这个例子中,对一个包含一百万个元素的列表进行平方操作,分别使用并行流和顺序流,通过对比时间可以看到并行流在大数据集处理时的性能优势。
并行流的性能优化要点
- 数据规模:并行流在处理大数据集时优势明显,但对于小数据集,并行流的开销可能会超过其带来的性能提升。因为并行流的创建、数据分割和结果合并都需要额外的开销。
- 操作复杂度:如果 Stream 中的操作复杂度较高,例如涉及复杂的计算或 I/O 操作,并行流的性能提升会更显著。但如果操作非常简单,如简单的过滤或映射,并行流的优势可能不明显。
- 避免共享状态:在并行流操作中,应避免共享可变状态。因为并行处理可能会导致数据竞争和不一致的结果。例如,不要在
forEach
中修改外部的可变对象。
自定义 Collector
为什么需要自定义 Collector
虽然 Java 提供的 Collectors
类已经包含了很多常用的收集器,但在某些特定场景下,我们可能需要自定义收集逻辑。例如,我们可能需要按照特定的规则将元素收集到自定义的数据结构中,或者进行复杂的聚合操作。
自定义 Collector 的实现步骤
- 定义
Supplier
:用于创建收集结果的容器。例如,如果要收集到一个自定义的MyContainer
中,需要提供一个创建MyContainer
的Supplier
。 - 定义
Accumulator
:用于将 Stream 中的元素累积到结果容器中。这是一个BiConsumer
,接受结果容器和 Stream 中的元素作为参数。 - 定义
Combiner
:在并行流中,用于合并不同子任务的结果容器。这是一个BinaryOperator
。 - 定义
Finisher
:在收集完成后,对结果容器进行最终处理的函数。如果不需要最终处理,可以直接返回结果容器。 - 创建
Collector
:使用Collector.of
方法,将上述定义的Supplier
、Accumulator
、Combiner
和Finisher
组合成一个Collector
。
自定义 Collector 示例
假设我们要将字符串收集到一个自定义的 WordContainer
中,该容器记录单词的数量和总长度。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
public class WordContainer {
private int wordCount;
private int totalLength;
public WordContainer() {
this.wordCount = 0;
this.totalLength = 0;
}
public void addWord(String word) {
wordCount++;
totalLength += word.length();
}
public void merge(WordContainer other) {
wordCount += other.wordCount;
totalLength += other.totalLength;
}
public int getWordCount() {
return wordCount;
}
public int getTotalLength() {
return totalLength;
}
}
public class CustomCollectorExample {
public static Collector<String, WordContainer, WordContainer> wordCollector() {
return Collector.of(
WordContainer::new,
WordContainer::addWord,
(a, b) -> {
a.merge(b);
return a;
}
);
}
public static void main(String[] args) {
List<String> words = new ArrayList<>();
words.add("hello");
words.add("world");
words.add("java");
WordContainer container = words.stream().collect(wordCollector());
System.out.println("Word count: " + container.getWordCount());
System.out.println("Total length: " + container.getTotalLength());
}
}
在上述代码中,我们定义了 WordContainer
类来存储单词数量和总长度。通过 Collector.of
创建了自定义收集器 wordCollector
,并在 main
方法中使用它来收集字符串列表中的单词信息。
Stream API 的高级应用
嵌套 Stream
有时候,我们需要处理嵌套的数据结构,例如一个包含多个列表的列表。可以使用嵌套 Stream 来处理这种情况。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class NestedStreamExample {
public static void main(String[] args) {
List<List<Integer>> nestedLists = new ArrayList<>();
nestedLists.add(List.of(1, 2));
nestedLists.add(List.of(3, 4));
nestedLists.add(List.of(5, 6));
List<Integer> flatList = nestedLists.stream()
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println(flatList);
}
}
这里通过 flatMap
方法将嵌套的 List
扁平化,最终收集到一个单一的 List
中。
与 Optional 的结合使用
Optional
是 Java 8 引入的用于处理空值的类。Stream API 可以与 Optional
很好地结合。例如,当使用 findFirst
或 findAny
等操作可能返回空值时,可以返回 Optional
。
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class StreamOptionalExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
Optional<Integer> firstEven = numbers.stream()
.filter(n -> n % 2 == 0)
.findFirst();
firstEven.ifPresent(System.out::println);
}
}
在这个例子中,findFirst
可能返回空值,所以返回 Optional<Integer>
。通过 ifPresent
方法可以安全地处理可能存在的值。
处理无限流
Stream API 支持创建和处理无限流。例如,可以使用 Stream.iterate
或 Stream.generate
创建无限流。但在使用无限流时,通常需要结合中间操作来限制流的长度,否则终端操作可能永远不会结束。
import java.util.stream.Stream;
public class InfiniteStreamExample {
public static void main(String[] args) {
Stream.iterate(1, n -> n + 1)
.limit(10)
.forEach(System.out::println);
Stream.generate(Math::random)
.limit(5)
.forEach(System.out::println);
}
}
上述代码中,Stream.iterate
创建了一个从 1 开始的无限递增流,通过 limit(10)
限制为 10 个元素;Stream.generate
创建了一个无限的随机数流,同样通过 limit(5)
限制为 5 个元素。
通过深入理解和掌握 Java Stream API 的各种操作、并行流的使用、自定义收集器以及高级应用,开发人员可以更高效、更优雅地处理数据,提高代码的可读性和性能。在实际应用中,需要根据具体的业务场景和数据规模来选择合适的 Stream 操作和优化策略。