MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture并行处理在分布式系统中的应用

2024-03-032.7k 阅读

Java CompletableFuture 基础概述

在Java 8引入CompletableFuture之前,处理异步任务相对繁琐。传统的Future模式虽然能实现异步操作,但存在局限性,例如无法主动获取任务完成状态,只能被动等待。CompletableFuture的出现,为Java开发者提供了更强大、灵活的异步编程模型。

CompletableFuture实现了Future和CompletionStage接口。Future接口主要用于获取异步任务的结果,而CompletionStage接口定义了一系列方法,用于对异步任务进行链式调用、组合等操作。

创建CompletableFuture

  1. 使用supplyAsync方法创建有返回值的CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟异步任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务完成";
});

在上述代码中,supplyAsync方法接受一个Supplier函数式接口作为参数,该接口定义的get方法返回异步任务的结果。这里通过Thread.sleep模拟了一个耗时2秒的异步任务。

  1. 使用runAsync方法创建无返回值的CompletableFuture
CompletableFuture<Void> futureVoid = CompletableFuture.runAsync(() -> {
    // 模拟异步任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("无返回值任务完成");
});

runAsync方法接受一个Runnable接口作为参数,Runnable接口的run方法中定义异步执行的逻辑,该方法返回的CompletableFuture没有返回值(类型为Void)。

获取CompletableFuture的结果

  1. 使用get方法获取结果(阻塞方式)
try {
    String result = future.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

get方法会阻塞当前线程,直到CompletableFuture任务完成并返回结果。如果任务抛出异常,get方法会抛出ExecutionException,并将原始异常封装在其中。InterruptedException则是在等待过程中线程被中断时抛出。

  1. 使用getNow方法获取结果(非阻塞方式)
String resultNow = future.getNow(null);
System.out.println(resultNow);

getNow方法不会阻塞线程。如果任务已经完成,返回任务结果;否则返回传入的默认值(这里为null)。

处理CompletableFuture的完成状态

  1. 使用whenComplete方法
future.whenComplete((result, ex) -> {
    if (ex == null) {
        System.out.println("任务成功:" + result);
    } else {
        System.out.println("任务失败:" + ex.getMessage());
    }
});

whenComplete方法接受一个BiConsumer,在任务完成时(无论成功或失败),会执行该BiConsumer。第一个参数为任务结果,第二个参数为异常(如果任务成功完成,异常为null)。

  1. 使用exceptionally方法处理异常
CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("模拟异常");
});
CompletableFuture<String> handledFuture = futureWithException.exceptionally(ex -> {
    System.out.println("捕获到异常:" + ex.getMessage());
    return "默认结果";
});
try {
    String result = handledFuture.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

exceptionally方法在任务抛出异常时,会执行传入的Function,该Function返回一个替代结果,从而避免异常向上传播。

CompletableFuture并行处理

在分布式系统中,并行处理多个任务可以显著提高系统的性能和响应速度。CompletableFuture提供了丰富的方法来实现并行处理。

并行执行多个任务并获取所有结果

  1. 使用allOf方法
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务1完成";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务2完成";
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
allFutures.join();
try {
    String result1 = future1.get();
    String result2 = future2.get();
    System.out.println(result1);
    System.out.println(result2);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

allOf方法接受多个CompletableFuture作为参数,返回一个新的CompletableFuture。当所有传入的CompletableFuture都完成时,这个新的CompletableFuture才会完成。通过join方法等待所有任务完成,然后分别获取每个任务的结果。

  1. 使用thenCombine方法
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务3";
});
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务4";
});
CompletableFuture<String> combinedFuture = future3.thenCombine(future4, (r1, r2) -> r1 + " 和 " + r2 + " 都完成");
try {
    String result = combinedFuture.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

thenCombine方法将两个CompletableFuture的结果合并,它接受另一个CompletableFuture和一个BiFunction作为参数。当这两个CompletableFuture都完成时,会执行BiFunction,将两个任务的结果作为参数传入,返回合并后的结果。

并行执行多个任务并获取第一个完成的结果

使用anyOf方法:

CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务5完成";
});
CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务6完成";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future5, future6);
try {
    Object result = anyFuture.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

anyOf方法接受多个CompletableFuture作为参数,返回一个新的CompletableFuture。当其中任何一个CompletableFuture完成时,这个新的CompletableFuture就会完成,并且其结果就是第一个完成的CompletableFuture的结果。

在分布式系统中的应用场景

微服务间数据聚合

在分布式系统中,通常由多个微服务提供不同的数据。例如,一个电商系统可能有商品微服务、库存微服务、价格微服务等。当需要展示商品详情时,可能需要从多个微服务获取数据并聚合。

// 模拟商品微服务调用
CompletableFuture<String> productServiceCall = CompletableFuture.supplyAsync(() -> {
    // 实际调用商品微服务接口
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "商品信息";
});
// 模拟库存微服务调用
CompletableFuture<String> inventoryServiceCall = CompletableFuture.supplyAsync(() -> {
    // 实际调用库存微服务接口
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "库存信息";
});
// 模拟价格微服务调用
CompletableFuture<String> priceServiceCall = CompletableFuture.supplyAsync(() -> {
    // 实际调用价格微服务接口
    try {
        Thread.sleep(2500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "价格信息";
});
CompletableFuture<Void> allServiceCalls = CompletableFuture.allOf(productServiceCall, inventoryServiceCall, priceServiceCall);
allServiceCalls.join();
try {
    String productInfo = productServiceCall.get();
    String inventoryInfo = inventoryServiceCall.get();
    String priceInfo = priceServiceCall.get();
    String aggregatedInfo = productInfo + "," + inventoryInfo + "," + priceInfo;
    System.out.println(aggregatedInfo);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

通过CompletableFuture并行调用多个微服务,然后等待所有调用完成并聚合数据,大大提高了数据获取的效率。

分布式任务调度

在分布式系统中,可能需要在多个节点上执行一些任务,例如数据清理、日志压缩等。可以利用CompletableFuture实现任务的并行调度。

// 模拟在不同节点执行任务
List<CompletableFuture<String>> nodeTasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    int finalI = i;
    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        // 模拟在节点上执行任务
        try {
            Thread.sleep((long) (Math.random() * 3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "节点 " + finalI + " 任务完成";
    });
    nodeTasks.add(task);
}
CompletableFuture<Void> allNodeTasks = CompletableFuture.allOf(nodeTasks.toArray(new CompletableFuture[0]));
allNodeTasks.join();
for (CompletableFuture<String> task : nodeTasks) {
    try {
        System.out.println(task.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

上述代码创建了多个在不同节点执行任务的CompletableFuture,并使用allOf方法等待所有任务完成,实现了分布式任务的并行调度。

容错处理

在分布式系统中,节点故障或网络问题可能导致任务失败。CompletableFuture提供的异常处理机制可以在任务失败时进行容错处理。

// 模拟可能失败的微服务调用
CompletableFuture<String> faultyServiceCall = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("模拟服务故障");
    }
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "服务正常响应";
});
CompletableFuture<String> handledFaultyCall = faultyServiceCall.exceptionally(ex -> {
    System.out.println("服务调用失败:" + ex.getMessage());
    return "默认响应";
});
try {
    String result = handledFaultyCall.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

通过exceptionally方法,当微服务调用失败时,可以返回默认响应,避免整个业务流程因局部故障而中断。

性能优化与注意事项

线程池的合理使用

CompletableFuture默认使用ForkJoinPool.commonPool()作为线程池来执行异步任务。在高并发场景下,可能会出现线程资源不足的情况。可以自定义线程池来优化性能。

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> futureWithCustomExecutor = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "使用自定义线程池的任务";
}, executor);
try {
    String result = futureWithCustomExecutor.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    executor.shutdown();
}

在上述代码中,创建了一个固定大小为10的线程池,并将其作为参数传递给supplyAsync方法。这样可以根据业务需求合理分配线程资源,提高系统的并发处理能力。

避免死锁

在使用CompletableFuture进行链式调用和任务组合时,要注意避免死锁。例如,以下代码可能会导致死锁:

CompletableFuture<String> futureA = new CompletableFuture<>();
CompletableFuture<String> futureB = futureA.thenApplyAsync(result -> {
    // 这里可能会出现死锁,因为futureA还未完成
    return "处理结果";
});
futureA.complete("初始结果");
try {
    String result = futureB.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这个例子中,futureB依赖futureA的结果,但如果futureAthenApplyAsync调用时还未完成,就可能导致死锁。要避免这种情况,确保任务之间的依赖关系合理,并且及时完成前置任务。

异常处理的完整性

在处理CompletableFuture的异常时,要确保异常处理的完整性。例如,在链式调用中,如果一个环节没有处理异常,异常可能会向上传播,导致整个流程中断。

CompletableFuture<String> futureChain = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("第一步异常");
}).thenApply(result -> {
    // 这里没有处理第一步的异常,异常会继续传播
    return result + " 处理后";
});
futureChain.exceptionally(ex -> {
    System.out.println("捕获到异常:" + ex.getMessage());
    return "默认结果";
});
try {
    String result = futureChain.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,thenApply方法没有处理第一步抛出的异常,异常会传播到get方法。因此,在链式调用中,每个可能产生异常的步骤都应该考虑适当的异常处理,以保证系统的稳定性。

与其他异步框架的比较

与Guava的ListenableFuture比较

Guava的ListenableFuture也是一个异步编程框架,提供了类似的异步任务处理功能。与CompletableFuture相比,CompletableFuture的API更加丰富和简洁。例如,CompletableFuture的链式调用和组合方法(如thenApplythenCombine等)使得代码更加易读和可维护。而Guava的ListenableFuture通常需要使用Futures.addCallback等方法来处理异步结果,代码结构相对复杂。

// Guava的ListenableFuture示例
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<String> guavaFuture = executorService.submit(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Guava任务完成";
});
Futures.addCallback(guavaFuture, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        System.out.println(result);
    }

    @Override
    public void onFailure(Throwable t) {
        System.out.println("任务失败:" + t.getMessage());
    }
});

对比CompletableFuture的whenComplete方法,代码结构上CompletableFuture更简洁。

与RxJava比较

RxJava是一个基于观察者模式的异步编程框架,功能强大且灵活。与CompletableFuture相比,RxJava更侧重于事件流的处理,适用于复杂的异步场景,如事件驱动的编程、数据变换和组合等。而CompletableFuture更专注于单个异步任务的处理和组合。在简单的异步任务并行处理场景下,CompletableFuture的API相对更轻量级和易于理解。

// RxJava示例
Observable.just("RxJava任务")
      .map(s -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + " 处理后";
        })
      .subscribe(result -> System.out.println(result),
                error -> System.out.println("任务失败:" + error.getMessage()));

从代码示例可以看出,RxJava通过链式调用的方式处理事件流,与CompletableFuture处理异步任务的方式有所不同。

总结

CompletableFuture为Java开发者在分布式系统中实现并行处理提供了强大而灵活的工具。通过合理使用其创建、组合、异常处理等方法,可以有效地提高系统的性能和响应速度。在应用过程中,要注意线程池的合理配置、避免死锁以及确保异常处理的完整性。与其他异步框架相比,CompletableFuture在简单异步任务并行处理场景下具有一定的优势。随着分布式系统的不断发展,CompletableFuture将在更多的实际项目中发挥重要作用。无论是微服务间的数据聚合,还是分布式任务调度,CompletableFuture都能为开发者提供高效的解决方案。