本文目录导读:

- 核心思路
- 方案一:使用 JDBC 纯手工迁移(简单、可控)
- 方案二:使用 Spring Batch(企业级、可扩展、支持大数据量)
- 方案三:使用 ETL 工具集成(适合异构系统、复杂转换)
- 方案四:使用 JDBC 并行处理(大数据量优化)
- 总结:如何选择?
在Java中实现数据迁移,通常指的是将数据从一个数据库(或数据源)迁移到另一个数据库(或数据格式),根据迁移的复杂度和数据量,可以选择不同的实现方案。
以下是几种常见的Java数据迁移实现方式,从简单到复杂,附带代码示例和适用场景。
核心思路
- 读取源数据:从源数据库查询数据。
- 数据转换:对数据进行清洗、类型转换、字段映射。
- 写入目标数据:将转换后的数据插入目标数据库。
- 事务与断点续传:确保数据一致性,支持大数据量迁移。
使用 JDBC 纯手工迁移(简单、可控)
适用于数据量不大(几万到几十万条)、结构简单、无高性能要求的场景。
步骤与代码示例:
import java.sql.*;
public class SimpleJdbcMigration {
public static void main(String[] args) {
String sourceUrl = "jdbc:mysql://localhost:3306/source_db";
String sourceUser = "root";
String sourcePass = "password";
String targetUrl = "jdbc:mysql://localhost:3306/target_db";
String targetUser = "root";
String targetPass = "password";
final int batchSize = 1000; // 分批处理,避免内存溢出
Connection sourceConn = null;
Connection targetConn = null;
try {
// 1. 建立连接
sourceConn = DriverManager.getConnection(sourceUrl, sourceUser, sourcePass);
targetConn = DriverManager.getConnection(targetUrl, targetUser, targetPass);
// 2. 读取源数据
String selectSql = "SELECT id, name, age, email FROM users";
Statement stmt = sourceConn.createStatement();
ResultSet rs = stmt.executeQuery(selectSql);
// 3. 准备目标插入语句 (PreparedStatement 防SQL注入且高效)
String insertSql = "INSERT INTO users (id, name, age, email) VALUES (?, ?, ?, ?)";
PreparedStatement pstmt = targetConn.prepareStatement(insertSql);
// 4. 关闭自动提交,分批提交事务
targetConn.setAutoCommit(false);
int count = 0;
while (rs.next()) {
// 数据转换(可根据需要修改字段)
pstmt.setInt(1, rs.getInt("id"));
pstmt.setString(2, rs.getString("name"));
pstmt.setInt(3, rs.getInt("age"));
// 示例:将email字段转为小写
String email = rs.getString("email");
pstmt.setString(4, email != null ? email.toLowerCase() : null);
pstmt.addBatch();
count++;
// 每 batchSize 条提交一次
if (count % batchSize == 0) {
pstmt.executeBatch();
targetConn.commit();
System.out.println("已迁移 " + count + " 条");
}
}
// 提交剩余数据
pstmt.executeBatch();
targetConn.commit();
System.out.println("迁移完成,共 " + count + " 条");
// 5. 关闭资源
rs.close();
pstmt.close();
stmt.close();
} catch (SQLException e) {
try { if (targetConn != null) targetConn.rollback(); } catch (SQLException ex) { ex.printStackTrace(); }
e.printStackTrace();
} finally {
try { if (sourceConn != null) sourceConn.close(); } catch (SQLException e) { e.printStackTrace(); }
try { if (targetConn != null) targetConn.close(); } catch (SQLException e) { e.printStackTrace(); }
}
}
}
优点:无额外依赖,灵活控制转换逻辑。 缺点:代码冗长,需要手动处理分页、异常、性能调优。
使用 Spring Batch(企业级、可扩展、支持大数据量)
Spring Batch 是专门为批处理(包括数据迁移)设计的框架,它提供读-处理-写的架构,自带事务管理、跳过重试、分区、指标监控等功能。
核心组件:
- ItemReader:从源读取数据。
- ItemProcessor:处理/转换数据(可选)。
- ItemWriter:写入目标数据库。
代码示例(基于 Spring Boot 和 Spring Batch):
Maven 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
定义实体类和 Mapper:
// 源数据实体
@Entity
@Table(name = "old_users")
public class OldUser {
@Id private Long id;
private String name;
private Integer age;
private String email;
// getters/setters
}
// 目标数据实体
@Entity
@Table(name = "new_users")
public class NewUser {
@Id private Long id;
private String name;
private Integer age;
private String emailLower; // 目标结构不同,例如email字段名改变
// getters/setters
}
编写 Job 配置(核心):
@Configuration
@EnableBatchProcessing
public class MigrationJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private EntityManagerFactory emf;
@Autowired
private DataSource targetDataSource;
@Bean
public Job migrateUserJob() {
return jobBuilderFactory.get("migrateUserJob")
.incrementer(new RunIdIncrementer())
.flow(migrateStep())
.end()
.build();
}
@Bean
public Step migrateStep() {
return stepBuilderFactory.get("migrateStep")
.<OldUser, NewUser>chunk(1000) // 每次处理1000条
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.build();
}
// 读取数据
@Bean
public JpaPagingItemReader<OldUser> userReader() {
JpaPagingItemReader<OldUser> reader = new JpaPagingItemReader<>();
reader.setEntityManagerFactory(emf);
reader.setQueryString("select u from OldUser u"); // JPQL
reader.setPageSize(200);
return reader;
}
// 处理数据(转换)
@Bean
public ItemProcessor<OldUser, NewUser> userProcessor() {
return oldUser -> {
NewUser newUser = new NewUser();
newUser.setId(oldUser.getId());
newUser.setName(oldUser.getName());
newUser.setAge(oldUser.getAge());
// 转换逻辑:将email转为小写并赋值给emailLower
String email = oldUser.getEmail();
newUser.setEmailLower(email != null ? email.toLowerCase() : null);
return newUser;
};
}
// 写入目标数据库
@Bean
public JdbcBatchItemWriter<NewUser> userWriter() {
JdbcBatchItemWriter<NewUser> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(targetDataSource);
writer.setSql("INSERT INTO new_users (id, name, age, email_lower) VALUES (:id, :name, :age, :emailLower)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
}
优点:功能强大,性能可控,支持大表、多线程、失败重试、跳过错误行。 缺点:学习曲线较陡,项目结构较复杂。
使用 ETL 工具集成(适合异构系统、复杂转换)
如果源和目标是非常不同的系统(如 Oracle 到 MongoDB,或 MySQL 到 Elasticsearch),建议使用成熟的 ETL (Extract, Transform, Load) 工具,然后通过 Java 或 Spring Cloud Task 调用。
推荐工具:
- Apache Camel:轻量级集成框架,支持几百种数据格式和协议。
- Debezium:用于捕获数据库变更(CDC),实现实时迁移。
- Kettle (Pentaho Data Integration):图形化 ETL,Java 可调用其 API。
简单示例:Camel + JDBC
from("jdbc:dataSource?query=select * from old_users")
.process(exchange -> {
// 转换逻辑
Map<String, Object> row = exchange.getIn().getBody(Map.class);
row.put("email", row.get("email").toString().toLowerCase());
exchange.getIn().setBody(row);
})
.to("jdbc:targetDataSource?useHeadersAsParameters=true&" +
"sql=INSERT INTO new_users(id, name, email_lower) " +
"VALUES(:id, :name, :email)");
使用 JDBC 并行处理(大数据量优化)
当数据量达到千万级时,单线程迁移会很慢,可以使用多线程分页或分区方式。
核心优化点:
- 并行读取:按主键范围(ID 区间)拆分成多个子任务。
- 批量写入:使用
executeBatch()或INSERT ... VALUES (...), (...)语法。 - 目标索引优化:先删除索引再插入,最后重建索引。
简单多线程示例(伪代码):
int totalCount = getTotalCount("source_db", "users");
int threadCount = 4;
int pageSize = 5000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
int startId = i * (totalCount / threadCount);
int endId = (i == threadCount - 1) ? totalCount : (i + 1) * (totalCount / threadCount);
executor.submit(() -> {
try {
migrateWithRange(startId, endId, pageSize);
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
注意:多线程迁移需要保证目标数据库支持并发写入,且注意主键冲突和事务隔离。
如何选择?
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 小数据量(< 10万)、一次性 | JDBC 纯手工 | 简单、无依赖 |
| 大数据量(百万级)、定期任务 | Spring Batch | 支持分区、失败恢复、监控 |
| 异构数据库、实时同步 | Debezium / Apache Camel | 支持 CDC、多种数据源 |
| 数据量极大(千万级) | 自定义并行导入(JDBC多线程) | 极致性能可控 |
最佳实践建议:
- 先小流量测试:在小表或测试环境验证转换逻辑。
- 添加断点续传:记录已迁移的最大 ID,下次从该 ID 开始。
- 监控与日志:记录每条失败记录,便于事后修复。
- 目标表加锁:若目标表正在使用,迁移前可考虑
LOCK TABLES或使用新表替换方式。
希望上述示例能帮助你快速实现数据迁移!如果有具体的迁移场景(如从 MySQL 到 MongoDB、或某厂商数据库),可以进一步细化。