如何用Java案例实现批量数据处理?

wen java案例 4

本文目录导读:

如何用Java案例实现批量数据处理?

  1. 使用JDBC批量操作
  2. 使用Stream API处理大数据集合
  3. 使用ExecutorService并行处理
  4. 使用Spring Batch批量处理框架
  5. 文件批量处理示例
  6. 完整的分批工具类
  7. 性能优化建议

下面介绍几种用Java实现批量数据处理的常用方案,每种方案都附有具体代码示例。

使用JDBC批量操作

批量插入示例

public class BatchInsertExample {
    /** Rows per executeBatch()/commit round trip used by {@link #batchInsert(List)}. */
    private static final int DEFAULT_BATCH_SIZE = 1000;

    /**
     * Inserts {@code users} into the {@code users} table in batches of
     * {@value #DEFAULT_BATCH_SIZE} rows.
     *
     * @param users the rows to insert; {@code null} or empty is a no-op
     * @see #batchInsert(List, int)
     */
    public void batchInsert(List<User> users) {
        batchInsert(users, DEFAULT_BATCH_SIZE);
    }

    /**
     * Inserts {@code users} into the {@code users} table, sending {@code batchSize}
     * rows per {@code executeBatch()} and committing after each batch.
     *
     * <p>Because every full batch is committed immediately, a failure rolls back only
     * the batch currently in flight; batches committed earlier stay in the database.
     * {@link SQLException}s are reported with {@code printStackTrace()} and are not
     * rethrown; runtime exceptions roll back the current batch and then propagate.
     *
     * @param users     the rows to insert; {@code null} or empty is a no-op
     * @param batchSize rows per batch; must be positive
     * @throws IllegalArgumentException if {@code batchSize <= 0}
     */
    public void batchInsert(List<User> users, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        if (users == null || users.isEmpty()) {
            return; // nothing to insert; avoid opening a connection at all
        }
        String sql = "INSERT INTO users (name, email, age) VALUES (?, ?, ?)";
        // The Connection gets its own try-with-resources so that it is still in scope
        // (and still open) when the inner catch block needs to roll back.
        try (Connection conn = getConnection()) {
            // Disable auto-commit so each batch is committed explicitly.
            conn.setAutoCommit(false);
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                int count = 0;
                for (User user : users) {
                    pstmt.setString(1, user.getName());
                    pstmt.setString(2, user.getEmail());
                    pstmt.setInt(3, user.getAge());
                    pstmt.addBatch();
                    count++;
                    // Send and commit every full batch.
                    if (count % batchSize == 0) {
                        pstmt.executeBatch();
                        conn.commit();
                        pstmt.clearBatch();
                    }
                }
                // Flush the last, partially filled batch.
                if (count % batchSize != 0) {
                    pstmt.executeBatch();
                    conn.commit();
                }
            } catch (SQLException | RuntimeException e) {
                // Undo the uncommitted batch, then let the outer handler report it.
                rollback(conn, e);
                throw e;
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Rolls back the uncommitted work on {@code conn}. If the rollback itself fails,
     * that failure is attached to {@code cause} as a suppressed exception so it is
     * neither lost nor allowed to hide the original error.
     */
    private static void rollback(Connection conn, Exception cause) {
        try {
            conn.rollback();
        } catch (SQLException rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }
}

使用Stream API处理大数据集合

分批处理示例

import java.util.*;
import java.util.stream.*;
public class StreamBatchProcessor {
    /**
     * Splits {@code items} into consecutive batches of at most {@code batchSize}
     * elements and passes each batch to {@code processor}, in order.
     *
     * <p>Every batch is a {@link List#subList} view backed by {@code items}, so
     * {@code items} must not be structurally modified while processing runs.
     *
     * @param items     the elements to process
     * @param batchSize maximum number of elements per batch; must be positive
     * @param processor callback invoked once per batch
     * @throws NullPointerException     if {@code items} or {@code processor} is null
     * @throws IllegalArgumentException if {@code batchSize <= 0}
     */
    public <T> void processInBatches(List<T> items, int batchSize,
                                     java.util.function.Consumer<List<T>> processor) {
        Objects.requireNonNull(items, "items");
        Objects.requireNonNull(processor, "processor");
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        int totalSize = items.size();
        // Ceiling division written so it cannot overflow: the usual
        // (totalSize + batchSize - 1) / batchSize goes negative for a very large
        // batchSize (e.g. Integer.MAX_VALUE) and would silently process nothing.
        int batchCount = totalSize / batchSize + (totalSize % batchSize == 0 ? 0 : 1);
        IntStream.range(0, batchCount)
            .mapToObj(i -> {
                int from = i * batchSize; // i < batchCount, so from < totalSize: no overflow
                int to = from + Math.min(batchSize, totalSize - from);
                return items.subList(from, to);
            })
            .forEach(processor);
    }

    /** Demo: processes the numbers 1..10000 in batches of 500, printing every element. */
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 10001)
                                        .boxed()
                                        .collect(Collectors.toList());
        StreamBatchProcessor processor = new StreamBatchProcessor();
        // 500 elements per batch.
        processor.processInBatches(numbers, 500, batch -> {
            System.out.println("Processing batch of " + batch.size() + " items");
            // Per-element processing logic goes here.
            batch.forEach(num -> System.out.println("Processing: " + num));
        });
    }
}

使用ExecutorService并行处理

import java.util.concurrent.*;
import java.util.*;
public class ParallelBatchProcessor {
    private final ExecutorService executor;
    private final int batchSize;

    /**
     * Creates a processor backed by a fixed pool of {@code threadCount} threads.
     *
     * @param threadCount number of worker threads; must be positive
     * @param batchSize   maximum number of elements per submitted task; must be positive
     * @throws IllegalArgumentException if either argument is not positive
     */
    public ParallelBatchProcessor(int threadCount, int batchSize) {
        // Checked before the pool is created: a non-positive batch size would make
        // processDataset() loop forever submitting empty tasks.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        this.executor = Executors.newFixedThreadPool(threadCount); // rejects threadCount <= 0
        this.batchSize = batchSize;
    }

    /**
     * Splits {@code dataset} into batches, runs {@code processor} on each batch in the
     * thread pool, and blocks until every batch has finished.
     *
     * <p>A batch whose processor returns anything other than {@code Boolean.TRUE}, or
     * throws, is reported on {@code System.err}; the remaining batches still run.
     * Batches are {@link List#subList} views, so {@code dataset} must not be modified
     * until this method returns.
     *
     * @param dataset   the elements to process
     * @param processor per-batch work; returns {@code true} on success
     * @throws InterruptedException if interrupted while waiting for the batches
     */
    public <T> void processDataset(List<T> dataset,
                                   java.util.function.Function<List<T>, Boolean> processor)
                                   throws InterruptedException {
        Objects.requireNonNull(dataset, "dataset");
        Objects.requireNonNull(processor, "processor");
        int size = dataset.size();
        List<Future<Boolean>> futures = new ArrayList<>();
        // Split the dataset and submit one task per batch.
        int start = 0;
        while (start < size) {
            // start + min(batchSize, size - start) never exceeds size, so no overflow.
            int end = start + Math.min(batchSize, size - start);
            List<T> batch = dataset.subList(start, end);
            Callable<Boolean> task = () -> processor.apply(batch);
            futures.add(executor.submit(task));
            start = end;
        }
        // Wait for every task and actually check the result it returned.
        for (int i = 0; i < futures.size(); i++) {
            try {
                if (!Boolean.TRUE.equals(futures.get(i).get())) {
                    System.err.println("Batch " + i + " reported failure");
                }
            } catch (ExecutionException e) {
                System.err.println("Batch processing failed: " + e.getMessage());
            }
        }
    }

    /**
     * Stops accepting new work, waits up to 60 seconds for running batches, then
     * forces cancellation. If interrupted while waiting, cancels the work and
     * restores the caller's interrupt flag.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Demo: processes 0..9999 on 4 threads in batches of 500. */
    public static void main(String[] args) throws InterruptedException {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            data.add(i);
        }
        ParallelBatchProcessor processor = new ParallelBatchProcessor(4, 500);
        try {
            processor.processDataset(data, batch -> {
                System.out.println(Thread.currentThread().getName() +
                                 " processing " + batch.size() + " items");
                for (Integer item : batch) {
                    // Simulated per-item work.
                    System.out.println("Processing: " + item);
                }
                return true;
            });
        } finally {
            // Always release the pool threads, even if processing was interrupted.
            processor.shutdown();
        }
    }
}

使用Spring Batch批量处理框架

配置类(Java Config)

/**
 * Spring Batch job that reads users over JDBC, converts each one into a
 * {@code ProcessedUser}, and writes the results through a Spring Data repository,
 * with a commit interval of 1000 items.
 *
 * <p>NOTE(review): {@code JobBuilderFactory} and {@code StepBuilderFactory} are
 * deprecated as of Spring Batch 5; on 5.x build jobs and steps with
 * {@code new JobBuilder(name, jobRepository)} and
 * {@code new StepBuilder(name, jobRepository)} instead.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    /** Database queried by {@link #reader()} (referenced there but previously never declared). */
    @Autowired
    private javax.sql.DataSource dataSource;
    // NOTE(review): writer() uses `userRepository`, which is not declared in this class.
    // Inject the project's Spring Data repository for ProcessedUser here, e.g.
    //     @Autowired
    //     private ProcessedUserRepository userRepository;
    // (the type name is a placeholder -- use the actual repository interface).

    /** Streams rows of the {@code users} table through a JDBC cursor. */
    @Bean
    public ItemReader<User> reader() {
        // NOTE(review): processor() calls user.getBirthDate(), but this query does not
        // select a birth-date column, so the row mapper cannot populate it. Add that
        // column (name to be confirmed against the schema) to the SELECT list.
        return new JdbcCursorItemReaderBuilder<User>()
            .dataSource(dataSource)
            .name("userReader")
            .sql("SELECT id, name, email FROM users")
            .rowMapper(new UserRowMapper())
            .build();
    }

    /** Maps a User to a ProcessedUser with an upper-cased name and a computed age. */
    @Bean
    public ItemProcessor<User, ProcessedUser> processor() {
        // NOTE(review): calculateAge(...) is not defined in this class and must be provided.
        // Locale.ROOT keeps upper-casing independent of the JVM's default locale
        // (e.g. the Turkish dotless-i problem).
        return user -> new ProcessedUser(user.getName().toUpperCase(java.util.Locale.ROOT),
                                         user.getEmail(),
                                         calculateAge(user.getBirthDate()));
    }

    /** Persists each ProcessedUser by invoking the repository's {@code save} method. */
    @Bean
    public ItemWriter<ProcessedUser> writer() {
        return new RepositoryItemWriterBuilder<ProcessedUser>()
            .repository(userRepository)
            .methodName("save")
            .build();
    }

    /** Chunk-oriented step: reads/processes 1000 items, then writes them as one chunk. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
            .<User, ProcessedUser>chunk(1000)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
    }

    /** Job running {@link #step1()}; RunIdIncrementer gives every launch a fresh run.id. */
    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .flow(step1())
            .end()
            .build();
    }
}

文件批量处理示例

public class FileBatchProcessor {
    /** Cap on the batch buffer's initial capacity so a huge batchSize does not pre-allocate a huge array. */
    private static final int MAX_INITIAL_CAPACITY = 8192;

    /**
     * Reads {@code filePath} line by line in the platform default charset (the same
     * charset {@code FileReader} uses) and processes the lines in batches of at most
     * {@code batchSize}.
     *
     * @param filePath  path of the text file to read
     * @param batchSize maximum number of lines per batch; must be positive
     * @throws IOException              if the file cannot be opened or read
     * @throws IllegalArgumentException if {@code batchSize <= 0}
     */
    public void processLargeFile(String filePath, int batchSize)
            throws IOException {
        processLargeFile(filePath, batchSize, java.nio.charset.Charset.defaultCharset());
    }

    /**
     * Same as {@link #processLargeFile(String, int)}, but decodes the file with
     * {@code charset} (for example UTF-8 or GBK) instead of the platform default.
     *
     * @param filePath  path of the text file to read
     * @param batchSize maximum number of lines per batch; must be positive
     * @param charset   encoding of the file
     * @throws IOException              if the file cannot be opened or read
     * @throws IllegalArgumentException if {@code batchSize <= 0}
     */
    public void processLargeFile(String filePath, int batchSize, java.nio.charset.Charset charset)
            throws IOException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        Objects.requireNonNull(charset, "charset");
        List<String> batch = new ArrayList<>(Math.min(batchSize, MAX_INITIAL_CAPACITY));
        // try-with-resources closes the reader (and the underlying stream) on every path.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(filePath), charset))) {
            String line;
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() == batchSize) {
                    processBatch(batch);
                    batch.clear();
                }
            }
            // Process the final, partially filled batch.
            if (!batch.isEmpty()) {
                processBatch(batch);
            }
        }
    }

    /**
     * Processes one batch of lines. Uses a parallel stream, so lines within a batch
     * are handled concurrently and printed in no guaranteed order. The terminal
     * forEach blocks until the whole batch is done, so the caller may then clear the list.
     */
    private void processBatch(List<String> batch) {
        batch.parallelStream().forEach(line -> {
            // Per-line processing logic goes here.
            System.out.println("Processing: " + line);
        });
    }
}

完整的分批工具类

public class BatchUtils {
    /**
     * Splits {@code list} into consecutive batches of at most {@code batchSize}
     * elements. Each batch is an independent {@link ArrayList} copy, so later
     * changes to {@code list} do not affect the returned batches.
     *
     * @param list      the elements to split
     * @param batchSize maximum number of elements per batch; must be positive
     * @return the batches in order; empty if {@code list} is empty
     * @throws NullPointerException     if {@code list} is null
     * @throws IllegalArgumentException if {@code batchSize <= 0} (a batch size of 0
     *                                  would otherwise loop forever)
     */
    public static <T> List<List<T>> partition(List<T> list, int batchSize) {
        Objects.requireNonNull(list, "list");
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        int totalSize = list.size();
        List<List<T>> batches = new ArrayList<>();
        int from = 0;
        while (from < totalSize) {
            // from + min(batchSize, totalSize - from) never exceeds totalSize: no overflow.
            int to = from + Math.min(batchSize, totalSize - from);
            batches.add(new ArrayList<>(list.subList(from, to)));
            from = to;
        }
        return batches;
    }

    /**
     * Runs {@code processor} on each batch of {@code items} and aggregates the outcome.
     * A batch whose processor throws is recorded as failed (all of its items count
     * toward {@code totalFailed}), and processing continues with the next batch.
     *
     * @param items     the elements to process
     * @param batchSize maximum number of elements per batch; must be positive
     * @param processor transforms one batch into its results
     * @return counts, successful results, and failed batches
     * @throws IllegalArgumentException if {@code batchSize <= 0}
     */
    public static <T, R> BatchResult<R> processBatch(
            List<T> items,
            int batchSize,
            Function<List<T>, List<R>> processor) {
        BatchResult<R> result = new BatchResult<>();
        for (List<T> batch : partition(items, batchSize)) {
            try {
                List<R> processedItems = processor.apply(batch);
                result.addSuccessItems(processedItems);
                result.incrementProcessed(batch.size());
            } catch (Exception e) {
                result.addFailedBatch(batch);
                result.incrementFailed(batch.size());
                // toString() keeps the exception type, which getMessage() alone may lose.
                System.err.println("Batch processing failed: " + e);
            }
        }
        return result;
    }

    /**
     * Aggregated outcome of {@link BatchUtils#processBatch}.
     *
     * @param <R> type of the items produced by successful batches
     */
    public static class BatchResult<R> {
        private int totalProcessed = 0;
        private int totalFailed = 0;
        private final List<R> successItems = new ArrayList<>();
        private final List<List<?>> failedBatches = new ArrayList<>();

        /** Appends the results of one successful batch. */
        void addSuccessItems(List<? extends R> items) {
            successItems.addAll(items);
        }

        /** Adds {@code count} input items to the processed total. */
        void incrementProcessed(int count) {
            totalProcessed += count;
        }

        /** Records an input batch whose processing threw. */
        void addFailedBatch(List<?> batch) {
            failedBatches.add(batch);
        }

        /** Adds {@code count} input items to the failed total. */
        void incrementFailed(int count) {
            totalFailed += count;
        }

        /** @return number of input items in batches that completed without throwing */
        public int getTotalProcessed() {
            return totalProcessed;
        }

        /** @return number of input items in batches whose processor threw */
        public int getTotalFailed() {
            return totalFailed;
        }

        /** @return read-only view of all results from successful batches, in batch order */
        public List<R> getSuccessItems() {
            return Collections.unmodifiableList(successItems);
        }

        /** @return read-only view of the input batches that failed */
        public List<List<?>> getFailedBatches() {
            return Collections.unmodifiableList(failedBatches);
        }
    }
}
// 使用示例
// As a public top-level class, this demo must live in its own file (BatchProcessingDemo.java).
public class BatchProcessingDemo {
    /**
     * Formats the numbers 1..1000 in batches of 100 via BatchUtils.processBatch,
     * sleeping 10 ms per item to simulate work, then prints the processed/failed counts.
     */
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 1001)
                                        .boxed()
                                        .collect(Collectors.toList());
        BatchUtils.BatchResult<String> result =
            BatchUtils.processBatch(numbers, 100, batch -> {
                List<String> processed = new ArrayList<>();
                for (Integer num : batch) {
                    processed.add("Number: " + num);
                    simulateWork();
                }
                return processed;
            });
        System.out.println("Processed: " + result.getTotalProcessed());
        System.out.println("Failed: " + result.getTotalFailed());
    }

    /**
     * Sleeps 10 ms to simulate per-item work. Thread.sleep throws the checked
     * InterruptedException, which a Function lambda cannot throw, so it is
     * rethrown unchecked after restoring the thread's interrupt flag.
     */
    private static void simulateWork() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while simulating work", e);
        }
    }
}

性能优化建议

  1. 选择合适的批次大小:通常在1000~5000条之间性能较好,但最佳值取决于数据库、驱动和单行数据量,应通过实际压测确定
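  2. 使用预编译SQL(PreparedStatement配合addBatch/executeBatch):减少SQL解析开销;如使用MySQL,还需在JDBC URL中设置rewriteBatchedStatements=true,驱动才会把批量语句真正合并发送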
  3. 关闭自动提交:批量操作时手动控制事务
  4. 使用索引:确保查询和更新操作使用合适的索引
  5. 内存控制:避免一次性把全部数据加载进内存(如采用游标或流式读取、逐批处理后及时释放引用),防止JVM堆内存溢出(OOM)
  6. 异常处理:实现完善的错误处理和回滚机制

这些方案可以根据实际需求选择使用,对于特别大的数据集,建议使用Spring Batch等专业框架。

抱歉,评论功能暂时关闭!