MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

分布式事务处理的常见模式与技术

2022-12-242.3k 阅读

分布式事务基础概念

在深入探讨分布式事务处理的常见模式与技术之前,我们先来明确一些基础概念。分布式系统由多个独立的节点组成,这些节点通过网络进行通信和协作,以完成共同的任务。在这样的系统中,事务跨越多个节点,涉及多个数据源的操作,这就引入了分布式事务的概念。

分布式事务需要满足 ACID 特性,即原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。原子性要求事务中的所有操作要么全部成功,要么全部失败;一致性确保事务执行前后系统处于一致的状态;隔离性保证并发执行的事务之间不会相互干扰;持久性意味着一旦事务提交,其结果将永久保存。

然而,在分布式环境中实现 ACID 特性面临诸多挑战。网络延迟、节点故障、数据分区等问题使得传统的集中式事务处理方式难以直接应用。例如,当一个事务涉及多个节点的数据更新时,由于网络延迟,可能导致部分节点的更新成功,而部分节点的更新失败,这就违反了原子性原则。

二阶段提交(2PC)

2PC 原理

二阶段提交是一种经典的分布式事务处理协议。它将事务的提交过程分为两个阶段:准备阶段(Prepare Phase)和提交阶段(Commit Phase)。

在准备阶段,协调者向所有参与者发送 Prepare 消息,询问它们是否可以提交事务。参与者接收到消息后,会执行事务的所有操作,但并不真正提交,而是记录日志并回复 YesNo。如果所有参与者都回复 Yes,表示它们都准备好提交事务,进入提交阶段;如果有任何一个参与者回复 No,则进入回滚阶段。

在提交阶段,如果是提交,协调者向所有参与者发送 Commit 消息,参与者接收到后正式提交事务;如果是回滚,协调者发送 Rollback 消息,参与者回滚事务。

2PC 代码示例(以 Java 为例)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// 模拟参与者
public class Participant {
    private String dbUrl;
    private String username;
    private String password;

    public Participant(String dbUrl, String username, String password) {
        this.dbUrl = dbUrl;
        this.username = username;
        this.password = password;
    }

    public boolean prepare() {
        try (Connection conn = DriverManager.getConnection(dbUrl, username, password)) {
            // 执行事务操作,这里简单模拟更新操作
            String sql = "UPDATE accounts SET balance = balance - 100 WHERE account_id = 1";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                pstmt.executeUpdate();
                // 记录日志
                System.out.println("Prepare success");
                return true;
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return false;
        }
    }

    public void commit() {
        try (Connection conn = DriverManager.getConnection(dbUrl, username, password)) {
            // 正式提交事务,这里简单模拟提交操作
            conn.commit();
            System.out.println("Commit success");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void rollback() {
        try (Connection conn = DriverManager.getConnection(dbUrl, username, password)) {
            // 回滚事务,这里简单模拟回滚操作
            conn.rollback();
            System.out.println("Rollback success");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

// 模拟协调者
public class Coordinator {
    private Participant[] participants;

    public Coordinator(Participant[] participants) {
        this.participants = participants;
    }

    public void twoPhaseCommit() {
        boolean allPrepared = true;
        // 准备阶段
        for (Participant participant : participants) {
            if (!participant.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // 提交或回滚阶段
        if (allPrepared) {
            for (Participant participant : participants) {
                participant.commit();
            }
        } else {
            for (Participant participant : participants) {
                participant.rollback();
            }
        }
    }
}

2PC 优缺点

2PC 的优点在于实现相对简单,能够保证强一致性,适用于对数据一致性要求较高的场景。然而,它也存在明显的缺点。首先,性能问题,由于需要等待所有参与者的响应,在节点和网络较多的情况下,性能会受到严重影响。其次,单点故障问题,协调者一旦出现故障,整个事务可能无法继续进行。此外,在提交阶段,如果部分参与者出现故障,可能导致数据不一致。

三阶段提交(3PC)

3PC 原理

三阶段提交是在 2PC 的基础上进行改进,将事务提交过程分为三个阶段:询问阶段(CanCommit Phase)、预提交阶段(PreCommit Phase)和提交阶段(DoCommit Phase)。

在询问阶段,协调者向所有参与者发送 CanCommit 消息,询问它们是否可以进行事务操作。参与者接收到消息后,根据自身状态回复 YesNo。如果所有参与者都回复 Yes,进入预提交阶段;否则,进入中断事务阶段。

在预提交阶段,协调者向所有参与者发送 PreCommit 消息,参与者接收到后执行事务操作,但不提交,记录日志并回复 Ack。如果所有参与者都回复 Ack,进入提交阶段;否则,进入中断事务阶段。

在提交阶段,如果是提交,协调者向所有参与者发送 DoCommit 消息,参与者接收到后正式提交事务;如果是中断事务,协调者发送 Abort 消息,参与者回滚事务。

3PC 代码示例(以 Python 为例)

import pymysql

# 模拟参与者
class Participant:
    def __init__(self, host, user, password, database):
        self.conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        self.cursor = self.conn.cursor()

    def can_commit(self):
        try:
            # 简单模拟检查操作
            print("Can commit check success")
            return True
        except Exception as e:
            print(f"Can commit check error: {e}")
            return False

    def pre_commit(self):
        try:
            # 执行事务操作,这里简单模拟更新操作
            sql = "UPDATE accounts SET balance = balance - 100 WHERE account_id = 1"
            self.cursor.execute(sql)
            # 记录日志
            print("Pre commit success")
            return True
        except Exception as e:
            print(f"Pre commit error: {e}")
            return False

    def do_commit(self):
        try:
            self.conn.commit()
            print("Do commit success")
        except Exception as e:
            print(f"Do commit error: {e}")

    def abort(self):
        try:
            self.conn.rollback()
            print("Abort success")
        except Exception as e:
            print(f"Abort error: {e}")

# 模拟协调者
class Coordinator:
    def __init__(self, participants):
        self.participants = participants

    def three_phase_commit(self):
        all_can_commit = True
        # 询问阶段
        for participant in self.participants:
            if not participant.can_commit():
                all_can_commit = False
                break
        if not all_can_commit:
            for participant in self.participants:
                participant.abort()
            return
        all_pre_committed = True
        # 预提交阶段
        for participant in self.participants:
            if not participant.pre_commit():
                all_pre_committed = False
                break
        if not all_pre_committed:
            for participant in self.participants:
                participant.abort()
            return
        # 提交阶段
        for participant in self.participants:
            participant.do_commit()

3PC 优缺点

3PC 相比 2PC 有一定的改进。它减少了单点故障的影响,在协调者故障后,参与者可以根据自身状态继续进行事务处理。同时,由于引入了询问阶段,部分解决了 2PC 中参与者在准备阶段资源锁定时间过长的问题。然而,3PC 也并非完美,它仍然存在性能开销较大的问题,并且由于增加了一个阶段,协议的复杂性也有所提高。

最终一致性模式

本地消息表

本地消息表是一种基于消息队列实现最终一致性的模式。在这种模式下,每个参与事务的服务在本地数据库中创建一个消息表。当事务执行时,首先将消息插入到本地消息表中,标记为待发送状态。然后,通过一个定时任务或消息发送服务,将消息发送到消息队列中。接收方从消息队列中获取消息并处理,处理成功后反馈给发送方,发送方更新本地消息表的状态为已处理。

代码示例(以 Spring Boot 和 RabbitMQ 为例)

  1. 定义本地消息表实体
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class LocalMessage {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String messageContent;
    private String status; // 例如 "待发送", "已发送", "已处理"

    // 省略 getter 和 setter 方法
}
  1. 事务操作与消息插入
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    public void performTransaction() {
        // 执行事务操作,例如更新数据库
        jdbcTemplate.update("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1");
        // 插入消息到本地消息表
        jdbcTemplate.update("INSERT INTO local_message (message_content, status) VALUES (?,?)", "事务相关消息", "待发送");
    }
}
  1. 消息发送与处理
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class MessageSender {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedRate = 5000) // 每 5 秒检查一次
    public void sendMessages() {
        jdbcTemplate.queryForList("SELECT id, message_content FROM local_message WHERE status = '待发送'").forEach(message -> {
            Long id = (Long) message.get("id");
            String content = (String) message.get("message_content");
            rabbitTemplate.convertAndSend("your-exchange", "your-routing-key", content);
            jdbcTemplate.update("UPDATE local_message SET status = '已发送' WHERE id =?", id);
        });
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RabbitListener(queues = "your-queue")
    public void handleMessage(String message) {
        // 处理消息,例如更新另一个数据库
        jdbcTemplate.update("UPDATE other_accounts SET balance = balance + 100 WHERE account_id = 1");
        // 反馈消息处理成功,这里假设发送方根据消息内容更新本地消息表
        jdbcTemplate.update("UPDATE local_message SET status = '已处理' WHERE message_content =?", message);
    }
}

可靠性消息最终一致性

可靠性消息最终一致性模式强调消息的可靠发送和接收。发送方在发送消息前,先将消息持久化到本地数据库,并记录发送状态。消息发送成功后更新状态为已发送。接收方在接收到消息后,先将消息持久化,处理成功后回复确认消息。发送方接收到确认消息后,删除本地持久化的消息。如果发送方在一定时间内未收到确认消息,会进行重试发送。

TCC(Try - Confirm - Cancel)

TCC 模式将事务分为三个阶段:Try 阶段尝试执行业务操作,预留必要的资源;Confirm 阶段确认执行业务操作,真正提交资源;Cancel 阶段取消执行业务操作,释放预留的资源。

以一个简单的转账场景为例,在 Try 阶段,从转出账户冻结金额,在转入账户预占金额;在 Confirm 阶段,从转出账户扣除冻结金额,在转入账户增加预占金额;在 Cancel 阶段,释放转出账户的冻结金额和转入账户的预占金额。

TCC 代码示例(以 Java 为例)

  1. 定义 TCC 接口
public interface TransferService {
    boolean tryTransfer(String fromAccount, String toAccount, double amount);
    boolean confirmTransfer(String fromAccount, String toAccount, double amount);
    boolean cancelTransfer(String fromAccount, String toAccount, double amount);
}
  1. 实现 TCC 接口
import org.springframework.stereotype.Service;

@Service
public class TransferServiceImpl implements TransferService {
    @Override
    public boolean tryTransfer(String fromAccount, String toAccount, double amount) {
        // 尝试冻结转出账户金额,预占转入账户金额
        // 这里简单模拟,实际需要操作数据库
        System.out.println("Try transfer success, freeze " + amount + " from " + fromAccount + ", pre - occupy " + amount + " to " + toAccount);
        return true;
    }

    @Override
    public boolean confirmTransfer(String fromAccount, String toAccount, double amount) {
        // 确认扣除转出账户冻结金额,增加转入账户预占金额
        // 这里简单模拟,实际需要操作数据库
        System.out.println("Confirm transfer success, deduct " + amount + " from " + fromAccount + ", add " + amount + " to " + toAccount);
        return true;
    }

    @Override
    public boolean cancelTransfer(String fromAccount, String toAccount, double amount) {
        // 取消释放转出账户冻结金额,释放转入账户预占金额
        // 这里简单模拟,实际需要操作数据库
        System.out.println("Cancel transfer success, release " + amount + " from " + fromAccount + ", release " + amount + " from " + toAccount);
        return true;
    }
}
  1. 调用 TCC 服务
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TransactionExecutor {
    @Autowired
    private TransferService transferService;

    public boolean executeTransfer(String fromAccount, String toAccount, double amount) {
        if (transferService.tryTransfer(fromAccount, toAccount, amount)) {
            if (transferService.confirmTransfer(fromAccount, toAccount, amount)) {
                return true;
            } else {
                transferService.cancelTransfer(fromAccount, toAccount, amount);
                return false;
            }
        } else {
            return false;
        }
    }
}

TCC 优缺点

TCC 的优点是能够在一定程度上保证事务的最终一致性,并且对资源的锁定时间相对较短,性能较好。然而,它也存在一些缺点。首先,实现成本较高,需要业务代码配合实现 TryConfirmCancel 方法。其次,对网络问题较为敏感,如果在 ConfirmCancel 阶段出现网络故障,可能导致数据不一致。

分布式事务处理技术选型考量

在实际应用中,选择合适的分布式事务处理技术需要综合考虑多个因素。

  1. 一致性要求:如果对数据一致性要求极高,如金融领域的交易,可能需要选择 2PC 或 3PC 等能保证强一致性的技术。但如果业务对一致性的要求可以容忍一定的延迟,最终一致性模式可能是更好的选择。
  2. 性能需求:对于高并发、性能要求高的场景,2PC 和 3PC 可能因为其同步等待机制导致性能瓶颈,此时 TCC 或本地消息表等模式可能更合适。
  3. 业务复杂度:TCC 模式需要业务代码深度参与,对于业务逻辑复杂的场景,实现难度较大。而本地消息表等模式相对简单,更易于实现和维护。
  4. 系统架构:如果系统是基于消息队列构建的,使用本地消息表或可靠性消息最终一致性模式可能更容易集成;如果系统是基于微服务架构,且服务之间耦合度较低,TCC 模式可能更适合。

不同的分布式事务处理模式和技术各有优劣,开发人员需要根据具体的业务场景和需求,权衡利弊,选择最合适的技术来保证分布式系统中事务的正确处理。