MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中的CompletableFuture与协程式异步编程

2022-12-204.0k 阅读

Java中的CompletableFuture基础

在Java的后端开发网络编程中,异步编程是提升程序性能和响应能力的关键技术。CompletableFuture作为Java 8引入的重要类,极大地简化了异步编程模型。它实现了FutureCompletionStage接口,不仅能获取异步操作的结果,还支持链式调用、组合多个异步操作等强大功能。

创建CompletableFuture

  1. 使用CompletableFuture.supplyAsync创建有返回值的异步任务

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                // 模拟耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Hello, CompletableFuture!";
            });
            String result = future.get();
            System.out.println(result);
        }
    }
    

    在上述代码中,CompletableFuture.supplyAsync接受一个Supplier接口的实现,这里是一个Lambda表达式。它会在一个新的线程中执行这个Supplierget方法,并返回一个CompletableFuture对象。通过future.get()方法可以获取异步操作的结果,不过get方法是阻塞的,直到异步任务完成。

  2. 使用CompletableFuture.runAsync创建无返回值的异步任务

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureNoReturnExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                // 模拟耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("This is a no - return CompletableFuture task.");
            });
            future.get();
        }
    }
    

    CompletableFuture.runAsync接受一个Runnable接口的实现,同样在新线程中执行。由于Runnable没有返回值,所以CompletableFuture的泛型为Void

获取异步任务结果

  1. get方法

    • get方法会阻塞当前线程,直到CompletableFuture完成并返回结果。如果异步任务抛出异常,get方法会将异常包装成ExecutionExceptionInterruptedException抛出。
    • 如前面示例中String result = future.get();,主线程会等待future任务完成并返回结果。
  2. get(long timeout, TimeUnit unit)方法

    • 该方法允许设置一个超时时间。如果在指定的时间内CompletableFuture没有完成,会抛出TimeoutException
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class CompletableFutureTimeoutExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                // 模拟耗时操作
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Result";
            });
            try {
                String result = future.get(2, TimeUnit.SECONDS);
                System.out.println(result);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    在这个示例中,设置了2秒的超时时间,而异步任务需要3秒完成,所以会抛出TimeoutException

  3. join方法

    • join方法与get方法类似,也是阻塞当前线程获取结果。不同的是,如果异步任务抛出异常,join方法会直接抛出原始异常,而不是包装成ExecutionExceptionInterruptedException
    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureJoinExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("Task failed");
            });
            try {
                String result = future.join();
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    这里异步任务抛出了RuntimeExceptionjoin方法会直接抛出这个异常,在catch块中可以捕获到原始异常。

  4. getNow(T valueIfAbsent)方法

    • 如果CompletableFuture已经完成,getNow方法返回结果;如果尚未完成,返回传入的valueIfAbsent参数。
    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureGetNowExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Completed";
            });
            String result1 = future.getNow("Not completed yet");
            System.out.println(result1);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String result2 = future.getNow("Not completed yet");
            System.out.println(result2);
        }
    }
    

    第一次调用getNow时,异步任务尚未完成,所以返回"Not completed yet";第二次调用时,异步任务已完成,返回"Completed"

  5. complete(T value)方法

    • 可以手动设置CompletableFuture的结果。如果CompletableFuture已经完成,调用complete方法不会生效。
    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureCompleteExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = new CompletableFuture<>();
            new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    future.complete("Manually completed");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            try {
                String result = future.get();
                System.out.println(result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    

    在这个例子中,通过complete方法手动设置了CompletableFuture的结果,主线程通过get方法获取这个结果。

CompletableFuture的链式调用

CompletableFuture的强大之处在于其支持链式调用,能够方便地对异步任务进行组合和转换。

thenApply方法

thenApply方法用于对CompletableFuture的结果进行转换,它接受一个Function接口的实现。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenApplyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")
               .thenApply(s -> s + ", World")
               .thenApply(String::toUpperCase)
               .thenAccept(System.out::println);
    }
}

在上述代码中,首先通过CompletableFuture.supplyAsync创建一个异步任务返回"Hello"。接着使用thenApply方法,第一个thenApply将字符串转换为"Hello, World",第二个thenApply将其转换为大写"HELLO, WORLD",最后通过thenAccept方法消费这个结果并打印。

thenAccept方法

thenAccept方法用于消费CompletableFuture的结果,它接受一个Consumer接口的实现,没有返回值。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Result")
               .thenAccept(System.out::println);
    }
}

这里CompletableFuture完成后,thenAccept中的Consumer会处理返回的结果,在这个例子中就是将结果打印出来。

thenRun方法

thenRun方法在CompletableFuture完成后执行一个Runnable任务,不关心CompletableFuture的结果。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenRunExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Some result")
               .thenRun(() -> System.out.println("Task completed, but I don't care about the result"));
    }
}

异步任务完成后,thenRun中的Runnable任务会被执行,打印出相应的信息。

handle方法

handle方法会在CompletableFuture完成时(无论成功还是失败)执行,它接受一个BiFunction接口的实现,该接口的第一个参数是CompletableFuture的结果,第二个参数是异常(如果有)。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureHandleExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                return "Success";
            } else {
                throw new RuntimeException("Failure");
            }
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("Exception occurred: " + ex.getMessage());
                return "Default value";
            } else {
                System.out.println("Result: " + result);
                return result;
            }
        }).thenAccept(System.out::println);
    }
}

在这个例子中,根据随机数决定是否抛出异常。handle方法会处理任务的结果或异常,并返回一个新的值,这个值会传递给后续的thenAccept方法。

exceptionally方法

exceptionally方法用于处理CompletableFuture中的异常,它接受一个Function接口的实现,该接口的参数是异常对象,返回值是用于替代异常情况的结果。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionallyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Task failed");
        }).exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return "Default result";
        }).thenAccept(System.out::println);
    }
}

当异步任务抛出异常时,exceptionally中的Function会被调用,打印异常信息并返回一个默认结果,这个结果会被后续的thenAccept方法处理。

CompletableFuture的组合操作

在实际应用中,常常需要组合多个异步任务。CompletableFuture提供了丰富的方法来实现这一需求。

thenCombine方法

thenCombine方法用于将两个CompletableFuture的结果进行合并,它接受另一个CompletableFuture和一个BiFunction接口的实现。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenCombineExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + ", " + s2);
        combinedFuture.thenAccept(System.out::println);
    }
}

在这个例子中,future1future2是两个异步任务,thenCombine方法将它们的结果合并成一个新的字符串"Hello, World",并通过thenAccept方法打印出来。

applyToEither方法

applyToEither方法表示两个CompletableFuture中只要有一个完成,就对其结果应用一个Function

import java.util.concurrent.CompletableFuture;

public class CompletableFutureApplyToEitherExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 result";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 result";
        });
        CompletableFuture<String> resultFuture = future1.applyToEither(future2, s -> s + " processed");
        resultFuture.thenAccept(System.out::println);
    }
}

这里future2会先完成,所以applyToEither方法会对future2的结果应用Function,打印出"Future 2 result processed"

runAfterEither方法

runAfterEither方法在两个CompletableFuture中任意一个完成后执行一个Runnable任务。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureRunAfterEitherExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 result";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 result";
        });
        CompletableFuture<Void> voidFuture = future1.runAfterEither(future2, () -> System.out.println("One of the futures completed"));
        try {
            voidFuture.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

future2先完成时,runAfterEither中的Runnable任务会被执行,打印出相应信息。

allOf方法

allOf方法用于等待所有CompletableFuture都完成。它返回一个新的CompletableFuture<Void>,当所有传入的CompletableFuture都完成时,这个新的CompletableFuture才完成。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureAllOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 result";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 result";
        });
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
        allFuture.thenRun(() -> {
            try {
                System.out.println(future1.get());
                System.out.println(future2.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).join();
    }
}

在这个例子中,allOf方法返回的CompletableFuture<Void>会在future1future2都完成后才完成。然后通过thenRun方法获取并打印两个CompletableFuture的结果。

anyOf方法

anyOf方法返回一个新的CompletableFuture,当传入的多个CompletableFuture中任意一个完成时,这个新的CompletableFuture就完成,并且其结果就是第一个完成的CompletableFuture的结果。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureAnyOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 result";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 result";
        });
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
        anyFuture.thenAccept(System.out::println).join();
    }
}

由于future2先完成,所以anyFuture的结果就是future2的结果,会打印出"Future 2 result"

协程式异步编程概念

协程是一种轻量级的线程,与传统线程不同,协程的调度由用户空间控制,而不是操作系统内核。在Java中,虽然没有原生的协程支持,但通过一些框架如Quasar可以实现协程式异步编程。

协程的优势

  1. 轻量级:协程的创建和销毁开销比线程小得多。一个应用程序可以创建数以万计的协程,而创建过多的线程会导致资源耗尽。
  2. 非抢占式调度:协程的调度是协作式的,即一个协程在执行过程中可以主动让出执行权,而不像线程那样由操作系统强制抢占。这使得编程模型更加可控,避免了线程切换带来的开销和资源竞争问题。

协程与线程的关系

协程可以运行在一个或多个线程之上。多个协程可以复用同一个线程,当一个协程让出执行权时,线程可以执行其他协程。这种复用机制提高了线程的利用率,减少了线程上下文切换的开销。

使用Quasar实现协程式异步编程

Quasar是一个基于Java的库,它提供了协程的支持。

引入Quasar依赖

pom.xml文件中添加Quasar依赖:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-core</artifactId>
    <version>0.8.10</version>
</dependency>

创建和运行协程

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;

public class QuasarCoroutineExample {
    public static void main(String[] args) {
        Fiber<Void> fiber = new Fiber<Void>() {
            @Override
            protected Void run() throws SuspendExecution, InterruptedException {
                System.out.println("Coroutine started");
                Fiber.sleep(2000);
                System.out.println("Coroutine resumed after 2 seconds");
                return null;
            }
        };
        fiber.start();
        try {
            fiber.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,通过继承Fiber类创建了一个协程。在run方法中,使用Fiber.sleep方法模拟了一个耗时操作,这里的Fiber.sleep不会阻塞线程,而是让出协程的执行权。fiber.start()启动协程,fiber.join()等待协程执行完毕。

协程的异步调用

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.async.Async;

public class QuasarAsyncExample {
    public static void main(String[] args) {
        Fiber<Void> fiber = new Fiber<Void>() {
            @Override
            protected Void run() throws SuspendExecution, InterruptedException {
                Fiber<String> asyncFiber = Async.future(() -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "Async result";
                });
                String result = asyncFiber.get();
                System.out.println("Result from async operation: " + result);
                return null;
            }
        };
        fiber.start();
        try {
            fiber.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这里使用Async.future创建了一个异步的协程操作,asyncFiber.get()会等待异步操作完成并获取结果,整个过程不会阻塞主线程所在的线程,体现了协程式异步编程的优势。

CompletableFuture与协程式异步编程对比

  1. 编程模型

    • CompletableFuture:基于回调和链式调用的方式,通过Future接口获取异步结果。它的编程模型相对传统,易于理解和掌握,适合处理简单到中等复杂度的异步任务。
    • 协程式异步编程:采用类似同步编程的方式编写异步代码,通过yieldawait等方式暂停和恢复协程执行。这种方式使异步代码看起来更像同步代码,对于复杂的异步逻辑,代码的可读性和维护性更好。
  2. 性能和资源消耗

    • CompletableFuture:依赖于Java的线程池机制,虽然在处理异步任务时能提高效率,但线程的创建和上下文切换仍然有一定开销。当有大量异步任务时,可能会消耗较多系统资源。
    • 协程式异步编程:协程是轻量级的,创建和销毁开销小,多个协程可以复用同一个线程,大大减少了资源消耗。在处理高并发、大量异步任务时,协程式异步编程在性能和资源利用上更具优势。
  3. 应用场景

    • CompletableFuture:适合于一般的后端异步任务处理,如数据库查询、网络请求等。对于不需要大量并发异步任务的场景,它能很好地满足需求。
    • 协程式异步编程:更适合高并发、I/O密集型的场景,如网络服务器开发、海量数据处理等。在这些场景下,协程的轻量级特性和高效的调度机制能显著提升系统性能。

在实际的后端开发网络编程中,开发者需要根据具体的业务需求和场景来选择合适的异步编程方式。如果是简单的异步任务,CompletableFuture是一个不错的选择;而对于高并发、资源敏感的场景,协程式异步编程可能会带来更好的效果。通过合理运用这两种异步编程技术,能够开发出更高效、更健壮的后端应用程序。