MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

分布式系统中的分布式事务一致性解决方案

2022-05-046.3k 阅读

分布式事务概述

在分布式系统中,不同的服务和节点之间需要协同完成一系列操作,这些操作可能涉及多个数据库、消息队列或其他资源。当这些操作需要满足原子性、一致性、隔离性和持久性(ACID)特性时,就引入了分布式事务的概念。

分布式事务一致性面临的挑战主要源于网络的不可靠性、节点的故障以及数据的分布性。由于各个节点之间通过网络进行通信,网络延迟、丢包等问题可能导致节点之间状态不一致。同时,部分节点的故障也可能影响整个事务的执行结果。

分布式事务一致性模型

  1. 强一致性:强一致性要求任何时刻,所有节点上的数据都保持完全一致。对数据的更新操作一旦完成,后续的任何读取操作都必须返回最新的值。在分布式系统中实现强一致性难度较大,因为需要确保所有节点在更新操作完成后立刻同步,这通常需要大量的网络通信和协调,会严重影响系统的性能和可用性。

  2. 弱一致性:弱一致性允许系统在一段时间内存在数据不一致的情况。当更新操作完成后,不同节点上的数据可能需要一段时间才能达到一致状态。这种一致性模型对系统性能和可用性的影响较小,但可能会导致用户在某些情况下读取到旧数据。

  3. 最终一致性:最终一致性是弱一致性的一种特殊情况,它保证在没有新的更新操作的情况下,经过一段时间后,所有节点上的数据最终会达到一致。最终一致性在很多分布式系统中被广泛应用,它在性能、可用性和一致性之间取得了较好的平衡。

分布式事务一致性解决方案分类

  1. 基于 XA 协议的两阶段提交(2PC)
    • 原理:两阶段提交协议是一种经典的分布式事务解决方案。它将事务的提交过程分为两个阶段:准备阶段(Prepare)和提交阶段(Commit)。
      • 准备阶段:协调者向所有参与者发送 PREPARE 消息,参与者收到消息后执行事务操作,并将 Undo 和 Redo 信息记录到日志中,然后向协调者返回 VOTE_COMMITVOTE_ABORT 消息,表示是否准备好提交事务。
      • 提交阶段:如果协调者收到所有参与者的 VOTE_COMMIT 消息,那么它向所有参与者发送 COMMIT 消息,参与者收到 COMMIT 消息后正式提交事务;如果协调者收到任何一个参与者的 VOTE_ABORT 消息,那么它向所有参与者发送 ROLLBACK 消息,参与者收到 ROLLBACK 消息后回滚事务。
    • 代码示例(以 Java 和 JDBC 为例模拟 2PC)
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class TwoPhaseCommitExample {
    private static final String URL1 = "jdbc:mysql://localhost:3306/db1";
    private static final String URL2 = "jdbc:mysql://localhost:3306/db2";
    private static final String USER = "root";
    private static final String PASSWORD = "password";

    public static void main(String[] args) {
        Connection conn1 = null;
        Connection conn2 = null;
        try {
            conn1 = DriverManager.getConnection(URL1, USER, PASSWORD);
            conn2 = DriverManager.getConnection(URL2, USER, PASSWORD);

            // 开启事务
            conn1.setAutoCommit(false);
            conn2.setAutoCommit(false);

            // 执行操作
            PreparedStatement pstmt1 = conn1.prepareStatement("UPDATE table1 SET column1 =? WHERE id =?");
            pstmt1.setString(1, "new value");
            pstmt1.setInt(2, 1);
            pstmt1.executeUpdate();

            PreparedStatement pstmt2 = conn2.prepareStatement("UPDATE table2 SET column2 =? WHERE id =?");
            pstmt2.setString(1, "new value");
            pstmt2.setInt(2, 1);
            pstmt2.executeUpdate();

            // 准备阶段
            boolean vote1 = true;
            boolean vote2 = true;
            if (vote1 && vote2) {
                // 提交阶段
                conn1.commit();
                conn2.commit();
                System.out.println("事务提交成功");
            } else {
                // 回滚
                conn1.rollback();
                conn2.rollback();
                System.out.println("事务回滚");
            }
        } catch (SQLException e) {
            e.printStackTrace();
            try {
                if (conn1!= null) {
                    conn1.rollback();
                }
                if (conn2!= null) {
                    conn2.rollback();
                }
                System.out.println("事务回滚");
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
        } finally {
            try {
                if (conn1!= null) {
                    conn1.close();
                }
                if (conn2!= null) {
                    conn2.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 优缺点:2PC 的优点是能够保证强一致性,实现相对简单。缺点也很明显,它存在单点故障问题,协调者一旦出现故障,整个事务无法继续进行。同时,在准备阶段和提交阶段,参与者需要锁定资源,这会降低系统的并发性能。而且在网络延迟较高的情况下,整个事务的执行时间会很长。
  1. 三阶段提交(3PC)
    • 原理:三阶段提交协议是在两阶段提交协议的基础上进行改进的。它将事务提交过程分为三个阶段:询问阶段(CanCommit)、预提交阶段(PreCommit)和提交阶段(DoCommit)。
      • 询问阶段:协调者向所有参与者发送 CAN_COMMIT 消息,询问参与者是否可以进行事务操作。参与者收到消息后检查自身状态,如果可以进行事务操作,则返回 YES,否则返回 NO
      • 预提交阶段:如果协调者收到所有参与者的 YES 消息,那么它向所有参与者发送 PRE_COMMIT 消息,参与者收到消息后执行事务操作,并将 Undo 和 Redo 信息记录到日志中,然后向协调者返回 ACK 消息。如果协调者收到任何一个参与者的 NO 消息,那么它向所有参与者发送 ABORT 消息,参与者收到 ABORT 消息后放弃事务。
      • 提交阶段:如果协调者收到所有参与者的 ACK 消息,那么它向所有参与者发送 DO_COMMIT 消息,参与者收到 DO_COMMIT 消息后正式提交事务;如果协调者在一定时间内没有收到所有参与者的 ACK 消息,那么它向所有参与者发送 ABORT 消息,参与者收到 ABORT 消息后回滚事务。
    • 代码示例(简化模拟 3PC,以 Java 和 JDBC 为例)
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ThreePhaseCommitExample {
    private static final String URL1 = "jdbc:mysql://localhost:3306/db1";
    private static final String URL2 = "jdbc:mysql://localhost:3306/db2";
    private static final String USER = "root";
    private static final String PASSWORD = "password";

    public static void main(String[] args) {
        Connection conn1 = null;
        Connection conn2 = null;
        try {
            conn1 = DriverManager.getConnection(URL1, USER, PASSWORD);
            conn2 = DriverManager.getConnection(URL2, USER, PASSWORD);

            // 询问阶段
            boolean canCommit1 = true;
            boolean canCommit2 = true;
            if (canCommit1 && canCommit2) {
                // 预提交阶段
                conn1.setAutoCommit(false);
                conn2.setAutoCommit(false);

                PreparedStatement pstmt1 = conn1.prepareStatement("UPDATE table1 SET column1 =? WHERE id =?");
                pstmt1.setString(1, "new value");
                pstmt1.setInt(2, 1);
                pstmt1.executeUpdate();

                PreparedStatement pstmt2 = conn2.prepareStatement("UPDATE table2 SET column2 =? WHERE id =?");
                pstmt2.setString(1, "new value");
                pstmt2.setInt(2, 1);
                pstmt2.executeUpdate();

                boolean preCommit1 = true;
                boolean preCommit2 = true;
                if (preCommit1 && preCommit2) {
                    // 提交阶段
                    conn1.commit();
                    conn2.commit();
                    System.out.println("事务提交成功");
                } else {
                    // 回滚
                    conn1.rollback();
                    conn2.rollback();
                    System.out.println("事务回滚");
                }
            } else {
                System.out.println("事务放弃");
            }
        } catch (SQLException e) {
            e.printStackTrace();
            try {
                if (conn1!= null) {
                    conn1.rollback();
                }
                if (conn2!= null) {
                    conn2.rollback();
                }
                System.out.println("事务回滚");
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
        } finally {
            try {
                if (conn1!= null) {
                    conn1.close();
                }
                if (conn2!= null) {
                    conn2.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 优缺点:3PC 相比 2PC 解决了单点故障导致的事务阻塞问题,在协调者故障后,参与者可以根据自身状态继续进行事务处理。它通过引入询问阶段,使得参与者有机会在早期发现无法进行事务操作的情况,减少不必要的资源锁定。然而,3PC 也增加了协议的复杂性,网络通信开销更大,而且依然存在性能问题,特别是在高并发场景下,由于多了一个阶段,整体事务的执行时间会更长。
  1. TCC(Try - Confirm - Cancel)
    • 原理:TCC 是一种补偿型的事务解决方案。它将事务分为三个阶段:Try 阶段、Confirm 阶段和 Cancel 阶段。
      • Try 阶段:主要是对业务系统资源进行初步的预留和检查,例如在资金转账场景中,Try 阶段会检查转出账户余额是否足够,并锁定相应的金额。
      • Confirm 阶段:在 Try 阶段成功后,执行真正的业务操作,如完成资金的转账。
      • Cancel 阶段:如果 Try 阶段成功,但 Confirm 阶段由于某些原因失败,那么执行 Cancel 阶段,对 Try 阶段预留的资源进行释放,如解锁转出账户锁定的金额。
    • 代码示例(以 Spring Boot 和 MySQL 为例实现简单的 TCC 转账)
    • 定义 Try 接口
public interface TransferTryService {
    boolean tryTransfer(String fromAccount, String toAccount, double amount);
}
  • Try 接口实现
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class TransferTryServiceImpl implements TransferTryService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public boolean tryTransfer(String fromAccount, String toAccount, double amount) {
        String checkSql = "SELECT balance FROM accounts WHERE account_number =?";
        double fromBalance = jdbcTemplate.queryForObject(checkSql, new Object[]{fromAccount}, Double.class);
        if (fromBalance < amount) {
            return false;
        }
        String lockSql = "UPDATE accounts SET balance = balance -?, locked_amount = locked_amount +? WHERE account_number =?";
        int updateRows = jdbcTemplate.update(lockSql, amount, amount, fromAccount);
        return updateRows > 0;
    }
}
  • 定义 Confirm 接口
public interface TransferConfirmService {
    boolean confirmTransfer(String fromAccount, String toAccount, double amount);
}
  • Confirm 接口实现
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class TransferConfirmServiceImpl implements TransferConfirmService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public boolean confirmTransfer(String fromAccount, String toAccount, double amount) {
        String updateFromSql = "UPDATE accounts SET balance = balance -?, locked_amount = locked_amount -? WHERE account_number =?";
        int fromUpdateRows = jdbcTemplate.update(updateFromSql, amount, amount, fromAccount);
        String updateToSql = "UPDATE accounts SET balance = balance +? WHERE account_number =?";
        int toUpdateRows = jdbcTemplate.update(updateToSql, amount, toAccount);
        return fromUpdateRows > 0 && toUpdateRows > 0;
    }
}
  • 定义 Cancel 接口
public interface TransferCancelService {
    boolean cancelTransfer(String fromAccount, double amount);
}
  • Cancel 接口实现
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class TransferCancelServiceImpl implements TransferCancelService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public boolean cancelTransfer(String fromAccount, double amount) {
        String unlockSql = "UPDATE accounts SET locked_amount = locked_amount -? WHERE account_number =?";
        int updateRows = jdbcTemplate.update(unlockSql, amount, fromAccount);
        return updateRows > 0;
    }
}
  • 调用 TCC 服务
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TransferController {
    @Autowired
    private TransferTryService tryService;
    @Autowired
    private TransferConfirmService confirmService;
    @Autowired
    private TransferCancelService cancelService;

    @PostMapping("/transfer")
    public String transfer(@RequestParam String fromAccount, @RequestParam String toAccount, @RequestParam double amount) {
        if (tryService.tryTransfer(fromAccount, toAccount, amount)) {
            if (confirmService.confirmTransfer(fromAccount, toAccount, amount)) {
                return "转账成功";
            } else {
                cancelService.cancelTransfer(fromAccount, amount);
                return "转账失败,已回滚";
            }
        } else {
            return "余额不足,转账失败";
        }
    }
}
  • 优缺点:TCC 的优点是灵活性高,适合高并发场景,因为它对资源的锁定时间较短,在 Try 阶段完成后,资源的预留状态可以在一定时间内等待 ConfirmCancel 操作。它不需要像 2PC 和 3PC 那样依赖全局的协调者,每个服务自己管理 TryConfirmCancel 操作。缺点是开发成本较高,每个业务操作都需要实现这三个阶段的逻辑,并且对业务侵入性较大,需要对业务逻辑进行改造以适应 TCC 模式。同时,如果 Cancel 操作执行失败,可能会导致数据不一致的问题。
  1. 消息队列(MQ)实现最终一致性
    • 原理:利用消息队列的异步特性来实现最终一致性。在分布式事务场景中,当一个事务操作发生时,系统将相关的事务消息发送到消息队列中。各个参与方从消息队列中消费消息,并根据消息内容执行相应的业务操作。由于消息队列具有可靠的消息存储和投递机制,即使某个参与方出现暂时的故障,消息也不会丢失,待其恢复后可以继续消费消息进行处理。
    • 代码示例(以 RabbitMQ 和 Spring Boot 为例)
    • 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置 RabbitMQ
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  • 发送消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TransactionMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendTransactionMessage(String message) {
        rabbitTemplate.convertAndSend("transaction-exchange", "transaction-routing-key", message);
    }
}
  • 接收消息
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TransactionMessageReceiver {
    @RabbitListener(queues = "transaction-queue")
    public void handleTransactionMessage(String message) {
        // 处理事务消息,例如更新数据库等业务操作
        System.out.println("接收到事务消息: " + message);
    }
}
  • 优缺点:使用消息队列实现最终一致性的优点是解耦性强,各个服务之间通过消息进行通信,不需要紧密的耦合关系。它可以提高系统的异步处理能力和吞吐量,适合高并发场景。缺点是消息的顺序性可能难以保证,如果业务对消息处理顺序有严格要求,需要额外的机制来保证。同时,消息的可靠性虽然有队列保证,但如果消息处理逻辑出现错误,可能会导致数据不一致,需要引入重试机制和幂等性处理。而且在消息堆积的情况下,可能会影响系统的性能和一致性的达成时间。
  1. Saga 模式
    • 原理:Saga 模式将一个大的分布式事务拆分成多个本地事务,每个本地事务都有对应的补偿操作。当其中某个本地事务失败时,Saga 会按照相反的顺序调用之前成功执行的本地事务的补偿操作,以保证数据的一致性。例如,在一个电商订单处理的分布式事务中,可能涉及创建订单、扣减库存、更新用户积分等多个本地事务。如果扣减库存失败,Saga 会调用创建订单的补偿操作(如删除订单)。
    • 代码示例(以 Java 和 Spring Boot 模拟 Saga 模式处理订单)
    • 定义订单服务
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    public void createOrder(String orderId) {
        System.out.println("创建订单: " + orderId);
    }

    public void cancelOrder(String orderId) {
        System.out.println("取消订单: " + orderId);
    }
}
  • 定义库存服务
import org.springframework.stereotype.Service;

@Service
public class InventoryService {
    public boolean deductInventory(String productId, int quantity) {
        System.out.println("扣减库存: " + productId + " 数量: " + quantity);
        // 模拟库存扣减成功
        return true;
    }

    public void restoreInventory(String productId, int quantity) {
        System.out.println("恢复库存: " + productId + " 数量: " + quantity);
    }
}
  • 定义积分服务
import org.springframework.stereotype.Service;

@Service
public class PointService {
    public void updatePoints(String userId, int points) {
        System.out.println("更新用户积分: " + userId + " 积分: " + points);
    }

    public void rollbackPoints(String userId, int points) {
        System.out.println("回滚用户积分: " + userId + " 积分: " + points);
    }
}
  • 定义 Saga 协调器
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderSagaCoordinator {
    @Autowired
    private OrderService orderService;
    @Autowired
    private InventoryService inventoryService;
    @Autowired
    private PointService pointService;

    public void processOrder(String orderId, String productId, int quantity, String userId, int points) {
        try {
            orderService.createOrder(orderId);
            if (!inventoryService.deductInventory(productId, quantity)) {
                orderService.cancelOrder(orderId);
                return;
            }
            pointService.updatePoints(userId, points);
        } catch (Exception e) {
            orderService.cancelOrder(orderId);
            inventoryService.restoreInventory(productId, quantity);
            pointService.rollbackPoints(userId, points);
        }
    }
}
  • 优缺点:Saga 模式的优点是对业务的侵入性相对较小,它将大事务拆分成多个小的本地事务,每个本地事务可以独立开发和维护。同时,它具有较好的容错性,当某个本地事务失败时,可以通过补偿操作回滚已执行的事务。缺点是缺乏全局的事务状态管理,如果 Saga 流程复杂,可能会出现补偿操作执行不完全的情况,导致数据不一致。而且 Saga 模式下各个本地事务之间的执行顺序和依赖关系需要仔细设计,否则可能会出现逻辑错误。

分布式事务一致性解决方案的选择

在选择分布式事务一致性解决方案时,需要综合考虑多方面因素。

  1. 一致性要求:如果业务对数据一致性要求极高,如金融交易场景,可能需要选择 2PC 或 3PC 这样能够保证强一致性的方案,但要注意其性能和单点故障问题。如果业务可以接受一定时间内的数据不一致,像电商的一些非核心业务,最终一致性的方案如消息队列或 Saga 模式可能更合适。

  2. 性能和并发需求:高并发场景下,2PC 和 3PC 的资源锁定和协调机制可能会严重影响性能,此时 TCC、消息队列或 Saga 模式可能更具优势。例如,在秒杀活动中,TCC 可以快速地进行资源预留和释放,消息队列可以异步处理大量订单消息,Saga 模式可以将复杂业务拆分成多个本地事务并行处理。

  3. 业务复杂度:如果业务逻辑简单且对一致性要求较高,2PC 或 3PC 可以作为考虑方案。但对于复杂的业务逻辑,如涉及多个服务和多种资源的电商订单处理,Saga 模式可以更好地将业务拆分成多个独立的本地事务,降低整体复杂度。而 TCC 模式虽然灵活性高,但对业务的侵入性较大,需要谨慎评估业务改造的成本。

  4. 系统架构:如果系统是基于微服务架构,且各个服务之间的耦合度较低,消息队列实现最终一致性或 Saga 模式可能更容易集成。如果系统对现有数据库事务机制依赖较大,基于 XA 协议的 2PC 或 3PC 可能更容易实现,但要注意其在分布式环境下的局限性。

总之,分布式事务一致性解决方案的选择没有绝对的标准,需要根据具体的业务场景、性能需求、系统架构等因素进行综合权衡和评估,以找到最适合的解决方案。