本文目录导读:

在Java中实现分布式事务,通常需要解决跨多个数据库、消息队列或服务的数据一致性问题,由于传统的本地事务(如JDBC事务)无法跨越多个资源,因此需要引入专门的技术方案。
下面从核心原理、主流实现方案、具体代码示例以及选型建议几个方面进行详细说明。
核心原理:从ACID到CAP/BASE
分布式事务不可能同时满足ACID(原子性、一致性、隔离性、持久性)的强一致性,通常需要在一致性(C)、可用性(A) 和分区容忍性(P)之间做出权衡。
- 强一致性方案:追求数据实时一致,如2PC(两阶段提交)、3PC。
- 最终一致性方案:允许短暂的不一致,通过补偿机制保证数据最终一致,这是大多数互联网业务的首选。
主流实现方案对比
| 方案 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 2PC(两阶段提交) | 协调者(Coordinator)先问所有参与者能否提交,都同意则提交,否则回滚。 | 实现简单,强一致性。 | 同步阻塞、性能差、单点问题、协调者故障时无法释放锁。 | 银行转账、对一致性要求极高的传统金融场景。 |
| TCC(Try-Confirm-Cancel) | 业务层面的补偿,Try阶段预留资源,Confirm阶段确认执行,Cancel阶段释放资源。 | 性能高、无锁、灵活。 | 业务侵入性强(每个接口都要写Try、Confirm、Cancel三个方法),开发成本高。 | 高并发、对一致性要求高但允许短时不一致的场景。 |
| 可靠消息最终一致性 | 结合本地消息表或消息队列的ACK机制。 | 解耦性好、性能高。 | 极端情况下可能丢失消息(需配合定时任务补偿)。 | 跨服务异步调用(如下单后发优惠券、发积分)。 |
| 最大努力通知 | 不断重试回调,直到成功或达到最大次数。 | 实现简单、成本低。 | 无法保证最终一致性(业务方需自己做幂等)。 | 支付回调、第三方接口通知。 |
| Seata 框架 | 阿里巴巴开源的方案,支持AT(自动补偿,类似2PC改良)、TCC、Saga。 | 开箱即用、对业务代码侵入小(AT模式)、社区活跃。 | AT模式性能受限于全局锁;配置复杂。 | 微服务架构下的通用分布式事务需求。 |
典型案例与代码实现
案例:用户下单扣库存 + 积分 + 优惠券
业务逻辑:用户下单成功后,需要扣减库存(服务A)、增加用户积分(服务B)、扣减优惠券(服务C)。
方案1:使用 Seata AT 模式(推荐,业务侵入最小)
Seata AT模式自动通过JDBC代理生成UNDO_LOG来实现回滚,对业务代码几乎无侵入。
环境准备
- 引入依赖:
seata-spring-boot-starter。 - 配置
file.conf、registry.conf(或用Nacos配置)。 - 启动Seata Server(TC,事务协调器)。
事务发起方(Order Service)
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/createOrder")
@GlobalTransactional // 开启全局事务
public String createOrder(@RequestBody OrderDTO orderDTO) {
// 1. 创建订单(本地事务)
// 2. 调用库存服务扣减库存
// 3. 调用积分服务增加积分
// 4. 调用优惠券服务扣减优惠券
orderService.createOrder(orderDTO);
return "success";
}
}
各微服务(RM,资源管理器)
// Service A - 库存服务
@Service
public class StorageService {
@Autowired
private StorageMapper storageMapper;
// 这里不需要任何分布式事务注解,Seata通过JDBC代理自动参与
@Transactional // 仅本地事务
public void deduct(Long productId, Integer count) {
storageMapper.deduct(productId, count);
}
}
// Service B - 积分服务 (同理)
// Service C - 优惠券服务 (同理)
执行流程:
@GlobalTransactional -> 开启全局XID -> 调用各服务 -> 所有RM都执行成功 -> TC发起全局提交,若某一RM失败,TC通知所有RM回滚(靠UNDO_LOG)。
方案2:TCC 模式(灵活可控,业务侵入)
适合需要精细化控制资源预留和释放的场景,例如库存扣减(预占库存 vs 真实扣减)。
Define TCC Interface (Try/Confirm/Cancel)
public interface StorageTccService {
// Try: 预占库存(冻结库存)
@TwoPhaseBusinessAction(name = "deductTcc", commitMethod = "confirm", rollbackMethod = "cancel")
public boolean deduct(BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count,
@BusinessActionContextParameter(paramName = "orderId") String orderId);
// Confirm: 确认扣减(从冻结库存中真实扣除)
public boolean confirm(BusinessActionContext actionContext);
// Cancel: 取消扣减(解冻库存)
public boolean cancel(BusinessActionContext actionContext);
}
实现类(库存服务)
@Service
public class StorageTccServiceImpl implements StorageTccService {
@Autowired
private StorageMapper storageMapper;
@Override
public boolean deduct(BusinessActionContext actionContext,
Long productId, Integer count, String orderId) {
// 冻结库存 update storage set frozen = frozen + ? where product_id = ? and available >= ?
storageMapper.freezeStock(productId, count);
return true; // 必须返回true,表示Try成功
}
@Override
public boolean confirm(BusinessActionContext actionContext) {
Long productId = Long.parseLong(actionContext.getActionContext("productId").toString());
Integer count = Integer.parseInt(actionContext.getActionContext("count").toString());
// 真正扣减:update storage set available = available - ?, frozen = frozen - ? where product_id = ? and frozen >= ?
storageMapper.realDeduct(productId, count);
return true;
}
@Override
public boolean cancel(BusinessActionContext actionContext) {
Long productId = Long.parseLong(actionContext.getActionContext("productId").toString());
Integer count = Integer.parseInt(actionContext.getActionContext("count").toString());
// 解冻库存:update storage set frozen = frozen - ? where product_id = ? and frozen >= ?
storageMapper.unfreezeStock(productId, count);
return true;
}
}
事务发起方
@Service
public class OrderService {
@Autowired
private StorageTccService storageTccService;
@GlobalTransactional // 同样需要全局事务注解
@Transactional
public void createOrder(OrderDTO orderDTO) {
// 本地订单写入
orderMapper.insert(...);
// 调用TCC接口
storageTccService.deduct(null, orderDTO.getProductId(), orderDTO.getCount(), orderDTO.getOrderId());
// 其他服务同理...
}
}
方案3:可靠消息 + 本地消息表(异步最终一致性)
适用于对实时性要求不高、但需要保证数据最终一致的场景(如积分赠送)。
核心流程:
- 在业务A的数据库中建一张
event_publisher消息表。 - 业务A在本地事务中:执行核心业务(如订单创建) + 写入消息表(状态:待发送)。
- 定时任务扫描消息表,将待发送消息推送给MQ。
- 业务B消费MQ消息,执行本地业务(如增加积分)。
- 业务B执行成功,向MQ发送ACK,业务A从消息表中删除该记录。
- 如果业务B执行失败,MQ会不断重试,若重试多次失败,业务B需提供幂等处理并记录错误。
示例代码(发送方):
// 1. 业务方法
@Transactional
public void createOrder(OrderDTO order) {
// 核心业务
orderDao.insert(order);
// 构造消息事件
EventMessage event = new EventMessage();
event.setContent(JSON.toJSONString(order));
event.setStatus("NEW");
event.setCreateTime(new Date());
// 插入本地事件表
eventDao.insert(event);
}
// 2. 定时任务
@Scheduled(fixedDelay = 5000)
public void sendMessage() {
// 扫描状态为NEW且未发送的消息
List<EventMessage> messages = eventDao.selectNewMessages();
for (EventMessage msg : messages) {
try {
// 使用Kafka/RocketMQ发送
kafkaTemplate.send("order-topic", msg.getContent());
// 发送成功,更新状态为SENT
eventDao.updateStatus(msg.getId(), "SENT");
} catch (Exception e) {
log.error("发送失败,下次重试", e);
}
}
}
选型建议与避坑指南
| 场景 | 推荐方案 | 说明 |
|---|---|---|
| 新项目,微服务架构 | Seata AT 模式 | 对代码侵入最小,只需加@GlobalTransactional和配置,注意性能瓶颈(全局锁)。 |
| 老项目改造,无法修改业务代码 | Seata AT 模式 | 不需要修改SQL和业务逻辑,但需要建立undo_log表。 |
| 高并发,可接受短时不一致 | RocketMQ 事务消息 | 性能极高,但需要业务方做好幂等。 |
| 需要精确控制资源(如库存) | TCC 模式 | 隔离性最好,能防止超卖,使用Seata的TCC模块或ByteTCC框架。 |
| 涉及第三方系统(无法控制代码) | 最大努力通知 | 不断重试回调URL,直至成功。 |
| 需要回滚复杂业务逻辑 | Saga 模式 | 每个子事务对应一个补偿(回滚)方法,适用于长事务,如旅游、快递。 |
避坑指南:
- 幂等性:任何分布式事务方案都无法保证100%不重复执行,你的接口必须支持幂等(无论调用多少次,结果一致),常见方法:唯一主键、去重表、版本号。
- 事务的超时与重试:设计好合理的超时时间和指数退避重试策略,避免重试风暴。
- 全局ID传递:确保Seata框架传递的
XID(全局事务ID)能在服务间成功传递,通常通过Dubbo/RocketMQ的隐式参数或HTTP的Header传递。 - 不要滥用分布式事务:大部分业务场景应该设计为“本地事务 + 异步队列”,尽可能避免引入分布式事务,因为它会降低系统吞吐量。
Java案例实现分布式事务的最佳实践是:优先考虑业务重构,看能否规避分布式事务(如通过ID路由将同用户的操作放在同一个数据库分片中),如果实在无法避免,推荐使用Seata AT模式作为通用解决方案,对于高并发场景,TCC或可靠消息最终一致性是更优的选择。
选择方案时,需要在一致性、可用性、性能、开发成本之间做出最适合当前业务的权衡。