本文目录导读:

- 核心概念:为什么需要同步?
- 方案一:基于 JVM 本地锁/同步工具(单机环境)
- 方案二:基于数据库乐观锁/悲观锁(关系型数据库)
- 方案三:基于 Redis 分布式锁(跨实例同步)
- 方案四:基于消息队列(最终一致性)
- 方案五:基于 CDC(Change Data Capture)—— 数据库变更捕获
- 方案六:基于定时任务轮询(批量化)
- 总结表格:如何选择?
在Java中实现数据同步通常涉及数据库同步、缓存同步、跨系统数据一致性等场景,数据同步的核心挑战在于保证数据一致性、高可用和低延迟。
以下是几种常见的Java数据同步实现方案,从简单到复杂,并附带核心代码示例。
核心概念:为什么需要同步?
- 本地多线程同步:防止共享变量(如计数器、缓存Map)的并发修改。
- 主从数据库同步:读写分离后,写库到读库的数据复制延迟。
- 应用缓存与数据库同步:Redis缓存与MySQL数据的一致性。
- 分布式系统数据同步:不同微服务之间(如订单服务与库存服务)的业务数据最终一致。
基于 JVM 本地锁/同步工具(单机环境)
适用于单个Java服务实例内的临界资源访问。
使用 synchronized 或 ReentrantLock(线程安全)
public class LocalDataSync {
private final Object lock = new Object();
private Map<String, String> dataMap = new HashMap<>();
// 同步写入
public void syncPut(String key, String value) {
synchronized (lock) {
// 模拟同步操作:先检查,再写入
if (!dataMap.containsKey(key)) {
dataMap.put(key, value);
}
}
}
// 使用 ReentrantLock 实现带超时的同步
private final ReentrantLock reentrantLock = new ReentrantLock();
public boolean tryUpdateWithTimeout(String key, String value, long timeout, TimeUnit unit) {
try {
if (reentrantLock.tryLock(timeout, unit)) {
try {
dataMap.put(key, value);
return true;
} finally {
reentrantLock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
}
- 适用场景:单机服务的本地缓存刷新、计数器。
基于数据库乐观锁/悲观锁(关系型数据库)
适用于多服务实例需要保证同一份数据不被重复修改。
乐观锁(CAS思想,推荐高并发场景)
-- 数据库表增加 version 字段 ALTER TABLE product_stock ADD COLUMN version INT DEFAULT 1;
// Java 使用 MyBatis-Plus 实现乐观锁
public class StockSyncService {
@Transactional
public boolean deductStock(Long productId, Integer amount) {
// 1. 查询带版本号的记录
ProductStock stock = stockMapper.selectById(productId);
// 2. 检查库存
if (stock.getStock() < amount) {
throw new BizException("库存不足");
}
// 3. 更新 stock=stock-1, version=version+1, where version=oldVersion
int rows = stockMapper.update(
new LambdaUpdateWrapper<ProductStock>()
.eq(ProductStock::getId, productId)
.eq(ProductStock::getVersion, stock.getVersion()) // 乐观锁条件
.set(ProductStock::getStock, stock.getStock() - amount)
.set(ProductStock::getVersion, stock.getVersion() + 1)
);
// rows == 0,说明版本已变更,重试或报错
if (rows == 0) {
// 抛出异常触发事务回滚,或进行重试
throw new OptimisticLockException("数据已被其他线程修改");
}
return true;
}
}
悲观锁(SELECT ... FOR UPDATE)
适用于写冲突较少的场景。
@Transactional
public void syncUpdateWithPessimisticLock(Long orderId, String status) {
// 数据库行锁:其他事务无法同时操作该行
Order order = orderMapper.selectForUpdate(orderId);
if (order == null) return;
order.setStatus(status);
orderMapper.updateById(order);
}
基于 Redis 分布式锁(跨实例同步)
适用于多个Java服务实例抢锁执行同步任务。
使用 Redis + Redisson (推荐)
<!-- Maven 依赖 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.27.0</version>
</dependency>
@Service
public class DistributedSyncService {
@Autowired
private RedissonClient redissonClient;
public void syncDeductStock(Long productId, Integer amount) {
String lockKey = "stock:lock:" + productId;
RLock lock = redissonClient.getLock(lockKey);
try {
// 1. 尝试获取锁(等待30秒,锁自动释放30秒)
if (lock.tryLock(30, 30, TimeUnit.SECONDS)) {
// 2. 业务操作:查询-判断-更新库存
ProductStock stock = stockMapper.selectById(productId);
if (stock.getStock() >= amount) {
stock.setStock(stock.getStock() - amount);
stockMapper.updateById(stock);
}
} else {
throw new BizException("系统繁忙,请稍后重试");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 3. 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
基于消息队列(最终一致性)
适用于跨系统、非实时要求的同步(如订单成功后同步履约系统)。
典型流程:本地事务 + 消息确认
@Service
public class OrderSyncService {
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void createOrder(Order order) {
// 1. 执行本地事务(写订单表)
Order savedOrder = transactionTemplate.execute(status -> {
orderMapper.insert(order);
// 记录待发送消息(如消息表)
messageDao.insert(new Message(order.getId(), "OrderCreated"));
return order;
});
// 2. 发送消息(异步或同步重试)
if (savedOrder != null) {
// 可以借助本地消息表、RocketMQ事务消息、Kafka Exactly Once等机制
kafkaTemplate.send("order-sync-topic",
new OrderEvent(savedOrder.getId(), savedOrder.getUserId(), savedOrder.getAmount()));
// 3. 删除本地消息标记(或更新为已发送)
messageDao.deleteByOrderId(savedOrder.getId());
}
}
}
- 消费者:监听消息,更新下游系统(如库存、积分)。
- 关键点:生产者保证“本地事务提交”与“消息发送”的一致性;消费者保证幂等。
基于 CDC(Change Data Capture)—— 数据库变更捕获
使用 Debezium、Canal 等工具监听数据库 binlog,推送给Java应用进行同步。
示例:Canal 监听 MySQL binlog
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
public void startCanalClient() {
// 连接到 Canal Server
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe("user_db.user_table"); // 订阅数据库
while (running) {
Message message = connector.getWithoutAck(100); // 获取100条
long batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
// 解析 entry 得到 RowChange
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
// 判断是 INSERT / UPDATE / DELETE
// 同步到另一个数据库、Elasticsearch 或 Redis
syncToTargetSystem(rowData);
}
}
connector.ack(batchId);
}
}
- 适用场景:实时数据仓库、缓存与数据库强一致、异地多活。
基于定时任务轮询(批量化)
适用于数据量不大、对实时性要求不高的同步(如每天凌晨同步商品信息)。
Spring Schedule + 幂等控制
@Component
public class DataSyncJob {
@Autowired
private JdbcTemplate jdbcTemplate;
@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点
public void syncDataFromSourceToTarget() {
// 假设从A库同步到B库
List<Map<String, Object>> sourceData = jdbcTemplate.queryForList(
"SELECT * FROM source_table WHERE update_time > ?",
lastSyncTime);
// 批量更新或插入到目标表
for (Map<String, Object> row : sourceData) {
jdbcTemplate.update(
"INSERT INTO target_table (id, name, ...) VALUES (?,?,...) ON DUPLICATE KEY UPDATE ...",
row.get("id"), row.get("name"));
}
// 记录本次同步时间戳
lastSyncTime = LocalDateTime.now();
}
}
- 注意:需要处理重复数据、同步失败重试、分布式唯一性。
总结表格:如何选择?
| 场景 | 推荐方案 | 一致性级别 | 复杂度 |
|---|---|---|---|
| 单机多线程共享变量 | synchronized / ReentrantLock |
强一致 | 低 |
| 多实例修改同一行数据(如扣库存) | Redis分布式锁 + 乐观锁 | 强一致(最终锁定) | 中 |
| 微服务之间同步业务数据 | 消息队列(RocketMQ/Kafka) | 最终一致 | 中高 |
| 数据库复制/同步 | Canal/Debezium (CDC) | 最终一致(低延迟) | 高 |
| 批量报表同步(T+1) | 定时任务 + 批处理 | 最终一致 | 低 |
| 缓存与数据库的双写一致性 | 先写库 + 删除缓存 + 延迟双删 / Canal | 最终一致 | 中 |
核心原则:不要试图用“一把锁”解决所有同步问题,而是根据业务对一致性、性能、复杂度的容忍度,选择最适合的方案。