Java案例怎么实现数据迁移?

wen java案例 66

本文目录导读:

Java案例怎么实现数据迁移?

  1. 核心思路
  2. 方案一:使用 JDBC 纯手工迁移(简单、可控)
  3. 方案二:使用 Spring Batch(企业级、可扩展、支持大数据量)
  4. 方案三:使用 ETL 工具集成(适合异构系统、复杂转换)
  5. 方案四:使用 JDBC 并行处理(大数据量优化)
  6. 总结:如何选择?

在Java中实现数据迁移,通常指的是将数据从一个数据库(或数据源)迁移到另一个数据库(或数据格式),根据迁移的复杂度和数据量,可以选择不同的实现方案。

以下是几种常见的Java数据迁移实现方式,从简单到复杂,附带代码示例和适用场景。


核心思路

  1. 读取源数据:从源数据库查询数据。
  2. 数据转换:对数据进行清洗、类型转换、字段映射。
  3. 写入目标数据:将转换后的数据插入目标数据库。
  4. 事务与断点续传:确保数据一致性,支持大数据量迁移。

使用 JDBC 纯手工迁移(简单、可控)

适用于数据量不大(几万到几十万条)、结构简单、无高性能要求的场景。

步骤与代码示例:

import java.sql.*;
public class SimpleJdbcMigration {
    public static void main(String[] args) {
        String sourceUrl = "jdbc:mysql://localhost:3306/source_db";
        String sourceUser = "root";
        String sourcePass = "password";
        String targetUrl = "jdbc:mysql://localhost:3306/target_db";
        String targetUser = "root";
        String targetPass = "password";
        final int batchSize = 1000; // 分批处理,避免内存溢出
        Connection sourceConn = null;
        Connection targetConn = null;
        try {
            // 1. 建立连接
            sourceConn = DriverManager.getConnection(sourceUrl, sourceUser, sourcePass);
            targetConn = DriverManager.getConnection(targetUrl, targetUser, targetPass);
            // 2. 读取源数据
            String selectSql = "SELECT id, name, age, email FROM users";
            Statement stmt = sourceConn.createStatement();
            ResultSet rs = stmt.executeQuery(selectSql);
            // 3. 准备目标插入语句 (PreparedStatement 防SQL注入且高效)
            String insertSql = "INSERT INTO users (id, name, age, email) VALUES (?, ?, ?, ?)";
            PreparedStatement pstmt = targetConn.prepareStatement(insertSql);
            // 4. 关闭自动提交,分批提交事务
            targetConn.setAutoCommit(false);
            int count = 0;
            while (rs.next()) {
                // 数据转换(可根据需要修改字段)
                pstmt.setInt(1, rs.getInt("id"));
                pstmt.setString(2, rs.getString("name"));
                pstmt.setInt(3, rs.getInt("age"));
                // 示例:将email字段转为小写
                String email = rs.getString("email");
                pstmt.setString(4, email != null ? email.toLowerCase() : null);
                pstmt.addBatch();
                count++;
                // 每 batchSize 条提交一次
                if (count % batchSize == 0) {
                    pstmt.executeBatch();
                    targetConn.commit();
                    System.out.println("已迁移 " + count + " 条");
                }
            }
            // 提交剩余数据
            pstmt.executeBatch();
            targetConn.commit();
            System.out.println("迁移完成,共 " + count + " 条");
            // 5. 关闭资源
            rs.close();
            pstmt.close();
            stmt.close();
        } catch (SQLException e) {
            try { if (targetConn != null) targetConn.rollback(); } catch (SQLException ex) { ex.printStackTrace(); }
            e.printStackTrace();
        } finally {
            try { if (sourceConn != null) sourceConn.close(); } catch (SQLException e) { e.printStackTrace(); }
            try { if (targetConn != null) targetConn.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
    }
}

优点:无额外依赖,灵活控制转换逻辑。 缺点:代码冗长,需要手动处理分页、异常、性能调优。


使用 Spring Batch(企业级、可扩展、支持大数据量)

Spring Batch 是专门为批处理(包括数据迁移)设计的框架,它提供读-处理-写的架构,自带事务管理跳过重试分区指标监控等功能。

核心组件:

  • ItemReader:从源读取数据。
  • ItemProcessor:处理/转换数据(可选)。
  • ItemWriter:写入目标数据库。

代码示例(基于 Spring Boot 和 Spring Batch):

Maven 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

定义实体类和 Mapper:

// 源数据实体
@Entity
@Table(name = "old_users")
public class OldUser {
    @Id private Long id;
    private String name;
    private Integer age;
    private String email;
    // getters/setters
}
// 目标数据实体
@Entity
@Table(name = "new_users")
public class NewUser {
    @Id private Long id;
    private String name;
    private Integer age;
    private String emailLower; // 目标结构不同,例如email字段名改变
    // getters/setters
}

编写 Job 配置(核心):

@Configuration
@EnableBatchProcessing
public class MigrationJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private EntityManagerFactory emf;
    @Autowired
    private DataSource targetDataSource;
    @Bean
    public Job migrateUserJob() {
        return jobBuilderFactory.get("migrateUserJob")
                .incrementer(new RunIdIncrementer())
                .flow(migrateStep())
                .end()
                .build();
    }
    @Bean
    public Step migrateStep() {
        return stepBuilderFactory.get("migrateStep")
                .<OldUser, NewUser>chunk(1000) // 每次处理1000条
                .reader(userReader())
                .processor(userProcessor())
                .writer(userWriter())
                .build();
    }
    // 读取数据
    @Bean
    public JpaPagingItemReader<OldUser> userReader() {
        JpaPagingItemReader<OldUser> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(emf);
        reader.setQueryString("select u from OldUser u"); // JPQL
        reader.setPageSize(200);
        return reader;
    }
    // 处理数据(转换)
    @Bean
    public ItemProcessor<OldUser, NewUser> userProcessor() {
        return oldUser -> {
            NewUser newUser = new NewUser();
            newUser.setId(oldUser.getId());
            newUser.setName(oldUser.getName());
            newUser.setAge(oldUser.getAge());
            // 转换逻辑:将email转为小写并赋值给emailLower
            String email = oldUser.getEmail();
            newUser.setEmailLower(email != null ? email.toLowerCase() : null);
            return newUser;
        };
    }
    // 写入目标数据库
    @Bean
    public JdbcBatchItemWriter<NewUser> userWriter() {
        JdbcBatchItemWriter<NewUser> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(targetDataSource);
        writer.setSql("INSERT INTO new_users (id, name, age, email_lower) VALUES (:id, :name, :age, :emailLower)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }
}

优点:功能强大,性能可控,支持大表、多线程、失败重试、跳过错误行。 缺点:学习曲线较陡,项目结构较复杂。


使用 ETL 工具集成(适合异构系统、复杂转换)

如果源和目标是非常不同的系统(如 Oracle 到 MongoDB,或 MySQL 到 Elasticsearch),建议使用成熟的 ETL (Extract, Transform, Load) 工具,然后通过 Java 或 Spring Cloud Task 调用。

推荐工具

  • Apache Camel:轻量级集成框架,支持几百种数据格式和协议。
  • Debezium:用于捕获数据库变更(CDC),实现实时迁移。
  • Kettle (Pentaho Data Integration):图形化 ETL,Java 可调用其 API。

简单示例:Camel + JDBC

from("jdbc:dataSource?query=select * from old_users")
    .process(exchange -> {
        // 转换逻辑
        Map<String, Object> row = exchange.getIn().getBody(Map.class);
        row.put("email", row.get("email").toString().toLowerCase());
        exchange.getIn().setBody(row);
    })
    .to("jdbc:targetDataSource?useHeadersAsParameters=true&" +
        "sql=INSERT INTO new_users(id, name, email_lower) " +
        "VALUES(:id, :name, :email)");

使用 JDBC 并行处理(大数据量优化)

当数据量达到千万级时,单线程迁移会很慢,可以使用多线程分页分区方式。

核心优化点

  1. 并行读取:按主键范围(ID 区间)拆分成多个子任务。
  2. 批量写入:使用 executeBatch()INSERT ... VALUES (...), (...) 语法。
  3. 目标索引优化:先删除索引再插入,最后重建索引。

简单多线程示例(伪代码):

int totalCount = getTotalCount("source_db", "users");
int threadCount = 4;
int pageSize = 5000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
    int startId = i * (totalCount / threadCount);
    int endId = (i == threadCount - 1) ? totalCount : (i + 1) * (totalCount / threadCount);
    executor.submit(() -> {
        try {
            migrateWithRange(startId, endId, pageSize);
        } finally {
            latch.countDown();
        }
    });
}
latch.await();
executor.shutdown();

注意:多线程迁移需要保证目标数据库支持并发写入,且注意主键冲突事务隔离


如何选择?

场景 推荐方案 原因
小数据量(< 10万)、一次性 JDBC 纯手工 简单、无依赖
大数据量(百万级)、定期任务 Spring Batch 支持分区、失败恢复、监控
异构数据库、实时同步 Debezium / Apache Camel 支持 CDC、多种数据源
数据量极大(千万级) 自定义并行导入(JDBC多线程) 极致性能可控

最佳实践建议

  1. 先小流量测试:在小表或测试环境验证转换逻辑。
  2. 添加断点续传:记录已迁移的最大 ID,下次从该 ID 开始。
  3. 监控与日志:记录每条失败记录,便于事后修复。
  4. 目标表加锁:若目标表正在使用,迁移前可考虑 LOCK TABLES 或使用新表替换方式。

希望上述示例能帮助你快速实现数据迁移!如果有具体的迁移场景(如从 MySQL 到 MongoDB、或某厂商数据库),可以进一步细化。

抱歉,评论功能暂时关闭!