本文目录导读:

- Future + Callable(传统方式)
- CompletableFuture(推荐方式)
- 回调函数模式
- 使用 CountDownLatch 等待多个结果
- Spring @Async 注解
- RxJava(响应式编程)
- 最佳实践建议
在Java中获取异步结果有多种方式,从传统到现代,下面详细介绍最常用的几种方案。
Future + Callable(传统方式)
最基础的异步结果获取方式:
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交异步任务
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "异步任务结果";
}
});
System.out.println("主线程继续执行...");
try {
// 获取结果(阻塞等待)
String result = future.get();
System.out.println("异步结果: " + result);
// 带超时的获取
// String result = future.get(3, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
缺点:get() 方法是阻塞的,无法实现真正的回调。
CompletableFuture(推荐方式)
Java 8 引入的现代化异步编程方式:
基本使用
import java.util.concurrent.CompletableFuture;
public class CompletableFutureDemo {
public static void main(String[] args) {
// 创建异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "异步结果";
});
System.out.println("主线程继续执行...");
// 1. 阻塞获取结果
String result = future.join();
System.out.println("结果: " + result);
// 2. 或者使用回调(非阻塞)
future.thenAccept(r -> System.out.println("回调结果: " + r));
}
}
链式调用和组合
public class CompletableFutureChainDemo {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return " World";
});
// 组合两个异步结果
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + s2);
System.out.println(combined.join()); // 输出: Hello World
// 转换结果
CompletableFuture<Integer> lengthFuture = combined.thenApply(String::length);
System.out.println("长度: " + lengthFuture.join());
// 消费结果(无返回值)
combined.thenAccept(System.out::println);
}
}
异常处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("出错了");
}
return "成功";
}).exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "默认值";
}).thenApply(result -> result + " 处理完成");
System.out.println(future.join());
回调函数模式
通过接口实现回调:
import java.util.function.Consumer;
public class CallbackDemo {
public static void main(String[] args) {
asyncTask(result -> {
System.out.println("回调结果: " + result);
});
System.out.println("主线程继续执行...");
// 防止主线程提前退出
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static void asyncTask(Consumer<String> callback) {
new Thread(() -> {
try {
Thread.sleep(2000);
String result = "任务结果";
callback.accept(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
使用 CountDownLatch 等待多个结果
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int taskCount = 3;
CountDownLatch latch = new CountDownLatch(taskCount);
ExecutorService executor = Executors.newFixedThreadPool(taskCount);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
Thread.sleep(1000);
System.out.println("任务 " + taskId + " 完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 任务完成,计数器减1
}
});
}
System.out.println("等待所有任务完成...");
latch.await(); // 主线程等待
System.out.println("所有任务完成");
executor.shutdown();
}
}
Spring @Async 注解
Spring 框架中的异步支持:
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
@EnableAsync
public class AsyncService {
@Async
public CompletableFuture<String> asyncMethod() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture("异步结果");
}
}
// 使用
@Autowired
private AsyncService asyncService;
public void test() {
CompletableFuture<String> future = asyncService.asyncMethod();
future.thenAccept(System.out::println);
}
RxJava(响应式编程)
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class RxJavaDemo {
public static void main(String[] args) {
Observable<String> observable = Observable.create(emitter -> {
try {
Thread.sleep(2000);
emitter.onNext("异步结果");
emitter.onComplete();
} catch (InterruptedException e) {
emitter.onError(e);
}
}).subscribeOn(Schedulers.io());
observable.subscribe(
result -> System.out.println("结果: " + result),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成")
);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最佳实践建议
- 优先使用 CompletableFuture:功能强大,支持链式调用和组合
- 使用超时机制:避免永久阻塞
- 适当处理异常:使用 exceptionally() 或 handle() 方法
- 考虑线程池配置:避免默认的 ForkJoinPool
- 注意线程安全问题:异步结果可能涉及共享状态
// 自定义线程池的 CompletableFuture
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
return "结果";
}, executor)
.orTimeout(3, TimeUnit.SECONDS) // 超时
.exceptionally(ex -> "处理异常");
future.thenAccept(System.out::println);
executor.shutdown();
选择哪种方式取决于具体需求:简单的异步用 Future,复杂的异步编排用 CompletableFuture,Spring 项目用 @Async,高并发场景考虑 RxJava。