MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务执行状态监控

2022-01-021.6k 阅读

Java 中 CompletableFuture 异步任务执行状态监控

在现代的 Java 应用开发中,异步编程变得越来越重要,尤其是在处理 I/O 密集型任务或者需要提高系统响应性能的场景下。CompletableFuture 作为 Java 8 引入的一个强大工具,为异步编程提供了丰富的功能。其中,对异步任务执行状态的监控是开发者在实际应用中常常需要处理的关键需求。

CompletableFuture 基础回顾

CompletableFuture 实现了 Future 接口,同时提供了更为灵活和强大的异步编程能力。它允许我们以链式调用的方式处理异步操作,并且支持多种异步任务的组合和转换。

创建 CompletableFuture

  1. 使用 supplyAsync 创建有返回值的异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务完成";
});

在上述代码中,supplyAsync 方法接受一个 Supplier 作为参数,该 Supplierget 方法返回异步任务的执行结果。supplyAsync 方法会在一个默认的 ForkJoinPool.commonPool() 线程池中异步执行任务。

  1. 使用 runAsync 创建无返回值的异步任务
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("无返回值任务完成");
});

runAsync 方法接受一个 Runnable 作为参数,用于执行无返回值的异步任务。同样,它也在默认线程池中执行。

获取异步任务结果

  1. 使用 get 方法获取结果
try {
    String result = future.get();
    System.out.println("任务结果: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

get 方法会阻塞当前线程,直到异步任务完成并返回结果。如果任务执行过程中抛出异常,get 方法会将异常包装成 ExecutionException 重新抛出,同时,InterruptedException 用于处理线程在等待过程中被中断的情况。

  1. 使用 get(long timeout, TimeUnit unit) 方法设置超时获取结果
try {
    String result = future.get(3, TimeUnit.SECONDS);
    System.out.println("任务结果: " + result);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

此方法在指定的超时时间内等待任务完成,如果超时任务仍未完成,则抛出 TimeoutException

CompletableFuture 异步任务状态

CompletableFuture 的异步任务存在多种状态,了解这些状态对于监控任务执行过程至关重要。

未完成状态(Pending)

CompletableFuture 对象被创建但任务尚未开始执行或者正在执行过程中,任务处于未完成状态。在这个状态下,调用 isDone() 方法会返回 false

已完成状态(Completed)

  1. 正常完成:当异步任务成功执行并返回结果时,任务进入正常完成状态。此时,isDone() 方法返回 trueisCompletedNormally() 方法也返回 true
CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常完成结果");
try {
    Thread.sleep(1000);
    System.out.println("isDone: " + normalFuture.isDone());
    System.out.println("isCompletedNormally: " + normalFuture.isCompletedNormally());
} catch (InterruptedException e) {
    e.printStackTrace();
}
  1. 异常完成:如果异步任务在执行过程中抛出异常,任务进入异常完成状态。同样,isDone() 方法返回 true,但 isCompletedExceptionally() 方法返回 trueisCompletedNormally() 方法返回 false
CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行异常");
});
try {
    Thread.sleep(1000);
    System.out.println("isDone: " + exceptionFuture.isDone());
    System.out.println("isCompletedExceptionally: " + exceptionFuture.isCompletedExceptionally());
    System.out.println("isCompletedNormally: " + exceptionFuture.isCompletedNormally());
} catch (InterruptedException e) {
    e.printStackTrace();
}

监控 CompletableFuture 异步任务状态

  1. 使用 whenComplete 方法监控任务完成情况 whenComplete 方法会在异步任务完成(无论是正常完成还是异常完成)时执行一个回调函数。
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务完成";
});
future3.whenComplete((result, exception) -> {
    if (exception == null) {
        System.out.println("任务正常完成,结果: " + result);
    } else {
        System.out.println("任务异常完成,异常信息: " + exception.getMessage());
    }
});

在上述代码中,whenComplete 的回调函数接受两个参数,result 为任务正常完成时的返回值,如果任务异常完成则为 nullexception 为任务执行过程中抛出的异常,如果任务正常完成则为 null

  1. 使用 exceptionally 方法处理异常情况 exceptionally 方法专门用于处理异步任务执行过程中抛出的异常,并返回一个替代结果。
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行异常");
});
CompletableFuture<String> resultFuture = future4.exceptionally(exception -> {
    System.out.println("捕获到异常: " + exception.getMessage());
    return "默认结果";
});
try {
    String result = resultFuture.get();
    System.out.println("最终结果: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这个例子中,当 future4 任务抛出异常时,exceptionally 方法中的回调函数会被执行,返回一个默认结果,避免了因为异常导致程序中断。

  1. 使用 handle 方法同时处理正常和异常情况 handle 方法与 whenComplete 类似,但它可以返回一个新的 CompletableFuture 对象,用于进一步处理任务结果或异常。
CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务完成";
});
CompletableFuture<String> newFuture = future5.handle((result, exception) -> {
    if (exception == null) {
        return "处理后的正常结果: " + result;
    } else {
        return "处理后的异常结果: " + exception.getMessage();
    }
});
try {
    String newResult = newFuture.get();
    System.out.println("新的结果: " + newResult);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

handle 方法的回调函数同样接受任务结果和异常作为参数,并返回一个新的值,这个值会成为新的 CompletableFuture 对象的结果。

CompletableFuture 任务链中的状态监控

在实际应用中,我们常常需要将多个异步任务链接起来,形成一个任务链。在任务链中监控每个任务的执行状态同样重要。

任务链的创建

  1. 使用 thenApply 方法进行任务转换 thenApply 方法用于将一个异步任务的结果作为参数传递给下一个函数,并返回一个新的异步任务。
CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> "初始任务结果");
CompletableFuture<String> newFuture2 = future6.thenApply(result -> {
    return "转换后的结果: " + result;
});
try {
    String finalResult = newFuture2.get();
    System.out.println("最终结果: " + finalResult);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,thenApply 方法接受一个 Function,该 Function 的输入为上一个任务的结果,输出为新的任务结果。

  1. 使用 thenCompose 方法进行任务组合 thenCompose 方法与 thenApply 类似,但它接受的函数返回的是一个 CompletableFuture 对象,用于更复杂的任务组合。
CompletableFuture<String> future7 = CompletableFuture.supplyAsync(() -> "初始任务结果");
CompletableFuture<String> newFuture3 = future7.thenCompose(result -> {
    return CompletableFuture.supplyAsync(() -> "组合后的结果: " + result);
});
try {
    String finalResult = newFuture3.get();
    System.out.println("最终结果: " + finalResult);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

这里,thenCompose 方法会将上一个任务的结果传递给返回 CompletableFuture 的函数,并将其结果作为新的异步任务的结果。

任务链中的状态监控

  1. 在任务链中使用 whenComplete 等方法 我们可以在任务链的每个环节使用 whenCompleteexceptionally 等方法来监控任务状态。
CompletableFuture<String> future8 = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("初始任务异常");
});
CompletableFuture<String> newFuture4 = future8
      .exceptionally(exception -> {
            System.out.println("捕获初始任务异常: " + exception.getMessage());
            return "默认初始结果";
        })
      .thenApply(result -> {
            return "转换后的结果: " + result;
        })
      .whenComplete((finalResult, exception) -> {
            if (exception == null) {
                System.out.println("最终任务正常完成,结果: " + finalResult);
            } else {
                System.out.println("最终任务异常完成,异常信息: " + exception.getMessage());
            }
        });
try {
    String result = newFuture4.get();
    System.out.println("最终结果: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这个例子中,我们首先使用 exceptionally 方法处理初始任务抛出的异常,然后使用 thenApply 方法进行结果转换,最后使用 whenComplete 方法监控整个任务链的最终完成状态。

多 CompletableFuture 任务组合的状态监控

在实际场景中,我们可能需要同时执行多个异步任务,并监控它们的组合状态。

使用 allOf 方法等待所有任务完成

allOf 方法接受多个 CompletableFuture 对象作为参数,返回一个新的 CompletableFuture,该新任务会在所有传入的任务都完成时完成。

CompletableFuture<String> future9 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务1完成";
});
CompletableFuture<String> future10 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务2完成";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future9, future10);
allFuture.whenComplete((v, exception) -> {
    if (exception == null) {
        try {
            System.out.println("任务1结果: " + future9.get());
            System.out.println("任务2结果: " + future10.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    } else {
        System.out.println("任务组合异常: " + exception.getMessage());
    }
});
try {
    allFuture.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,allOf 方法返回的 allFuture 任务会在 future9future10 都完成后完成。我们通过 whenComplete 方法监控任务组合的完成状态,并获取每个任务的结果。

使用 anyOf 方法等待任一任务完成

anyOf 方法同样接受多个 CompletableFuture 对象作为参数,返回的新 CompletableFuture 会在任一传入的任务完成时完成。

CompletableFuture<String> future11 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务3完成";
});
CompletableFuture<String> future12 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务4完成";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future11, future12);
anyFuture.whenComplete((result, exception) -> {
    if (exception == null) {
        System.out.println("首先完成的任务结果: " + result);
    } else {
        System.out.println("任务组合异常: " + exception.getMessage());
    }
});
try {
    anyFuture.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

这里,anyFuture 任务会在 future11future12 中任一任务完成时完成,我们通过 whenComplete 方法获取首先完成的任务结果。

自定义线程池执行 CompletableFuture 任务及状态监控

默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 线程池执行任务。但在实际应用中,我们可能需要使用自定义线程池来满足特定的需求,同时也需要关注在自定义线程池下的任务状态监控。

创建自定义线程池

ExecutorService executor = Executors.newFixedThreadPool(5);

上述代码创建了一个固定大小为 5 的线程池。

使用自定义线程池执行 CompletableFuture 任务

CompletableFuture<String> future13 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "使用自定义线程池的任务完成";
}, executor);

在这个例子中,supplyAsync 方法的第二个参数传入了自定义的线程池 executor,这样任务就会在自定义线程池中执行。

自定义线程池下的状态监控

在自定义线程池下,任务状态监控的方式与默认线程池下基本相同。

future13.whenComplete((result, exception) -> {
    if (exception == null) {
        System.out.println("任务正常完成,结果: " + result);
    } else {
        System.out.println("任务异常完成,异常信息: " + exception.getMessage());
    }
});
try {
    String result = future13.get();
    System.out.println("最终结果: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
// 关闭线程池
executor.shutdown();

我们依然可以使用 whenComplete 等方法监控任务状态,同时在任务执行完毕后,需要手动关闭自定义线程池以释放资源。

实际应用场景中的 CompletableFuture 异步任务状态监控

  1. Web 应用中的异步请求处理 在一个 Web 应用中,可能需要同时调用多个外部 API 获取数据,然后将这些数据进行整合返回给客户端。每个 API 调用可以作为一个 CompletableFuture 异步任务。
// 模拟调用外部 API 获取用户信息
CompletableFuture<String> userInfoFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟 API 调用延迟
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "用户信息";
});
// 模拟调用外部 API 获取订单信息
CompletableFuture<String> orderInfoFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "订单信息";
});
CompletableFuture<Void> allFuture2 = CompletableFuture.allOf(userInfoFuture, orderInfoFuture);
allFuture2.whenComplete((v, exception) -> {
    if (exception == null) {
        try {
            String userInfo = userInfoFuture.get();
            String orderInfo = orderInfoFuture.get();
            System.out.println("整合后返回给客户端的数据: " + userInfo + " " + orderInfo);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    } else {
        System.out.println("获取数据异常: " + exception.getMessage());
    }
});
try {
    allFuture2.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

通过监控这些异步任务的状态,我们可以及时处理异常情况,确保应用的稳定性和可靠性。

  1. 数据处理和计算密集型任务 在大数据处理场景中,可能需要对大量数据进行并行计算。每个计算任务可以封装为 CompletableFuture
// 模拟数据计算任务
CompletableFuture<Integer> calculationFuture1 = CompletableFuture.supplyAsync(() -> {
    int sum = 0;
    for (int i = 0; i < 1000000; i++) {
        sum += i;
    }
    return sum;
});
CompletableFuture<Integer> calculationFuture2 = CompletableFuture.supplyAsync(() -> {
    int product = 1;
    for (int i = 1; i <= 10; i++) {
        product *= i;
    }
    return product;
});
CompletableFuture<Void> allCalculationFuture = CompletableFuture.allOf(calculationFuture1, calculationFuture2);
allCalculationFuture.whenComplete((v, exception) -> {
    if (exception == null) {
        try {
            int sum = calculationFuture1.get();
            int product = calculationFuture2.get();
            System.out.println("计算结果: 总和 " + sum + " 乘积 " + product);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    } else {
        System.out.println("计算任务异常: " + exception.getMessage());
    }
});
try {
    allCalculationFuture.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

通过监控这些任务的状态,我们可以了解计算任务的执行情况,及时发现并处理可能出现的问题。

总结 CompletableFuture 异步任务状态监控要点

  1. 熟悉任务状态:深入理解 CompletableFuture 的未完成、正常完成和异常完成等状态,以及对应的判断方法(isDoneisCompletedNormallyisCompletedExceptionally)。
  2. 掌握监控方法:熟练运用 whenCompleteexceptionallyhandle 等方法监控任务完成情况和处理异常,并且要注意这些方法在任务链和任务组合中的使用。
  3. 关注线程池:无论是使用默认线程池还是自定义线程池,都要确保在任务执行完毕后正确处理线程池资源,同时监控任务在不同线程池环境下的执行状态。
  4. 实际应用结合:在实际的 Web 应用、数据处理等场景中,合理运用 CompletableFuture 异步任务状态监控,提高应用的性能和稳定性。

通过以上对 CompletableFuture 异步任务执行状态监控的详细介绍,开发者可以更好地利用这一强大的异步编程工具,编写出高效、可靠的 Java 应用程序。在实际开发中,不断实践和总结经验,能够更加熟练地运用这些技术来满足复杂的业务需求。同时,随着 Java 技术的不断发展,CompletableFuture 可能会有更多的特性和优化,开发者需要持续关注和学习,以保持技术的先进性。