Java案例如何实现异步调用?

wen java案例 12

本文目录导读:

Java案例如何实现异步调用?

  1. 传统方式:创建新线程
  2. 使用线程池
  3. CompletableFuture(最推荐)
  4. Spring @Async注解
  5. 实战案例:批量数据处理
  6. 选择建议
  7. 注意事项

在Java中实现异步调用的方式有很多,我会从最基础到现代化的实现方式逐一介绍。

传统方式:创建新线程

最简单的异步方式就是直接创建新线程:

public class AsyncExample {
    public static void main(String[] args) {
        System.out.println("主线程开始执行");
        // 方式1:继承Thread
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("异步任务执行完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        // 方式2:使用Runnable
        Runnable task = () -> {
            System.out.println("另一个异步任务");
        };
        new Thread(task).start();
        System.out.println("主线程继续执行");
    }
}

使用线程池

更好的方式是用线程池管理线程:

import java.util.concurrent.*;
public class ThreadPoolExample {
    private static final ExecutorService executor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) {
        System.out.println("开始执行");
        // 提交异步任务
        Future<String> future = executor.submit(() -> {
            Thread.sleep(2000);
            return "异步任务结果";
        });
        try {
            // 获取结果(会阻塞)
            String result = future.get(3, TimeUnit.SECONDS);
            System.out.println("结果: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 记得关闭线程池
        executor.shutdown();
    }
}

CompletableFuture(最推荐)

Java 8引入的CompletableFuture使异步编程更加优雅:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 基础用法
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步结果";
        });
        // 异步回调
        future.thenAccept(result -> {
            System.out.println("收到结果: " + result);
        });
        // 异常处理
        future.exceptionally(throwable -> {
            System.out.println("发生异常: " + throwable.getMessage());
            return "默认值";
        });
        // 链式调用
        CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }).thenApply(result -> {
            return result + " World";
        }).thenAccept(finalResult -> {
            System.out.println(finalResult);
        });
        // 等待所有任务完成
        CompletableFuture.allOf(
            CompletableFuture.runAsync(() -> {
                // 任务1
            }),
            CompletableFuture.runAsync(() -> {
                // 任务2
            })
        ).join();
        // 等待结果
        String result = future.get();
        System.out.println("最终结果: " + result);
    }
}

Spring @Async注解

如果你使用Spring框架,可以用注解简化异步:

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.CompletableFuture;
@EnableAsync  // 在配置类上添加
@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("Async-");
        executor.initialize();
        return executor;
    }
}
@Service
public class AsyncService {
    @Async  // 标记为异步方法
    public void asyncMethod() {
        try {
            Thread.sleep(2000);
            System.out.println("异步方法执行完成");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    @Async
    public CompletableFuture<String> asyncWithResult() {
        try {
            Thread.sleep(1000);
            return CompletableFuture.completedFuture("异步结果");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.completedFuture("失败");
        }
    }
}
// 使用
@RestController
public class AsyncController {
    @Autowired
    private AsyncService asyncService;
    @GetMapping("/test")
    public String test() {
        asyncService.asyncMethod();  // 立即返回,不会等待
        return "请求已接收";
    }
}

实战案例:批量数据处理

一个完整的数据处理案例:

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class BatchDataProcessor {
    private final ExecutorService executor = Executors.newFixedThreadPool(5);
    public static void main(String[] args) {
        BatchDataProcessor processor = new BatchDataProcessor();
        // 模拟100个数据需要处理
        List<Integer> dataList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            dataList.add(i);
        }
        try {
            List<String> results = processor.processBatch(dataList);
            System.out.println("处理完成,共处理 " + results.size() + " 条数据");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public List<String> processBatch(List<Integer> dataList) 
            throws InterruptedException, ExecutionException {
        // 将数据分批处理,每批10个
        List<List<Integer>> batches = partitionList(dataList, 10);
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (List<Integer> batch : batches) {
            CompletableFuture<List<String>> future = 
                CompletableFuture.supplyAsync(() -> processBatchPart(batch), executor);
            futures.add(future);
        }
        // 等待所有任务完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // 收集结果
        List<String> allResults = new ArrayList<>();
        for (CompletableFuture<List<String>> future : futures) {
            allResults.addAll(future.get());
        }
        return allResults;
    }
    private List<String> processBatchPart(List<Integer> batch) {
        return batch.stream()
            .map(i -> {
                // 模拟数据处理
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "处理数据: " + i;
            })
            .collect(Collectors.toList());
    }
    private List<List<Integer>> partitionList(List<Integer> list, int size) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            partitions.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return partitions;
    }
}

选择建议

  1. 简单任务:使用线程池即可
  2. 需要回调或组合任务:使用CompletableFuture
  3. Spring项目:使用@Async注解
  4. 响应式编程:考虑使用WebFlux或RxJava

注意事项

  • 合理配置线程池大小,避免资源耗尽
  • 注意处理异常,避免任务静默失败
  • 适当使用超时机制,防止任务卡死
  • 异步任务中的用户上下文(如认证信息)需要特殊处理

选择哪种方式取决于你的具体需求和技术栈,对于大多数企业应用,CompletableFuture或Spring的@Async就能满足需求。

抱歉,评论功能暂时关闭!