Java案例如何实现分布式事务?

wen java案例 77

本文目录导读:

Java案例如何实现分布式事务?

  1. 核心原理:从ACID到CAP/BASE
  2. 主流实现方案对比
  3. 典型案例与代码实现
  4. 选型建议与避坑指南

在Java中实现分布式事务,通常需要解决跨多个数据库、消息队列或服务的数据一致性问题,由于传统的本地事务(如JDBC事务)无法跨越多个资源,因此需要引入专门的技术方案。

下面从核心原理主流实现方案具体代码示例以及选型建议几个方面进行详细说明。


核心原理:从ACID到CAP/BASE

分布式事务不可能同时满足ACID(原子性、一致性、隔离性、持久性)的强一致性,通常需要在一致性(C)可用性(A)分区容忍性(P)之间做出权衡。

  • 强一致性方案:追求数据实时一致,如2PC(两阶段提交)、3PC。
  • 最终一致性方案:允许短暂的不一致,通过补偿机制保证数据最终一致,这是大多数互联网业务的首选。

主流实现方案对比

方案 原理 优点 缺点 适用场景
2PC(两阶段提交) 协调者(Coordinator)先问所有参与者能否提交,都同意则提交,否则回滚。 实现简单,强一致性。 同步阻塞、性能差、单点问题、协调者故障时无法释放锁。 银行转账、对一致性要求极高的传统金融场景。
TCC(Try-Confirm-Cancel) 业务层面的补偿,Try阶段预留资源,Confirm阶段确认执行,Cancel阶段释放资源。 性能高、无锁、灵活。 业务侵入性强(每个接口都要写Try、Confirm、Cancel三个方法),开发成本高。 高并发、对一致性要求高但允许短时不一致的场景。
可靠消息最终一致性 结合本地消息表或消息队列的ACK机制。 解耦性好、性能高。 极端情况下可能丢失消息(需配合定时任务补偿)。 跨服务异步调用(如下单后发优惠券、发积分)。
最大努力通知 不断重试回调,直到成功或达到最大次数。 实现简单、成本低。 无法保证最终一致性(业务方需自己做幂等)。 支付回调、第三方接口通知。
Seata 框架 阿里巴巴开源的方案,支持AT(自动补偿,类似2PC改良)、TCC、Saga。 开箱即用、对业务代码侵入小(AT模式)、社区活跃。 AT模式性能受限于全局锁;配置复杂。 微服务架构下的通用分布式事务需求。

典型案例与代码实现

案例:用户下单扣库存 + 积分 + 优惠券

业务逻辑:用户下单成功后,需要扣减库存(服务A)、增加用户积分(服务B)、扣减优惠券(服务C)。

方案1:使用 Seata AT 模式(推荐,业务侵入最小)

Seata AT模式自动通过JDBC代理生成UNDO_LOG来实现回滚,对业务代码几乎无侵入。

环境准备

  • 引入依赖:seata-spring-boot-starter
  • 配置file.confregistry.conf(或用Nacos配置)。
  • 启动Seata Server(TC,事务协调器)。

事务发起方(Order Service)

@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;
    @PostMapping("/createOrder")
    @GlobalTransactional // 开启全局事务
    public String createOrder(@RequestBody OrderDTO orderDTO) {
        // 1. 创建订单(本地事务)
        // 2. 调用库存服务扣减库存
        // 3. 调用积分服务增加积分
        // 4. 调用优惠券服务扣减优惠券
        orderService.createOrder(orderDTO);
        return "success";
    }
}

各微服务(RM,资源管理器)

// Service A - 库存服务
@Service
public class StorageService {
    @Autowired
    private StorageMapper storageMapper;
    // 这里不需要任何分布式事务注解,Seata通过JDBC代理自动参与
    @Transactional // 仅本地事务
    public void deduct(Long productId, Integer count) {
        storageMapper.deduct(productId, count);
    }
}
// Service B - 积分服务 (同理)
// Service C - 优惠券服务 (同理)

执行流程@GlobalTransactional -> 开启全局XID -> 调用各服务 -> 所有RM都执行成功 -> TC发起全局提交,若某一RM失败,TC通知所有RM回滚(靠UNDO_LOG)。

方案2:TCC 模式(灵活可控,业务侵入)

适合需要精细化控制资源预留和释放的场景,例如库存扣减(预占库存 vs 真实扣减)。

Define TCC Interface (Try/Confirm/Cancel)

public interface StorageTccService {
    // Try: 预占库存(冻结库存)
    @TwoPhaseBusinessAction(name = "deductTcc", commitMethod = "confirm", rollbackMethod = "cancel")
    public boolean deduct(BusinessActionContext actionContext,
                          @BusinessActionContextParameter(paramName = "productId") Long productId,
                          @BusinessActionContextParameter(paramName = "count") Integer count, 
                          @BusinessActionContextParameter(paramName = "orderId") String orderId);
    // Confirm: 确认扣减(从冻结库存中真实扣除)
    public boolean confirm(BusinessActionContext actionContext);
    // Cancel: 取消扣减(解冻库存)
    public boolean cancel(BusinessActionContext actionContext);
}

实现类(库存服务)

@Service
public class StorageTccServiceImpl implements StorageTccService {
    @Autowired
    private StorageMapper storageMapper;
    @Override
    public boolean deduct(BusinessActionContext actionContext, 
                          Long productId, Integer count, String orderId) {
        // 冻结库存 update storage set frozen = frozen + ? where product_id = ? and available >= ?
        storageMapper.freezeStock(productId, count);
        return true; // 必须返回true,表示Try成功
    }
    @Override
    public boolean confirm(BusinessActionContext actionContext) {
        Long productId = Long.parseLong(actionContext.getActionContext("productId").toString());
        Integer count = Integer.parseInt(actionContext.getActionContext("count").toString());
        // 真正扣减:update storage set available = available - ?, frozen = frozen - ? where product_id = ? and frozen >= ?
        storageMapper.realDeduct(productId, count);
        return true;
    }
    @Override
    public boolean cancel(BusinessActionContext actionContext) {
        Long productId = Long.parseLong(actionContext.getActionContext("productId").toString());
        Integer count = Integer.parseInt(actionContext.getActionContext("count").toString());
        // 解冻库存:update storage set frozen = frozen - ? where product_id = ? and frozen >= ?
        storageMapper.unfreezeStock(productId, count);
        return true;
    }
}

事务发起方

@Service
public class OrderService {
    @Autowired
    private StorageTccService storageTccService;
    @GlobalTransactional // 同样需要全局事务注解
    @Transactional
    public void createOrder(OrderDTO orderDTO) {
        // 本地订单写入
        orderMapper.insert(...);
        // 调用TCC接口
        storageTccService.deduct(null, orderDTO.getProductId(), orderDTO.getCount(), orderDTO.getOrderId());
        // 其他服务同理...
    }
}

方案3:可靠消息 + 本地消息表(异步最终一致性)

适用于对实时性要求不高、但需要保证数据最终一致的场景(如积分赠送)。

核心流程:

  1. 在业务A的数据库中建一张event_publisher消息表。
  2. 业务A在本地事务中:执行核心业务(如订单创建) + 写入消息表(状态:待发送)。
  3. 定时任务扫描消息表,将待发送消息推送给MQ。
  4. 业务B消费MQ消息,执行本地业务(如增加积分)。
  5. 业务B执行成功,向MQ发送ACK,业务A从消息表中删除该记录。
  6. 如果业务B执行失败,MQ会不断重试,若重试多次失败,业务B需提供幂等处理并记录错误。

示例代码(发送方):

// 1. 业务方法
@Transactional
public void createOrder(OrderDTO order) {
    // 核心业务
    orderDao.insert(order);
    // 构造消息事件
    EventMessage event = new EventMessage();
    event.setContent(JSON.toJSONString(order));
    event.setStatus("NEW");
    event.setCreateTime(new Date());
    // 插入本地事件表
    eventDao.insert(event);
}
// 2. 定时任务
@Scheduled(fixedDelay = 5000)
public void sendMessage() {
    // 扫描状态为NEW且未发送的消息
    List<EventMessage> messages = eventDao.selectNewMessages();
    for (EventMessage msg : messages) {
        try {
            // 使用Kafka/RocketMQ发送
            kafkaTemplate.send("order-topic", msg.getContent());
            // 发送成功,更新状态为SENT
            eventDao.updateStatus(msg.getId(), "SENT");
        } catch (Exception e) {
            log.error("发送失败,下次重试", e);
        }
    }
}

选型建议与避坑指南

场景 推荐方案 说明
新项目,微服务架构 Seata AT 模式 对代码侵入最小,只需加@GlobalTransactional和配置,注意性能瓶颈(全局锁)。
老项目改造,无法修改业务代码 Seata AT 模式 不需要修改SQL和业务逻辑,但需要建立undo_log表。
高并发,可接受短时不一致 RocketMQ 事务消息 性能极高,但需要业务方做好幂等。
需要精确控制资源(如库存) TCC 模式 隔离性最好,能防止超卖,使用Seata的TCC模块或ByteTCC框架。
涉及第三方系统(无法控制代码) 最大努力通知 不断重试回调URL,直至成功。
需要回滚复杂业务逻辑 Saga 模式 每个子事务对应一个补偿(回滚)方法,适用于长事务,如旅游、快递。

避坑指南:

  1. 幂等性:任何分布式事务方案都无法保证100%不重复执行,你的接口必须支持幂等(无论调用多少次,结果一致),常见方法:唯一主键、去重表、版本号。
  2. 事务的超时与重试:设计好合理的超时时间和指数退避重试策略,避免重试风暴。
  3. 全局ID传递:确保Seata框架传递的XID(全局事务ID)能在服务间成功传递,通常通过Dubbo/RocketMQ的隐式参数或HTTP的Header传递。
  4. 不要滥用分布式事务:大部分业务场景应该设计为“本地事务 + 异步队列”,尽可能避免引入分布式事务,因为它会降低系统吞吐量。

Java案例实现分布式事务的最佳实践是:优先考虑业务重构,看能否规避分布式事务(如通过ID路由将同用户的操作放在同一个数据库分片中),如果实在无法避免,推荐使用Seata AT模式作为通用解决方案,对于高并发场景,TCC或可靠消息最终一致性是更优的选择。

选择方案时,需要在一致性、可用性、性能、开发成本之间做出最适合当前业务的权衡。

抱歉,评论功能暂时关闭!