如何实现基于消息的最终一致性事务?

wen PHP项目 46

本文目录导读:

如何实现基于消息的最终一致性事务?

  1. 核心架构
  2. 实现步骤(以最常见的方法为例)
  3. 核心难点及解决方案
  4. 代码示例(简化版)
  5. 缺点与适用场景

基于消息的最终一致性事务,是分布式系统中解决跨服务数据一致性问题的一种常见模式,它的核心思想是:通过可靠的异步消息传递,结合本地事务和补偿机制,保证数据在最终时刻是一致的

以下是实现该模式的完整架构、核心步骤、关键问题及代码示例。


核心架构

通常包含三个主要角色:

  1. 主动方服务:发起事务的服务,它先执行本地事务并发送消息。
  2. 消息中间件:存储和路由消息,如 RocketMQ、Kafka 或 RabbitMQ。
  3. 被动方服务:消费消息并执行自身业务逻辑的服务,如果执行失败,需要触发补偿。

实现步骤(以最常见的方法为例)

本地消息表(最经典、可靠)

这种方法需要一个额外的“本地消息表”来记录事务状态,保证消息一定被发送。

流程:

  1. 开启本地事务: 主动方服务在自己的数据库中,同时执行:

    • 更新业务数据(如“扣减库存”)。
    • 向“本地消息表”插入一条状态为“待发送”的消息记录。 注意: 这两个操作必须在同一个数据库本地事务中完成(ACID)。
  2. 发送消息: 主动方服务中的定时任务或后台线程,轮询“本地消息表”中状态为“待发送”的消息,并发送到消息队列。

  3. 被动方消费消息并执行: 被动方服务从队列接收到消息后,执行自己的业务逻辑(如“新增订单”)。 关键: 需要保证消费的幂等性(通过唯一业务ID去重)。

  4. 确认与更新状态

    • 如果被动方执行成功,主动方收到回调或被动方通过消息队列的自动确认机制,主动方更新“本地消息表”状态为“已发送”。
    • 如果被动方执行失败(如业务异常、服务崩溃),消息会重试,如果多次重试后依然失败,主动方可将消息状态改为“需人工处理”或“回滚”,并触发补偿。

RocketMQ 事务消息(推荐)

RocketMQ 原生支持事务消息,省去了自行实现本地消息表的轮询逻辑。

流程:

  1. 发送“半消息”(prepare): 主动方服务向 RocketMQ 发送一条“半消息”,此时消息处于“暂不可见”状态,消费者无法消费。

  2. 执行本地事务: 主动方服务执行本地业务操作。

  3. 提交/回滚消息

    • 如果本地事务执行成功,向 RocketMQ 发送 commit 指令,消息变为“可见”,消费者可消费。
    • 如果本地事务执行失败,发送 rollback 指令,消息被丢弃。
  4. 回查机制: RocketMQ 长时间未收到 commit/rollback,会主动回调主动方服务的 checkLocalTransaction 方法,询问本地事务最终状态(成功、失败、未知),这是保证消息100%不丢失的关键。


核心难点及解决方案

  1. 消息的可靠性(必须发出去)

    • 方案:本地消息表 + 定时重试 + 幂等性。
    • 原理:只要本地事务提交成功,消息记录就在数据库里,后续无论发送失败多少次,定时任务都会尝试重发。
  2. 消费的幂等性(必须只执行一次)

    • 方案:使用业务唯一键(如订单号、事务ID)做去重表或数据库主键冲突。
    • 原理:被动方在执行业务前,先查一下去重表中是否有该ID,如果有则跳过。
  3. 被动方执行失败(需补偿)

    • 方案:提供补偿接口。
    • 原理:如果被动手变更失败(如库存不足),主动方需要调用被动方的“回滚接口”(如释放已锁定的库存),通常是异步的。

代码示例(简化版)

主动方:本地消息表实现

// 主动方服务
@Service
public class OrderService {
    @Autowired
    private LocalMessageMapper localMessageMapper; // 数据库
    @Transactional
    public void createOrder(Order order) {
        // 1. 业务操作:扣库存,生成订单
        inventoryService.deduct(order.getProductId());
        orderDao.insert(order);
        // 2. 插入本地消息表(状态:待发送)
        LocalMessage msg = new LocalMessage();
        msg.setId(UUID.randomUUID().toString());
        msg.setBusinessType("order");
        msg.setMsgBody(JSON.toJSONString(order));
        msg.setStatus(0); // 0-待发送
        localMessageMapper.insert(msg);
    }
    // 定时任务:扫描并发送待发送消息
    @Scheduled(fixedDelay = 5000)
    public void sendPendingMessages() {
        List<LocalMessage> pendingMsgs = localMessageMapper.selectByStatus(0);
        for (LocalMessage msg : pendingMsgs) {
            try {
                // 发送到MQ
                messageQueue.send(msg.getBusinessType(), msg.getMsgBody());
                // 发送成功,更新状态
                msg.setStatus(1); // 1-已发送
                localMessageMapper.updateById(msg);
            } catch (Exception e) {
                // 记录失败,下次再重试
                log.error("Send msg failed, msgId:{}", msg.getId());
            }
        }
    }
}

被动方:幂等消费 + 补偿

@Component
@RocketMQMessageListener(topic = "order", consumerGroup = "order_group")
public class OrderConsumer implements RocketMQListener<String> {
    @Transactional
    public void onMessage(String message) {
        // 1. 获取业务ID(如订单号)
        String orderId = extractOrderId(message);
        // 2. 幂等性检查(去重表或唯一索引)
        if (dedupDao.exists(orderId)) {
            log.info("重复消息,直接ack: {}", orderId);
            return; // 如果RocketMQ自动ack,此时会返回成功
        }
        // 3. 执行业务逻辑(如新增订单)
        Order order = JSON.parseObject(message, Order.class);
        orderDao.insert(order);
        // 4. 插入去重记录
        dedupDao.insert(new DedupRecord(orderId));
    }
    // 如果业务失败,触发补偿
    public void compensate(String orderId) {
        // 调用主动方或被动方自身的回滚接口
        inventoryService.addBack(orderId);
    }
}

缺点与适用场景

  • 优点:高可用、高可靠,适用于对最终一致性要求严格的场景(如支付、订单、库存)。
  • 缺点
    • 实现复杂度高(需考虑重试、去重、回查)。
    • 存在秒级延迟(消息投递、定时轮询)。
    • 不支持强一致性,只保证最终一致性。
  • 适用场景:转账、下单(库存扣减与订单生成)、用户注册(积分发放)等。

基于消息的最终一致性事务,本质上是用消息队列的可靠投递 + 本地事务的原子性 + 消费的幂等性,来确保分布式系统下数据最终一致,它不保证实时一致,但保证最终一致

如果你的业务对强一致性要求高(如余额扣减与订单同步),可能需要考虑 Seata 等分布式事务框架;但如果允许秒级延迟,基于消息的最终一致性模式是最常见且成熟的选择。

抱歉,评论功能暂时关闭!