MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Saga 模式中的事务补偿幂等性设计

2023-08-092.4k 阅读

Saga 模式简介

在分布式系统中,由于业务的复杂性和服务的拆分,单个业务操作往往需要跨多个服务完成,而每个服务又可能涉及到本地事务。传统的单体应用中的事务管理方式(如数据库的 ACID 事务)在分布式环境下难以直接应用,因为分布式环境存在网络延迟、服务故障等问题。Saga 模式应运而生,它通过将一个大的业务事务拆分成多个本地事务,每个本地事务对应一个服务的操作,并通过协调这些本地事务的执行来完成整个业务流程。

Saga 模式的核心思想是通过一系列本地事务的有序执行来完成一个分布式事务。如果其中某个本地事务失败,Saga 会执行一系列的补偿事务,将之前已执行的本地事务回滚,以保证数据的一致性。例如,在一个电商系统中,下单操作可能涉及到库存服务减少库存、订单服务创建订单、支付服务处理支付等多个服务。在 Saga 模式下,这些操作会作为一系列本地事务依次执行。如果支付服务出现故障,Saga 会调用库存服务的补偿事务(增加库存)和订单服务的补偿事务(删除订单)来恢复到事务开始前的状态。

事务补偿的概念

事务补偿是 Saga 模式中的关键机制。当 Saga 执行过程中某个本地事务失败时,为了保证数据的一致性,需要对已经成功执行的本地事务进行回滚操作,这个回滚操作就是事务补偿。事务补偿通常是通过执行与原本地事务相反的操作来实现的。比如,原本地事务是从账户 A 向账户 B 转账 100 元,那么补偿事务就是从账户 B 向账户 A 转账 100 元。

在实际应用中,事务补偿需要考虑多种因素。首先,补偿事务的执行必须保证可靠性,即无论在何种情况下,补偿事务都应该能够成功执行,以恢复数据到正确状态。其次,由于分布式系统的不确定性,补偿事务可能会被多次调用,因此需要保证补偿事务的幂等性。

幂等性的重要性

幂等性是指对同一操作的多次执行,其结果与执行一次是相同的。在 Saga 模式的事务补偿中,幂等性尤为重要。由于网络故障、服务重试机制等原因,补偿事务可能会被重复触发。如果补偿事务不具备幂等性,多次执行补偿事务可能会导致数据不一致等问题。

例如,假设一个订单创建后,库存减少了 10 件商品。当订单取消时,需要执行库存增加 10 件商品的补偿事务。如果这个补偿事务不具备幂等性,由于网络问题导致补偿事务被重复执行两次,那么库存就会增加 20 件商品,这显然不符合预期,会导致数据的不一致。

幂等性设计原则

  1. 基于唯一标识:为每个事务操作生成一个唯一标识,在执行补偿事务时,根据这个唯一标识判断该补偿事务是否已经执行过。如果已经执行过,则不再重复执行。例如,可以使用 UUID 作为唯一标识,在创建订单事务时生成一个 UUID,在库存减少事务和订单取消的补偿事务中都携带这个 UUID。当执行库存增加的补偿事务时,首先检查该 UUID 是否已经处理过库存增加操作,如果是,则直接返回成功,不再重复执行增加库存的逻辑。
  2. 状态机控制:设计一个状态机来跟踪事务和补偿事务的执行状态。只有当事务处于特定状态时,才允许执行补偿事务。并且在执行补偿事务后,及时更新状态机的状态,以防止重复执行。例如,一个订单创建事务可能有“初始”“订单创建成功”“库存减少成功”“支付成功”等状态,订单取消的补偿事务只有在“支付成功”状态下才允许执行,执行后将状态更新为“订单取消补偿完成”,再次收到取消请求时,根据状态判断已经完成补偿,不再执行。
  3. 数据库约束:利用数据库的唯一约束、乐观锁等机制来保证幂等性。例如,在执行库存增加的补偿事务时,可以在库存表中添加一个唯一索引,如(订单号,操作类型),其中操作类型标识是增加库存还是减少库存。当重复执行增加库存的补偿事务时,由于唯一索引的存在,数据库会抛出异常,应用层捕获异常后可以认为补偿事务已经执行过,直接返回成功。

幂等性设计的代码示例

以下以 Java 和 Spring Boot 框架为例,展示如何在 Saga 模式的事务补偿中实现幂等性设计。

基于唯一标识的幂等性实现

  1. 定义事务操作的唯一标识 首先,定义一个用于生成唯一标识的工具类:
import java.util.UUID;

public class TransactionIdGenerator {
    public static String generateTransactionId() {
        return UUID.randomUUID().toString();
    }
}
  1. 库存服务代码 库存服务的减少库存和增加库存方法:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class InventoryService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void decreaseInventory(String transactionId, String productId, int quantity) {
        // 检查是否已经执行过该操作
        int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM inventory_transaction WHERE transaction_id =? AND product_id =? AND operation = 'decrease'",
                Integer.class, transactionId, productId);
        if (count == 0) {
            jdbcTemplate.update("UPDATE inventory SET quantity = quantity -? WHERE product_id =?", quantity, productId);
            jdbcTemplate.update("INSERT INTO inventory_transaction (transaction_id, product_id, operation, quantity) VALUES (?,?, 'decrease',?)",
                    transactionId, productId, quantity);
        }
    }

    public void increaseInventory(String transactionId, String productId, int quantity) {
        // 检查是否已经执行过该操作
        int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM inventory_transaction WHERE transaction_id =? AND product_id =? AND operation = 'increase'",
                Integer.class, transactionId, productId);
        if (count == 0) {
            jdbcTemplate.update("UPDATE inventory SET quantity = quantity +? WHERE product_id =?", quantity, productId);
            jdbcTemplate.update("INSERT INTO inventory_transaction (transaction_id, product_id, operation, quantity) VALUES (?,?, 'increase',?)",
                    transactionId, productId, quantity);
        }
    }
}
  1. 订单服务代码 订单服务的创建订单和取消订单方法:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void createOrder(String transactionId, String orderId, String productId, int quantity) {
        // 检查是否已经执行过该操作
        int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM order_transaction WHERE transaction_id =? AND order_id =?",
                Integer.class, transactionId, orderId);
        if (count == 0) {
            jdbcTemplate.update("INSERT INTO orders (order_id, product_id, quantity) VALUES (?,?,?)", orderId, productId, quantity);
            jdbcTemplate.update("INSERT INTO order_transaction (transaction_id, order_id, operation) VALUES (?,?, 'create')",
                    transactionId, orderId);
        }
    }

    public void cancelOrder(String transactionId, String orderId) {
        // 检查是否已经执行过该操作
        int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM order_transaction WHERE transaction_id =? AND order_id =? AND operation = 'cancel'",
                Integer.class, transactionId, orderId);
        if (count == 0) {
            jdbcTemplate.update("DELETE FROM orders WHERE order_id =?", orderId);
            jdbcTemplate.update("INSERT INTO order_transaction (transaction_id, order_id, operation) VALUES (?,?, 'cancel')",
                    transactionId, orderId);
        }
    }
}
  1. Saga 协调器代码 Saga 协调器负责调用各个服务的方法,并处理事务的执行和补偿:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SagaCoordinator {

    @Autowired
    private InventoryService inventoryService;
    @Autowired
    private OrderService orderService;

    public void placeOrder(String orderId, String productId, int quantity) {
        String transactionId = TransactionIdGenerator.generateTransactionId();
        try {
            inventoryService.decreaseInventory(transactionId, productId, quantity);
            orderService.createOrder(transactionId, orderId, productId, quantity);
            // 模拟支付服务成功
            System.out.println("Payment service successful");
        } catch (Exception e) {
            // 补偿操作
            orderService.cancelOrder(transactionId, orderId);
            inventoryService.increaseInventory(transactionId, productId, quantity);
        }
    }
}

基于状态机的幂等性实现

  1. 定义状态枚举
public enum OrderStatus {
    INITIAL, ORDER_CREATED, INVENTORY_DECREASED, PAYMENT_SUCCESS, ORDER_CANCELED, INVENTORY_INCREASED
}
  1. 订单服务状态机相关代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void createOrder(String orderId, String productId, int quantity, OrderStatus currentStatus) {
        if (currentStatus == OrderStatus.INITIAL) {
            jdbcTemplate.update("INSERT INTO orders (order_id, product_id, quantity) VALUES (?,?,?)", orderId, productId, quantity);
            jdbcTemplate.update("UPDATE order_status SET status =? WHERE order_id =?", OrderStatus.ORDER_CREATED, orderId);
        }
    }

    public void cancelOrder(String orderId, OrderStatus currentStatus) {
        if (currentStatus == OrderStatus.PAYMENT_SUCCESS) {
            jdbcTemplate.update("DELETE FROM orders WHERE order_id =?", orderId);
            jdbcTemplate.update("UPDATE order_status SET status =? WHERE order_id =?", OrderStatus.ORDER_CANCELED, orderId);
        }
    }
}
  1. 库存服务状态机相关代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class InventoryService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void decreaseInventory(String productId, int quantity, OrderStatus currentStatus) {
        if (currentStatus == OrderStatus.ORDER_CREATED) {
            jdbcTemplate.update("UPDATE inventory SET quantity = quantity -? WHERE product_id =?", quantity, productId);
            jdbcTemplate.update("UPDATE order_status SET status =? WHERE product_id =?", OrderStatus.INVENTORY_DECREASED, productId);
        }
    }

    public void increaseInventory(String productId, int quantity, OrderStatus currentStatus) {
        if (currentStatus == OrderStatus.ORDER_CANCELED) {
            jdbcTemplate.update("UPDATE inventory SET quantity = quantity +? WHERE product_id =?", quantity, productId);
            jdbcTemplate.update("UPDATE order_status SET status =? WHERE product_id =?", OrderStatus.INVENTORY_INCREASED, productId);
        }
    }
}
  1. Saga 协调器状态机相关代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SagaCoordinator {

    @Autowired
    private InventoryService inventoryService;
    @Autowired
    private OrderService orderService;

    public void placeOrder(String orderId, String productId, int quantity) {
        // 初始化订单状态
        jdbcTemplate.update("INSERT INTO order_status (order_id, status) VALUES (?,?)", orderId, OrderStatus.INITIAL);
        OrderStatus currentStatus = getOrderStatus(orderId);
        try {
            orderService.createOrder(orderId, productId, quantity, currentStatus);
            currentStatus = getOrderStatus(orderId);
            inventoryService.decreaseInventory(productId, quantity, currentStatus);
            currentStatus = getOrderStatus(orderId);
            // 模拟支付服务成功
            System.out.println("Payment service successful");
            currentStatus = getOrderStatus(orderId);
            jdbcTemplate.update("UPDATE order_status SET status =? WHERE order_id =?", OrderStatus.PAYMENT_SUCCESS, orderId);
        } catch (Exception e) {
            currentStatus = getOrderStatus(orderId);
            orderService.cancelOrder(orderId, currentStatus);
            currentStatus = getOrderStatus(orderId);
            inventoryService.increaseInventory(productId, quantity, currentStatus);
        }
    }

    private OrderStatus getOrderStatus(String orderId) {
        return jdbcTemplate.queryForObject("SELECT status FROM order_status WHERE order_id =?", OrderStatus.class, orderId);
    }
}

基于数据库约束的幂等性实现

  1. 库存服务基于数据库约束的代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class InventoryService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void decreaseInventory(String productId, int quantity) {
        try {
            jdbcTemplate.update("UPDATE inventory SET quantity = quantity -? WHERE product_id =? AND quantity >=?", quantity, productId, quantity);
        } catch (Exception e) {
            // 由于唯一约束等导致的异常,认为已经执行过
        }
    }

    public void increaseInventory(String productId, int quantity) {
        try {
            jdbcTemplate.update("UPDATE inventory SET quantity = quantity +? WHERE product_id =?", quantity, productId);
        } catch (Exception e) {
            // 由于唯一约束等导致的异常,认为已经执行过
        }
    }
}
  1. 订单服务基于数据库约束的代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void createOrder(String orderId, String productId, int quantity) {
        try {
            jdbcTemplate.update("INSERT INTO orders (order_id, product_id, quantity) VALUES (?,?,?)", orderId, productId, quantity);
        } catch (Exception e) {
            // 由于唯一约束等导致的异常,认为已经执行过
        }
    }

    public void cancelOrder(String orderId) {
        try {
            jdbcTemplate.update("DELETE FROM orders WHERE order_id =?", orderId);
        } catch (Exception e) {
            // 由于唯一约束等导致的异常,认为已经执行过
        }
    }
}
  1. Saga 协调器基于数据库约束的代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SagaCoordinator {

    @Autowired
    private InventoryService inventoryService;
    @Autowired
    private OrderService orderService;

    public void placeOrder(String orderId, String productId, int quantity) {
        try {
            inventoryService.decreaseInventory(productId, quantity);
            orderService.createOrder(orderId, productId, quantity);
            // 模拟支付服务成功
            System.out.println("Payment service successful");
        } catch (Exception e) {
            // 补偿操作
            orderService.cancelOrder(orderId);
            inventoryService.increaseInventory(productId, quantity);
        }
    }
}

幂等性设计的挑战与应对

  1. 性能问题:基于唯一标识的幂等性实现可能需要额外的数据库查询来判断是否已经执行过操作,这可能会影响系统的性能。应对方法可以是合理使用缓存,将已经执行过的事务标识缓存起来,减少数据库查询次数。例如,使用 Redis 缓存已经执行过的事务标识,在执行事务操作前先查询缓存,如果缓存中存在,则直接返回成功,避免数据库查询。
  2. 状态同步问题:在基于状态机的幂等性实现中,不同服务之间的状态同步可能会出现问题。如果某个服务的状态更新失败,但其他服务认为状态已经更新,可能会导致不一致。可以通过引入分布式事务解决方案(如 TCC 模式辅助)来保证状态更新的一致性。或者采用消息队列来异步更新状态,并且在消息处理中增加重试机制,确保状态更新成功。
  3. 数据库约束冲突:基于数据库约束的幂等性实现可能会因为数据库约束冲突导致事务失败。例如,在高并发场景下,多个补偿事务同时执行,可能会因为唯一索引冲突而失败。可以采用乐观锁的方式,在更新数据时带上版本号,每次更新后版本号递增,避免并发冲突。或者在业务层面进行排队处理,将补偿事务请求放入队列中,依次处理,避免同时执行导致约束冲突。

总结幂等性设计要点

在 Saga 模式的事务补偿中,幂等性设计是保证数据一致性的关键。通过基于唯一标识、状态机控制和数据库约束等方法,可以有效地实现幂等性。然而,每种方法都有其优缺点和适用场景,需要根据具体的业务需求和系统架构来选择合适的幂等性设计方案。同时,在实现幂等性的过程中,要注意性能、状态同步和数据库约束冲突等问题,并采取相应的应对措施,以确保分布式系统的稳定运行和数据的一致性。