Java案例如何获取异步结果?

wen java案例 10

本文目录导读:

Java案例如何获取异步结果?

  1. Future + Callable(传统方式)
  2. CompletableFuture(推荐方式)
  3. 回调函数模式
  4. 使用 CountDownLatch 等待多个结果
  5. Spring @Async 注解
  6. RxJava(响应式编程)
  7. 最佳实践建议

在Java中获取异步结果有多种方式,从传统到现代,下面详细介绍最常用的几种方案。

Future + Callable(传统方式)

最基础的异步结果获取方式:

import java.util.concurrent.*;
public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // 提交异步任务
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "异步任务结果";
            }
        });
        System.out.println("主线程继续执行...");
        try {
            // 获取结果(阻塞等待)
            String result = future.get();
            System.out.println("异步结果: " + result);
            // 带超时的获取
            // String result = future.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}

缺点get() 方法是阻塞的,无法实现真正的回调。

CompletableFuture(推荐方式)

Java 8 引入的现代化异步编程方式:

基本使用

import java.util.concurrent.CompletableFuture;
public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 创建异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步结果";
        });
        System.out.println("主线程继续执行...");
        // 1. 阻塞获取结果
        String result = future.join();
        System.out.println("结果: " + result);
        // 2. 或者使用回调(非阻塞)
        future.thenAccept(r -> System.out.println("回调结果: " + r));
    }
}

链式调用和组合

public class CompletableFutureChainDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return " World";
        });
        // 组合两个异步结果
        CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + s2);
        System.out.println(combined.join()); // 输出: Hello World
        // 转换结果
        CompletableFuture<Integer> lengthFuture = combined.thenApply(String::length);
        System.out.println("长度: " + lengthFuture.join());
        // 消费结果(无返回值)
        combined.thenAccept(System.out::println);
    }
}

异常处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("出错了");
    }
    return "成功";
}).exceptionally(ex -> {
    System.out.println("异常: " + ex.getMessage());
    return "默认值";
}).thenApply(result -> result + " 处理完成");
System.out.println(future.join());

回调函数模式

通过接口实现回调:

import java.util.function.Consumer;
public class CallbackDemo {
    public static void main(String[] args) {
        asyncTask(result -> {
            System.out.println("回调结果: " + result);
        });
        System.out.println("主线程继续执行...");
        // 防止主线程提前退出
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    static void asyncTask(Consumer<String> callback) {
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                String result = "任务结果";
                callback.accept(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

使用 CountDownLatch 等待多个结果

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 3;
        CountDownLatch latch = new CountDownLatch(taskCount);
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println("任务 " + taskId + " 完成");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown(); // 任务完成,计数器减1
                }
            });
        }
        System.out.println("等待所有任务完成...");
        latch.await(); // 主线程等待
        System.out.println("所有任务完成");
        executor.shutdown();
    }
}

Spring @Async 注解

Spring 框架中的异步支持:

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
@EnableAsync
public class AsyncService {
    @Async
    public CompletableFuture<String> asyncMethod() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return CompletableFuture.completedFuture("异步结果");
    }
}
// 使用
@Autowired
private AsyncService asyncService;
public void test() {
    CompletableFuture<String> future = asyncService.asyncMethod();
    future.thenAccept(System.out::println);
}

RxJava(响应式编程)

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class RxJavaDemo {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            try {
                Thread.sleep(2000);
                emitter.onNext("异步结果");
                emitter.onComplete();
            } catch (InterruptedException e) {
                emitter.onError(e);
            }
        }).subscribeOn(Schedulers.io());
        observable.subscribe(
            result -> System.out.println("结果: " + result),
            error -> System.out.println("错误: " + error),
            () -> System.out.println("完成")
        );
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

最佳实践建议

  1. 优先使用 CompletableFuture:功能强大,支持链式调用和组合
  2. 使用超时机制:避免永久阻塞
  3. 适当处理异常:使用 exceptionally() 或 handle() 方法
  4. 考虑线程池配置:避免默认的 ForkJoinPool
  5. 注意线程安全问题:异步结果可能涉及共享状态
// 自定义线程池的 CompletableFuture
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        return "结果";
    }, executor)
    .orTimeout(3, TimeUnit.SECONDS) // 超时
    .exceptionally(ex -> "处理异常");
future.thenAccept(System.out::println);
executor.shutdown();

选择哪种方式取决于具体需求:简单的异步用 Future,复杂的异步编排用 CompletableFuture,Spring 项目用 @Async,高并发场景考虑 RxJava。

抱歉,评论功能暂时关闭!