MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java Stream API的使用与优化

2021-01-311.9k 阅读

Java Stream API 基础概念

Stream API 是 Java 8 引入的重要特性,它提供了一种更高效、更简洁的方式来处理集合数据。Stream 可以看作是一个来自数据源的元素队列,支持顺序和并行的聚合操作。与传统的集合操作相比,Stream API 采用了函数式编程风格,更强调对数据的处理逻辑,而不是数据的存储和操作过程。

Stream 的数据源

Stream 的数据源可以是集合(Collection)、数组、文件等。例如,从一个 List 创建 Stream:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class StreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        Stream<Integer> stream = numbers.stream();
    }
}

这里通过 numbers.stream() 方法从 List 创建了一个 Stream。如果要创建并行流,可以使用 numbers.parallelStream()

Stream 的操作类型

Stream 的操作分为中间操作(Intermediate Operations)和终端操作(Terminal Operations)。

  • 中间操作:返回一个新的 Stream,可以链式调用多个中间操作。例如 filtermapsorted 等。这些操作是惰性求值的,只有在终端操作被调用时才会真正执行。
  • 终端操作:执行 Stream 的计算,并返回一个结果或副作用。例如 forEachcollectreduce 等。一旦终端操作执行,Stream 就会被消耗,不能再被使用。

常用的中间操作

Filter 操作

filter 操作用于根据给定的条件过滤 Stream 中的元素。它接受一个 Predicate 作为参数,返回满足条件的元素组成的新 Stream。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class FilterExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        List<Integer> evenNumbers = numbers.stream()
                                          .filter(n -> n % 2 == 0)
                                          .collect(Collectors.toList());
        System.out.println(evenNumbers);
    }
}

在上述代码中,filter(n -> n % 2 == 0) 过滤出了列表中的偶数,最终通过 collect 终端操作将结果收集到一个新的 List 中。

Map 操作

map 操作将 Stream 中的每个元素按照给定的函数进行转换,返回一个新的 Stream,其元素类型可能与原 Stream 不同。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class MapExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        List<Integer> squaredNumbers = numbers.stream()
                                              .map(n -> n * n)
                                              .collect(Collectors.toList());
        System.out.println(squaredNumbers);
    }
}

这里 map(n -> n * n) 将列表中的每个数平方,然后收集到新的列表中。

Sorted 操作

sorted 操作对 Stream 中的元素进行排序。如果 Stream 中的元素实现了 Comparable 接口,可以直接使用 sorted() 进行自然排序;也可以传入一个 Comparator 进行自定义排序。

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortedExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(3);
        numbers.add(1);
        numbers.add(4);
        numbers.add(2);
        numbers.add(5);

        List<Integer> sortedNumbers = numbers.stream()
                                             .sorted()
                                             .collect(Collectors.toList());
        System.out.println(sortedNumbers);

        List<Integer> reverseSortedNumbers = numbers.stream()
                                                   .sorted(Comparator.reverseOrder())
                                                   .collect(Collectors.toList());
        System.out.println(reverseSortedNumbers);
    }
}

上述代码展示了自然排序和逆序排序的两种方式。

常用的终端操作

ForEach 操作

forEach 操作对 Stream 中的每个元素执行给定的动作。它主要用于副作用操作,例如打印元素。

import java.util.ArrayList;
import java.util.List;

public class ForEachExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        numbers.stream().forEach(System.out::println);
    }
}

这里通过 forEach 打印了列表中的每个元素。

Collect 操作

collect 操作将 Stream 中的元素收集到一个结果容器中,例如 ListSetMap 等。Collectors 类提供了很多静态方法来支持不同类型的收集操作。

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CollectExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        List<Integer> evenNumbers = numbers.stream()
                                          .filter(n -> n % 2 == 0)
                                          .collect(Collectors.toList());

        Map<Boolean, List<Integer>> partitionedNumbers = numbers.stream()
                                                               .collect(Collectors.partitioningBy(n -> n % 2 == 0));

        System.out.println(evenNumbers);
        System.out.println(partitionedNumbers);
    }
}

代码中不仅收集了偶数到一个 List 中,还通过 partitioningBy 方法将数字按照奇偶性进行了分区,收集到一个 Map 中。

Reduce 操作

reduce 操作通过一个累积函数将 Stream 中的元素进行聚合,产生一个单一的结果。它有几种重载形式,最常见的形式接受一个初始值和一个 BinaryOperator

import java.util.ArrayList;
import java.util.List;

public class ReduceExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        int sum = numbers.stream().reduce(0, (a, b) -> a + b);
        System.out.println(sum);
    }
}

这里 reduce(0, (a, b) -> a + b) 从初始值 0 开始,将列表中的数字依次相加,最终得到总和。

并行流与性能优化

并行流的原理

并行流利用多核 CPU 的优势,将 Stream 中的数据分成多个部分,并行地对这些部分进行操作,最后合并结果。Java 底层使用 Fork/Join 框架来实现并行流的并行处理。例如,在处理大数据集时,并行流可能会比顺序流快很多。

并行流的使用

将顺序流转换为并行流非常简单,只需要调用 parallel() 方法。同样,并行流也可以通过 sequential() 方法转换回顺序流。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < 1000000; i++) {
            numbers.add(i);
        }

        long startTime = System.currentTimeMillis();
        List<Integer> squaredParallel = numbers.parallelStream()
                                               .map(n -> n * n)
                                               .collect(Collectors.toList());
        long endTime = System.currentTimeMillis();
        System.out.println("Parallel time: " + (endTime - startTime));

        startTime = System.currentTimeMillis();
        List<Integer> squaredSequential = numbers.stream()
                                                 .map(n -> n * n)
                                                 .collect(Collectors.toList());
        endTime = System.currentTimeMillis();
        System.out.println("Sequential time: " + (endTime - startTime));
    }
}

在这个例子中,对一个包含一百万个元素的列表进行平方操作,分别使用并行流和顺序流,通过对比时间可以看到并行流在大数据集处理时的性能优势。

并行流的性能优化要点

  • 数据规模:并行流在处理大数据集时优势明显,但对于小数据集,并行流的开销可能会超过其带来的性能提升。因为并行流的创建、数据分割和结果合并都需要额外的开销。
  • 操作复杂度:如果 Stream 中的操作复杂度较高,例如涉及复杂的计算或 I/O 操作,并行流的性能提升会更显著。但如果操作非常简单,如简单的过滤或映射,并行流的优势可能不明显。
  • 避免共享状态:在并行流操作中,应避免共享可变状态。因为并行处理可能会导致数据竞争和不一致的结果。例如,不要在 forEach 中修改外部的可变对象。

自定义 Collector

为什么需要自定义 Collector

虽然 Java 提供的 Collectors 类已经包含了很多常用的收集器,但在某些特定场景下,我们可能需要自定义收集逻辑。例如,我们可能需要按照特定的规则将元素收集到自定义的数据结构中,或者进行复杂的聚合操作。

自定义 Collector 的实现步骤

  • 定义 Supplier:用于创建收集结果的容器。例如,如果要收集到一个自定义的 MyContainer 中,需要提供一个创建 MyContainerSupplier
  • 定义 Accumulator:用于将 Stream 中的元素累积到结果容器中。这是一个 BiConsumer,接受结果容器和 Stream 中的元素作为参数。
  • 定义 Combiner:在并行流中,用于合并不同子任务的结果容器。这是一个 BinaryOperator
  • 定义 Finisher:在收集完成后,对结果容器进行最终处理的函数。如果不需要最终处理,可以直接返回结果容器。
  • 创建 Collector:使用 Collector.of 方法,将上述定义的 SupplierAccumulatorCombinerFinisher 组合成一个 Collector

自定义 Collector 示例

假设我们要将字符串收集到一个自定义的 WordContainer 中,该容器记录单词的数量和总长度。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

public class WordContainer {
    private int wordCount;
    private int totalLength;

    public WordContainer() {
        this.wordCount = 0;
        this.totalLength = 0;
    }

    public void addWord(String word) {
        wordCount++;
        totalLength += word.length();
    }

    public void merge(WordContainer other) {
        wordCount += other.wordCount;
        totalLength += other.totalLength;
    }

    public int getWordCount() {
        return wordCount;
    }

    public int getTotalLength() {
        return totalLength;
    }
}

public class CustomCollectorExample {
    public static Collector<String, WordContainer, WordContainer> wordCollector() {
        return Collector.of(
                WordContainer::new,
                WordContainer::addWord,
                (a, b) -> {
                    a.merge(b);
                    return a;
                }
        );
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        words.add("hello");
        words.add("world");
        words.add("java");

        WordContainer container = words.stream().collect(wordCollector());
        System.out.println("Word count: " + container.getWordCount());
        System.out.println("Total length: " + container.getTotalLength());
    }
}

在上述代码中,我们定义了 WordContainer 类来存储单词数量和总长度。通过 Collector.of 创建了自定义收集器 wordCollector,并在 main 方法中使用它来收集字符串列表中的单词信息。

Stream API 的高级应用

嵌套 Stream

有时候,我们需要处理嵌套的数据结构,例如一个包含多个列表的列表。可以使用嵌套 Stream 来处理这种情况。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class NestedStreamExample {
    public static void main(String[] args) {
        List<List<Integer>> nestedLists = new ArrayList<>();
        nestedLists.add(List.of(1, 2));
        nestedLists.add(List.of(3, 4));
        nestedLists.add(List.of(5, 6));

        List<Integer> flatList = nestedLists.stream()
                                            .flatMap(List::stream)
                                            .collect(Collectors.toList());
        System.out.println(flatList);
    }
}

这里通过 flatMap 方法将嵌套的 List 扁平化,最终收集到一个单一的 List 中。

与 Optional 的结合使用

Optional 是 Java 8 引入的用于处理空值的类。Stream API 可以与 Optional 很好地结合。例如,当使用 findFirstfindAny 等操作可能返回空值时,可以返回 Optional

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class StreamOptionalExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);

        Optional<Integer> firstEven = numbers.stream()
                                             .filter(n -> n % 2 == 0)
                                             .findFirst();

        firstEven.ifPresent(System.out::println);
    }
}

在这个例子中,findFirst 可能返回空值,所以返回 Optional<Integer>。通过 ifPresent 方法可以安全地处理可能存在的值。

处理无限流

Stream API 支持创建和处理无限流。例如,可以使用 Stream.iterateStream.generate 创建无限流。但在使用无限流时,通常需要结合中间操作来限制流的长度,否则终端操作可能永远不会结束。

import java.util.stream.Stream;

public class InfiniteStreamExample {
    public static void main(String[] args) {
        Stream.iterate(1, n -> n + 1)
              .limit(10)
              .forEach(System.out::println);

        Stream.generate(Math::random)
              .limit(5)
              .forEach(System.out::println);
    }
}

上述代码中,Stream.iterate 创建了一个从 1 开始的无限递增流,通过 limit(10) 限制为 10 个元素;Stream.generate 创建了一个无限的随机数流,同样通过 limit(5) 限制为 5 个元素。

通过深入理解和掌握 Java Stream API 的各种操作、并行流的使用、自定义收集器以及高级应用,开发人员可以更高效、更优雅地处理数据,提高代码的可读性和性能。在实际应用中,需要根据具体的业务场景和数据规模来选择合适的 Stream 操作和优化策略。