MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 流同步与异步模式对比

2021-09-092.9k 阅读

Java 流同步与异步模式对比

Java 流基础概述

在深入探讨 Java 流的同步与异步模式之前,先来回顾一下 Java 流的基本概念。Java 流(Stream)是 Java 8 引入的一个强大特性,它允许以一种声明式的方式处理集合数据。流提供了一种流水线式的操作序列,可对数据进行过滤、映射、归约等操作,极大地简化了集合数据的处理逻辑。

例如,假设有一个整数列表,需要计算其中所有偶数的平方和。在 Java 8 之前,可能会使用迭代的方式来实现:

import java.util.ArrayList;
import java.util.List;

public class BeforeJava8 {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        int sum = 0;
        for (Integer number : numbers) {
            if (number % 2 == 0) {
                sum += number * number;
            }
        }
        System.out.println("Sum of squared even numbers: " + sum);
    }
}

而使用 Java 流,可以这样写:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class Java8StreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        int sum = numbers.stream()
               .filter(n -> n % 2 == 0)
               .mapToInt(n -> n * n)
               .sum();
        System.out.println("Sum of squared even numbers: " + sum);
    }
}

从上述代码可以看出,Java 流的写法更加简洁、声明式,代码的意图更加清晰。流操作分为中间操作(如 filtermap 等)和终端操作(如 sumcollect 等)。中间操作返回一个新的流,允许链式调用,而终端操作会触发流的处理并返回结果。

同步流模式

同步流的执行机制

同步流是指流操作按照顺序依次执行,前一个操作完成后才会执行下一个操作。在同步流中,流的处理是在主线程中进行的,所有操作都是顺序执行,不存在并发执行的情况。

例如,继续以上面计算偶数平方和的例子,在同步流中,filter 操作会先对列表中的每个元素进行过滤,只有满足条件(偶数)的元素才会进入 mapToInt 操作,mapToInt 操作会对过滤后的元素进行平方计算,最后 sum 操作会将所有平方后的结果累加起来。

这种顺序执行的方式确保了流操作的确定性和数据的一致性。因为所有操作都是在同一个线程中顺序执行,不存在多线程环境下的数据竞争和并发问题。

同步流的性能特点

  1. 优点
    • 易于理解和调试:由于操作是顺序执行的,代码的逻辑非常清晰,很容易理解每个操作的执行过程。在调试时,也更容易定位问题,因为可以按照操作的顺序逐步排查。
    • 数据一致性:同步流保证了数据的一致性,不会出现由于并发操作导致的数据不一致问题。例如,在对一个集合进行过滤和映射操作时,不会出现某个元素在过滤操作还未完成时就进入映射操作的情况。
  2. 缺点
    • 性能瓶颈:在处理大量数据时,同步流可能会成为性能瓶颈。因为所有操作都是顺序执行的,如果某个操作比较耗时,整个流的处理过程都会被阻塞。例如,在对一个包含数百万条记录的数据库表进行复杂的过滤和计算操作时,同步流的执行时间可能会很长。

同步流代码示例

下面通过一个更复杂的示例来展示同步流的使用。假设我们有一个包含员工信息的列表,每个员工对象包含姓名、年龄和工资信息。我们需要从这个列表中筛选出年龄大于 30 岁且工资高于 10000 的员工,并计算他们的平均工资。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class Employee {
    private String name;
    private int age;
    private double salary;

    public Employee(String name, int age, double salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public double getSalary() {
        return salary;
    }
}

public class SynchronousStreamExample {
    public static void main(String[] args) {
        List<Employee> employees = new ArrayList<>();
        employees.add(new Employee("Alice", 28, 8000));
        employees.add(new Employee("Bob", 32, 12000));
        employees.add(new Employee("Charlie", 35, 15000));
        employees.add(new Employee("David", 25, 7000));

        double averageSalary = employees.stream()
               .filter(e -> e.getAge() > 30 && e.getSalary() > 10000)
               .mapToDouble(Employee::getSalary)
               .average()
               .orElse(0);

        System.out.println("Average salary of eligible employees: " + averageSalary);
    }
}

在上述代码中,首先创建了一个包含多个员工信息的列表。然后通过同步流,先使用 filter 方法筛选出符合条件的员工,再使用 mapToDouble 方法提取出这些员工的工资,最后使用 average 方法计算平均工资。整个过程是顺序执行的,在单线程环境下完成。

异步流模式

异步流的执行机制

异步流则打破了同步流顺序执行的模式,它允许流操作在多个线程中并发执行。Java 流提供了 parallelStream 方法来将一个普通流转换为并行流,从而实现异步操作。

当使用并行流时,流中的元素会被分成多个部分,每个部分由不同的线程进行处理。例如,在并行流中执行 filter 操作时,不同的线程可以同时对不同部分的元素进行过滤,然后将过滤后的结果合并起来,再进行后续的 map 等操作。

这种并发执行的方式利用了多核处理器的优势,可以显著提高流处理的性能,尤其是在处理大量数据时。

异步流的性能特点

  1. 优点
    • 高性能:对于大量数据的处理,异步流可以充分利用多核处理器的并行计算能力,大大提高处理速度。例如,在对一个大型文件进行数据处理时,并行流可以将文件内容分成多个部分,同时在多个线程中进行处理,从而缩短整体的处理时间。
  2. 缺点
    • 数据一致性问题:由于异步流是并发执行的,可能会出现数据一致性问题。例如,在对共享资源进行读写操作时,如果没有适当的同步机制,可能会导致数据竞争和不一致。比如在并行流中对一个共享的计数器进行累加操作,如果没有同步,可能会得到错误的结果。
    • 调试困难:异步流的并发执行使得代码的调试变得更加困难。由于多个操作可能同时在不同线程中执行,很难按照顺序跟踪代码的执行过程,定位问题也更加复杂。

异步流代码示例

同样以上面员工信息处理的例子,我们可以将其改为使用异步流(并行流)来实现:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class Employee {
    private String name;
    private int age;
    private double salary;

    public Employee(String name, int age, double salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public double getSalary() {
        return salary;
    }
}

public class AsynchronousStreamExample {
    public static void main(String[] args) {
        List<Employee> employees = new ArrayList<>();
        employees.add(new Employee("Alice", 28, 8000));
        employees.add(new Employee("Bob", 32, 12000));
        employees.add(new Employee("Charlie", 35, 15000));
        employees.add(new Employee("David", 25, 7000));

        double averageSalary = employees.parallelStream()
               .filter(e -> e.getAge() > 30 && e.getSalary() > 10000)
               .mapToDouble(Employee::getSalary)
               .average()
               .orElse(0);

        System.out.println("Average salary of eligible employees: " + averageSalary);
    }
}

在上述代码中,只需要将 stream 改为 parallelStream,就将同步流转换为了异步流(并行流)。此时,流操作会在多个线程中并发执行,提高了处理性能。但需要注意的是,如果员工列表非常大,并且操作更加复杂,可能需要考虑数据一致性和线程安全问题,例如使用 Atomic 类型的数据结构来避免数据竞争。

同步与异步流模式选择策略

根据数据量选择

  1. 小数据量:当处理的数据量较小,例如几百条以内的数据,同步流通常是一个不错的选择。因为同步流的代码逻辑简单,易于理解和维护,而且在小数据量情况下,性能差异并不明显。例如,在处理一个简单的配置文件中的少量数据时,使用同步流即可。
  2. 大数据量:对于大数据量的处理,异步流(并行流)具有明显的性能优势。如果数据量达到数万条甚至更多,并且流操作中包含一些耗时的计算或 I/O 操作,使用并行流可以充分利用多核处理器的性能,大大缩短处理时间。比如处理一个包含数百万条交易记录的日志文件时,并行流能显著提高处理效率。

根据操作类型选择

  1. 无状态操作:如果流操作是无状态的,例如简单的 filtermap 操作,并且数据量较大,使用异步流可以安全地提高性能。无状态操作不依赖于之前元素的处理结果,所以在并发执行时不会出现数据一致性问题。例如,从一个大列表中筛选出符合某个简单条件的元素,并对其进行简单的转换操作,使用并行流是很合适的。
  2. 有状态操作:当流操作包含有状态的操作时,如 reduce 操作且涉及到共享可变状态,需要谨慎使用异步流。因为有状态操作在并发执行时可能会导致数据竞争和不一致问题。在这种情况下,可能需要使用合适的同步机制或者考虑使用同步流,以确保数据的正确性。例如,在并行流中对共享的计数器进行累加操作时,就需要使用 AtomicInteger 等线程安全的类型来保证结果的正确性。

根据线程安全要求选择

  1. 严格线程安全要求:如果对数据的一致性和线程安全有严格要求,并且流操作涉及到共享资源的读写,同步流是更可靠的选择。虽然同步流性能可能不如异步流,但能确保数据的正确性,避免并发问题。例如,在金融交易系统中处理账户余额等敏感数据时,通常会选择同步流以保证数据的准确性。
  2. 宽松线程安全要求:如果对线程安全要求相对宽松,并且能通过合理的设计和同步机制来保证数据一致性,异步流可以在提高性能的同时满足需求。例如,在一些数据统计分析场景中,即使偶尔出现微小的数据偏差对结果影响不大,就可以使用异步流来提高处理速度。

同步与异步流模式在实际场景中的应用案例

同步流在文件读取与简单处理中的应用

假设我们有一个文本文件,每行包含一个整数,我们需要读取文件中的所有整数,并计算它们的总和。在这种情况下,由于文件内容可能不是特别大,并且操作相对简单,使用同步流即可。

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.stream.Collectors;

public class SynchronousFileProcessing {
    public static void main(String[] args) {
        String filePath = "numbers.txt";
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            int sum = reader.lines()
                   .mapToInt(Integer::parseInt)
                   .sum();
            System.out.println("Sum of numbers in the file: " + sum);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,通过 BufferedReader 读取文件的每一行,然后使用同步流将每一行转换为整数,并计算总和。这种方式简单直接,适合处理相对较小的文件和简单的计算。

异步流在大数据集分析中的应用

在大数据分析场景中,经常需要处理海量的数据。例如,有一个包含数十亿条用户行为记录的日志文件,需要统计不同年龄段用户的平均访问时长。由于数据量巨大,使用异步流(并行流)可以显著提高处理效率。

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class UserBehavior {
    private int age;
    private long visitDuration;

    public UserBehavior(int age, long visitDuration) {
        this.age = age;
        this.visitDuration = visitDuration;
    }

    public int getAge() {
        return age;
    }

    public long getVisitDuration() {
        return visitDuration;
    }
}

public class AsynchronousBigDataAnalysis {
    public static void main(String[] args) {
        String filePath = "user_behavior_log.txt";
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            Map<Integer, Map<String, Double>> ageGroupStatistics = reader.lines()
                   .map(line -> {
                        String[] parts = line.split(",");
                        int age = Integer.parseInt(parts[0]);
                        long visitDuration = Long.parseLong(parts[1]);
                        return new UserBehavior(age, visitDuration);
                    })
                   .collect(Collectors.groupingByConcurrent(
                        UserBehavior::getAge,
                        ConcurrentHashMap::new,
                        Collectors.collectingAndThen(
                            Collectors.summarizingLong(UserBehavior::getVisitDuration),
                            summary -> {
                                Map<String, Double> stats = new HashMap<>();
                                stats.put("average", summary.getAverage());
                                return stats;
                            }
                        )
                    ));

            ageGroupStatistics.forEach((age, stats) -> {
                System.out.println("Age group: " + age + ", Average visit duration: " + stats.get("average"));
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,通过并行流对日志文件中的每一行进行处理,将用户行为数据转换为 UserBehavior 对象,然后使用 groupingByConcurrent 方法按年龄分组,并计算每个年龄组的平均访问时长。这里使用 ConcurrentHashMapcollectingAndThen 等方法来确保在并发处理时的数据一致性和正确性。通过这种方式,能够高效地处理大数据集的复杂分析任务。

同步与异步流模式的优化与调优

同步流优化

  1. 减少中间操作的开销:在同步流中,尽量减少不必要的中间操作。每个中间操作都会产生一定的开销,例如创建新的流对象等。如果可以在一个中间操作中完成多个过滤或转换逻辑,尽量合并操作。例如,原本使用两个 filter 操作进行两次条件过滤,可以合并为一个 filter 操作,将两个条件合并在一个逻辑中。
  2. 合理使用终端操作:选择合适的终端操作可以提高性能。例如,在计算总和时,如果流中的元素是整数类型,使用 mapToInt 后直接调用 sum 方法比使用 map 转换为 Integer 再进行累加效率更高。因为 mapToInt 避免了装箱和拆箱的开销。

异步流调优

  1. 调整并行度:异步流(并行流)的并行度是指同时执行操作的线程数量。可以通过 ForkJoinPool 来调整并行度。默认情况下,并行度等于处理器的核心数。但在某些情况下,根据任务的性质和数据量,可能需要手动调整并行度。例如,如果任务是 I/O 密集型的,适当增加并行度可能会提高性能;如果是 CPU 密集型任务,过高的并行度可能会导致线程竞争加剧,反而降低性能。
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamTuning {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 设置并行度为 4
        int sum = forkJoinPool.submit(() ->
                IntStream.range(1, 1000000)
                       .parallel()
                       .mapToObj(i -> i * i)
                       .collect(Collectors.summingInt(Integer::intValue))
        ).join();
        System.out.println("Sum: " + sum);
    }
}
  1. 避免数据竞争:在异步流中,要特别注意避免数据竞争。对于共享可变状态的操作,要使用线程安全的数据结构或同步机制。如前面提到的,在对共享计数器进行累加操作时,使用 AtomicInteger 来保证线程安全。同时,尽量设计无状态的流操作,这样可以避免数据竞争问题,充分发挥异步流的性能优势。

同步与异步流模式与其他并发编程模型的比较

与线程池的比较

  1. 同步流与线程池的同步操作:同步流的操作是在主线程中顺序执行的,类似于单线程的操作方式,与线程池中的单线程执行任务有些类似。但同步流更专注于集合数据的处理,提供了丰富的声明式操作方法,而线程池主要用于管理多个线程执行不同的任务。例如,在同步流中对一个列表进行简单的过滤和映射操作,是在一个线程中顺序完成的;而在线程池中,可以提交一个任务,该任务在单线程中执行类似的列表处理操作,但线程池的任务管理和调度机制更加复杂。
  2. 异步流与线程池的并发操作:异步流(并行流)通过内部的 ForkJoinPool 实现并发操作,与线程池有相似之处,但也有区别。异步流的并行操作是针对集合数据的分割和并行处理,将数据分成多个部分在不同线程中处理,然后合并结果。而线程池可以执行各种类型的任务,包括 I/O 操作、计算任务等,任务之间可能没有直接的数据关联。例如,在异步流中并行处理一个大列表时,数据是按一定规则分割处理的;而在线程池中,可以同时提交多个不同的文件读取任务,这些任务之间的数据相对独立。

与 Future 和 CompletableFuture 的比较

  1. 同步流与 Future 的同步特性:同步流的同步执行特性与 Future 获取结果的同步方式有一定关联。当使用 Future 执行一个任务并获取结果时,如果调用 get 方法,主线程会阻塞直到任务完成,这类似于同步流顺序执行操作,等待每个操作完成后再进行下一个。但 Future 主要用于异步任务的结果获取,而同步流是对集合数据的顺序处理,应用场景有所不同。例如,使用 Future 执行一个数据库查询任务,通过 get 方法获取查询结果时会阻塞主线程;而同步流在处理数据库查询返回的结果集时,是在主线程中顺序进行过滤、映射等操作。
  2. 异步流与 CompletableFuture 的异步特性:异步流和 CompletableFuture 都涉及异步操作。异步流通过并行处理集合数据实现异步,而 CompletableFuture 提供了更灵活的异步任务组合和结果处理方式。CompletableFuture 可以将多个异步任务进行链式调用、组合等操作,更适合复杂的异步业务逻辑。例如,在一个电商系统中,可能使用 CompletableFuture 组合多个异步服务调用,如查询商品信息、获取库存、计算价格等;而异步流更侧重于对大量数据集合的并行处理,如对订单列表进行统计分析。

总结同步与异步流模式的差异与应用要点

同步流和异步流模式在 Java 流处理中各有优劣。同步流适用于数据量较小、对代码逻辑清晰度和数据一致性要求较高的场景,其优点是易于理解和调试,缺点是在大数据量处理时性能可能受限。而异步流(并行流)则适合处理大数据量、对性能要求较高的场景,能充分利用多核处理器的优势,但需要注意数据一致性和线程安全问题,调试也相对复杂。

在实际应用中,要根据数据量、操作类型、线程安全要求等因素综合选择合适的模式。对于小数据量和简单操作,优先选择同步流;对于大数据量和复杂计算,考虑使用异步流,并通过合理的优化和调优来提高性能和保证数据正确性。同时,要清楚同步与异步流模式与其他并发编程模型的差异,以便在不同的业务场景中选择最合适的技术方案,实现高效、可靠的程序开发。