本文目录导读:

在Java中实现异步调用的方式有很多,我会从最基础到现代化的实现方式逐一介绍。
传统方式:创建新线程
最简单的异步方式就是直接创建新线程:
public class AsyncExample {
public static void main(String[] args) {
System.out.println("主线程开始执行");
// 方式1:继承Thread
new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("异步任务执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 方式2:使用Runnable
Runnable task = () -> {
System.out.println("另一个异步任务");
};
new Thread(task).start();
System.out.println("主线程继续执行");
}
}
使用线程池
更好的方式是用线程池管理线程:
import java.util.concurrent.*;
public class ThreadPoolExample {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
System.out.println("开始执行");
// 提交异步任务
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "异步任务结果";
});
try {
// 获取结果(会阻塞)
String result = future.get(3, TimeUnit.SECONDS);
System.out.println("结果: " + result);
} catch (Exception e) {
e.printStackTrace();
}
// 记得关闭线程池
executor.shutdown();
}
}
CompletableFuture(最推荐)
Java 8引入的CompletableFuture使异步编程更加优雅:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 基础用法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "异步结果";
});
// 异步回调
future.thenAccept(result -> {
System.out.println("收到结果: " + result);
});
// 异常处理
future.exceptionally(throwable -> {
System.out.println("发生异常: " + throwable.getMessage());
return "默认值";
});
// 链式调用
CompletableFuture.supplyAsync(() -> {
return "Hello";
}).thenApply(result -> {
return result + " World";
}).thenAccept(finalResult -> {
System.out.println(finalResult);
});
// 等待所有任务完成
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
// 任务1
}),
CompletableFuture.runAsync(() -> {
// 任务2
})
).join();
// 等待结果
String result = future.get();
System.out.println("最终结果: " + result);
}
}
Spring @Async注解
如果你使用Spring框架,可以用注解简化异步:
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.CompletableFuture;
@EnableAsync // 在配置类上添加
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
}
@Service
public class AsyncService {
@Async // 标记为异步方法
public void asyncMethod() {
try {
Thread.sleep(2000);
System.out.println("异步方法执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Async
public CompletableFuture<String> asyncWithResult() {
try {
Thread.sleep(1000);
return CompletableFuture.completedFuture("异步结果");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.completedFuture("失败");
}
}
}
// 使用
@RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/test")
public String test() {
asyncService.asyncMethod(); // 立即返回,不会等待
return "请求已接收";
}
}
实战案例:批量数据处理
一个完整的数据处理案例:
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class BatchDataProcessor {
private final ExecutorService executor = Executors.newFixedThreadPool(5);
public static void main(String[] args) {
BatchDataProcessor processor = new BatchDataProcessor();
// 模拟100个数据需要处理
List<Integer> dataList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
dataList.add(i);
}
try {
List<String> results = processor.processBatch(dataList);
System.out.println("处理完成,共处理 " + results.size() + " 条数据");
} catch (Exception e) {
e.printStackTrace();
}
}
public List<String> processBatch(List<Integer> dataList)
throws InterruptedException, ExecutionException {
// 将数据分批处理,每批10个
List<List<Integer>> batches = partitionList(dataList, 10);
List<CompletableFuture<List<String>>> futures = new ArrayList<>();
for (List<Integer> batch : batches) {
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(() -> processBatchPart(batch), executor);
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 收集结果
List<String> allResults = new ArrayList<>();
for (CompletableFuture<List<String>> future : futures) {
allResults.addAll(future.get());
}
return allResults;
}
private List<String> processBatchPart(List<Integer> batch) {
return batch.stream()
.map(i -> {
// 模拟数据处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "处理数据: " + i;
})
.collect(Collectors.toList());
}
private List<List<Integer>> partitionList(List<Integer> list, int size) {
List<List<Integer>> partitions = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
partitions.add(list.subList(i, Math.min(i + size, list.size())));
}
return partitions;
}
}
选择建议
- 简单任务:使用线程池即可
- 需要回调或组合任务:使用CompletableFuture
- Spring项目:使用@Async注解
- 响应式编程:考虑使用WebFlux或RxJava
注意事项
- 合理配置线程池大小,避免资源耗尽
- 注意处理异常,避免任务静默失败
- 适当使用超时机制,防止任务卡死
- 异步任务中的用户上下文(如认证信息)需要特殊处理
选择哪种方式取决于你的具体需求和技术栈,对于大多数企业应用,CompletableFuture或Spring的@Async就能满足需求。