Java案例如何实现数据同步?

wen java案例 74

本文目录导读:

Java案例如何实现数据同步?

  1. 核心概念:为什么需要同步?
  2. 方案一:基于 JVM 本地锁/同步工具(单机环境)
  3. 方案二:基于数据库乐观锁/悲观锁(关系型数据库)
  4. 方案三:基于 Redis 分布式锁(跨实例同步)
  5. 方案四:基于消息队列(最终一致性)
  6. 方案五:基于 CDC(Change Data Capture)—— 数据库变更捕获
  7. 方案六:基于定时任务轮询(批量化)
  8. 总结表格:如何选择?

在Java中实现数据同步通常涉及数据库同步缓存同步跨系统数据一致性等场景,数据同步的核心挑战在于保证数据一致性高可用低延迟

以下是几种常见的Java数据同步实现方案,从简单到复杂,并附带核心代码示例。


核心概念:为什么需要同步?

  • 本地多线程同步:防止共享变量(如计数器、缓存Map)的并发修改。
  • 主从数据库同步:读写分离后,写库到读库的数据复制延迟。
  • 应用缓存与数据库同步:Redis缓存与MySQL数据的一致性。
  • 分布式系统数据同步:不同微服务之间(如订单服务与库存服务)的业务数据最终一致。

基于 JVM 本地锁/同步工具(单机环境)

适用于单个Java服务实例内的临界资源访问。

使用 synchronizedReentrantLock(线程安全)

public class LocalDataSync {
    private final Object lock = new Object();
    private Map<String, String> dataMap = new HashMap<>();
    // 同步写入
    public void syncPut(String key, String value) {
        synchronized (lock) {
            // 模拟同步操作:先检查,再写入
            if (!dataMap.containsKey(key)) {
                dataMap.put(key, value);
            }
        }
    }
    // 使用 ReentrantLock 实现带超时的同步
    private final ReentrantLock reentrantLock = new ReentrantLock();
    public boolean tryUpdateWithTimeout(String key, String value, long timeout, TimeUnit unit) {
        try {
            if (reentrantLock.tryLock(timeout, unit)) {
                try {
                    dataMap.put(key, value);
                    return true;
                } finally {
                    reentrantLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
}
  • 适用场景:单机服务的本地缓存刷新、计数器。

基于数据库乐观锁/悲观锁(关系型数据库)

适用于多服务实例需要保证同一份数据不被重复修改。

乐观锁(CAS思想,推荐高并发场景)

-- 数据库表增加 version 字段
ALTER TABLE product_stock ADD COLUMN version INT DEFAULT 1;
// Java 使用 MyBatis-Plus 实现乐观锁
public class StockSyncService {
    @Transactional
    public boolean deductStock(Long productId, Integer amount) {
        // 1. 查询带版本号的记录
        ProductStock stock = stockMapper.selectById(productId);
        // 2. 检查库存
        if (stock.getStock() < amount) {
            throw new BizException("库存不足");
        }
        // 3. 更新 stock=stock-1, version=version+1, where version=oldVersion
        int rows = stockMapper.update(
                new LambdaUpdateWrapper<ProductStock>()
                        .eq(ProductStock::getId, productId)
                        .eq(ProductStock::getVersion, stock.getVersion()) // 乐观锁条件
                        .set(ProductStock::getStock, stock.getStock() - amount)
                        .set(ProductStock::getVersion, stock.getVersion() + 1)
        );
        // rows == 0,说明版本已变更,重试或报错
        if (rows == 0) {
            // 抛出异常触发事务回滚,或进行重试
            throw new OptimisticLockException("数据已被其他线程修改");
        }
        return true;
    }
}

悲观锁(SELECT ... FOR UPDATE

适用于写冲突较少的场景。

@Transactional
public void syncUpdateWithPessimisticLock(Long orderId, String status) {
    // 数据库行锁:其他事务无法同时操作该行
    Order order = orderMapper.selectForUpdate(orderId); 
    if (order == null) return;
    order.setStatus(status);
    orderMapper.updateById(order);
}

基于 Redis 分布式锁(跨实例同步)

适用于多个Java服务实例抢锁执行同步任务。

使用 Redis + Redisson (推荐)

<!-- Maven 依赖 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.27.0</version>
</dependency>
@Service
public class DistributedSyncService {
    @Autowired
    private RedissonClient redissonClient;
    public void syncDeductStock(Long productId, Integer amount) {
        String lockKey = "stock:lock:" + productId;
        RLock lock = redissonClient.getLock(lockKey);
        try {
            // 1. 尝试获取锁(等待30秒,锁自动释放30秒)
            if (lock.tryLock(30, 30, TimeUnit.SECONDS)) {
                // 2. 业务操作:查询-判断-更新库存
                ProductStock stock = stockMapper.selectById(productId);
                if (stock.getStock() >= amount) {
                    stock.setStock(stock.getStock() - amount);
                    stockMapper.updateById(stock);
                }
            } else {
                throw new BizException("系统繁忙,请稍后重试");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 3. 释放锁
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

基于消息队列(最终一致性)

适用于跨系统、非实时要求的同步(如订单成功后同步履约系统)。

典型流程:本地事务 + 消息确认

@Service
public class OrderSyncService {
    @Autowired
    private TransactionTemplate transactionTemplate;
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    public void createOrder(Order order) {
        // 1. 执行本地事务(写订单表)
        Order savedOrder = transactionTemplate.execute(status -> {
            orderMapper.insert(order);
            // 记录待发送消息(如消息表)
            messageDao.insert(new Message(order.getId(), "OrderCreated"));
            return order;
        });
        // 2. 发送消息(异步或同步重试)
        if (savedOrder != null) {
            // 可以借助本地消息表、RocketMQ事务消息、Kafka Exactly Once等机制
            kafkaTemplate.send("order-sync-topic", 
                new OrderEvent(savedOrder.getId(), savedOrder.getUserId(), savedOrder.getAmount()));
            // 3. 删除本地消息标记(或更新为已发送)
            messageDao.deleteByOrderId(savedOrder.getId());
        }
    }
}
  • 消费者:监听消息,更新下游系统(如库存、积分)。
  • 关键点:生产者保证“本地事务提交”与“消息发送”的一致性;消费者保证幂等。

基于 CDC(Change Data Capture)—— 数据库变更捕获

使用 Debezium、Canal 等工具监听数据库 binlog,推送给Java应用进行同步。

示例:Canal 监听 MySQL binlog

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.7</version>
</dependency>
public void startCanalClient() {
    // 连接到 Canal Server
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
    connector.connect();
    connector.subscribe("user_db.user_table"); // 订阅数据库
    while (running) {
        Message message = connector.getWithoutAck(100); // 获取100条
        long batchId = message.getId();
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            // 解析 entry 得到 RowChange
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            for (RowData rowData : rowChange.getRowDatasList()) {
                // 判断是 INSERT / UPDATE / DELETE
                // 同步到另一个数据库、Elasticsearch 或 Redis
                syncToTargetSystem(rowData);
            }
        }
        connector.ack(batchId);
    }
}
  • 适用场景:实时数据仓库、缓存与数据库强一致、异地多活。

基于定时任务轮询(批量化)

适用于数据量不大、对实时性要求不高的同步(如每天凌晨同步商品信息)。

Spring Schedule + 幂等控制

@Component
public class DataSyncJob {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Scheduled(cron = "0 0 1 * * ?")  // 每天凌晨1点
    public void syncDataFromSourceToTarget() {
        // 假设从A库同步到B库
        List<Map<String, Object>> sourceData = jdbcTemplate.queryForList(
                "SELECT * FROM source_table WHERE update_time > ?", 
                lastSyncTime);
        // 批量更新或插入到目标表
        for (Map<String, Object> row : sourceData) {
            jdbcTemplate.update(
                "INSERT INTO target_table (id, name, ...) VALUES (?,?,...) ON DUPLICATE KEY UPDATE ...",
                row.get("id"), row.get("name"));
        }
        // 记录本次同步时间戳
        lastSyncTime = LocalDateTime.now();
    }
}
  • 注意:需要处理重复数据、同步失败重试、分布式唯一性。

总结表格:如何选择?

场景 推荐方案 一致性级别 复杂度
单机多线程共享变量 synchronized / ReentrantLock 强一致
多实例修改同一行数据(如扣库存) Redis分布式锁 + 乐观锁 强一致(最终锁定)
微服务之间同步业务数据 消息队列(RocketMQ/Kafka) 最终一致 中高
数据库复制/同步 Canal/Debezium (CDC) 最终一致(低延迟)
批量报表同步(T+1) 定时任务 + 批处理 最终一致
缓存与数据库的双写一致性 先写库 + 删除缓存 + 延迟双删 / Canal 最终一致

核心原则:不要试图用“一把锁”解决所有同步问题,而是根据业务对一致性、性能、复杂度的容忍度,选择最适合的方案。

抱歉,评论功能暂时关闭!