微服务架构中的分布式事务处理
2022-08-136.6k 阅读
微服务架构中的分布式事务处理
分布式事务概述
在单体应用架构中,事务处理相对简单。所有的数据操作都在同一个数据库实例中进行,数据库本身提供的事务机制(如 ACID 特性,即原子性 Atomicity、一致性 Consistency、隔离性 Isolation、持久性 Durability)能够很好地保证数据的完整性和一致性。例如,在一个简单的电商下单场景中,从创建订单记录到扣减库存,所有操作都在一个数据库事务中执行,如果其中任何一步失败,整个事务回滚,不会出现部分操作成功部分失败的情况。
然而,在微服务架构下,系统被拆分为多个独立的服务,每个服务可能有自己独立的数据库。以电商系统为例,订单服务可能使用 MySQL 管理订单数据,库存服务可能使用 Redis 管理库存数据。当一个业务操作涉及多个服务的数据库操作时,传统的本地事务就无法满足需求,这就引入了分布式事务的概念。分布式事务要保证跨多个服务、多个数据源的一系列操作要么全部成功,要么全部失败,如同在单体应用中的本地事务一样。
分布式事务面临的挑战
- 网络问题:微服务之间通过网络进行通信,网络本身具有不可靠性。可能会出现网络延迟、丢包、网络分区等情况。例如,在订单服务向库存服务发送扣减库存请求后,由于网络延迟,库存服务没有及时收到请求,订单服务也无法确认操作是否成功。这就导致在分布式事务处理中,很难准确判断操作的最终状态。
- 数据一致性:不同服务的数据库可能具有不同的数据存储格式和事务处理机制。在跨服务操作时,如何保证最终数据的一致性是一个难题。比如订单服务使用关系型数据库,而库存服务使用 NoSQL 数据库,它们对于数据一致性的保证方式不同,如何在两者之间协调以达到全局数据一致性是一个挑战。
- 性能与可用性:分布式事务处理往往需要更多的协调和通信,这可能会降低系统的性能。同时,为了保证事务的一致性,可能需要对某些资源进行锁定,这可能会影响其他操作的可用性。例如,在分布式事务中,为了保证订单创建和库存扣减的一致性,可能需要锁定库存资源,在事务处理期间其他库存查询或扣减操作可能会被阻塞。
分布式事务处理模型
- XA 模型
- 原理:XA 是 X/Open 组织定义的分布式事务处理(DTP)模型的一部分。XA 模型基于两阶段提交(Two - Phase Commit,2PC)协议。在第一阶段(准备阶段),事务协调者向所有参与者发送“准备”请求,参与者执行事务操作,但不提交。然后参与者向协调者反馈操作是否成功。在第二阶段(提交阶段),如果所有参与者在第一阶段都反馈成功,协调者向所有参与者发送“提交”请求,参与者正式提交事务;如果有任何一个参与者在第一阶段反馈失败,协调者向所有参与者发送“回滚”请求,参与者回滚事务。
- 代码示例(以 Java 中使用 JDBC 和 Atomikos 实现 XA 事务为例):
import javax.sql.DataSource;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import javax.transaction.UserTransaction;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import java.sql.Connection;
import java.sql.PreparedStatement;
public class XATransactionExample {
public static void main(String[] args) {
// 配置第一个数据源
AtomikosDataSourceBean dataSource1 = new AtomikosDataSourceBean();
dataSource1.setUniqueResourceName("dataSource1");
dataSource1.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource1.setUrl("jdbc:mysql://localhost:3306/db1");
dataSource1.setUser("root");
dataSource1.setPassword("password");
// 配置第二个数据源
AtomikosDataSourceBean dataSource2 = new AtomikosDataSourceBean();
dataSource2.setUniqueResourceName("dataSource2");
dataSource2.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource2.setUrl("jdbc:mysql://localhost:3306/db2");
dataSource2.setUser("root");
dataSource2.setPassword("password");
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
try {
userTransactionManager.init();
userTransaction.begin();
Connection connection1 = dataSource1.getConnection();
PreparedStatement statement1 = connection1.prepareStatement("INSERT INTO table1 (column1) VALUES ('value1')");
statement1.executeUpdate();
Connection connection2 = dataSource2.getConnection();
PreparedStatement statement2 = connection2.prepareStatement("INSERT INTO table2 (column2) VALUES ('value2')");
statement2.executeUpdate();
userTransaction.commit();
} catch (Exception e) {
try {
userTransaction.rollback();
} catch (Exception ex) {
ex.printStackTrace();
}
e.printStackTrace();
} finally {
userTransactionManager.close();
}
}
}
- 优缺点:XA 模型能够严格保证事务的 ACID 特性,数据一致性强。但是它的性能开销较大,因为在两阶段提交过程中,参与者资源会被长时间锁定,等待协调者的最终指令,这可能会导致其他事务的阻塞,影响系统的并发性能。同时,XA 模型对资源管理器(如数据库)要求较高,需要其支持 XA 协议。
- TCC 模型(Try - Confirm - Cancel)
- 原理:TCC 模型将事务分为三个阶段。Try 阶段,主要是对业务资源进行检测和预留,例如在订单和库存场景中,Try 阶段库存服务可以检查库存是否足够,并预留相应库存。Confirm 阶段,在 Try 阶段所有操作都成功的情况下,正式执行提交操作,如订单创建成功且库存预留成功后,库存服务正式扣减库存。Cancel 阶段,如果 Try 阶段任何一个操作失败,就执行回滚操作,如释放 Try 阶段预留的库存。
- 代码示例(以 Java 为例,简单模拟 TCC 事务):
public class TCCExample {
// 库存服务
public static class StockService {
private int stock = 100;
public boolean tryReduceStock(int amount) {
if (stock >= amount) {
// 模拟预留库存,例如更新库存状态为预留中
stock -= amount;
return true;
}
return false;
}
public void confirmReduceStock(int amount) {
// 正式扣减库存
stock -= amount;
}
public void cancelReduceStock(int amount) {
// 释放预留库存
stock += amount;
}
}
// 订单服务
public static class OrderService {
private StockService stockService;
public OrderService(StockService stockService) {
this.stockService = stockService;
}
public boolean createOrder(int amount) {
// Try 阶段
if (!stockService.tryReduceStock(amount)) {
return false;
}
// 这里可以进行订单创建的 Try 操作,例如检查订单数据合法性等
// 假设订单创建 Try 操作成功
// Confirm 阶段
try {
stockService.confirmReduceStock(amount);
// 正式创建订单
System.out.println("订单创建成功");
return true;
} catch (Exception e) {
// Cancel 阶段
stockService.cancelReduceStock(amount);
System.out.println("订单创建失败,库存已回滚");
return false;
}
}
}
public static void main(String[] args) {
StockService stockService = new StockService();
OrderService orderService = new OrderService(stockService);
orderService.createOrder(10);
}
}
- 优缺点:TCC 模型相对 XA 模型性能更好,因为它不需要长时间锁定资源,在 Try 阶段完成后,资源可以被其他事务使用。它也具有较好的灵活性,适合一些对一致性要求不是特别严格,但对性能和响应速度要求较高的场景。然而,TCC 模型实现起来较为复杂,需要业务代码参与较多,每个服务都需要实现 Try、Confirm 和 Cancel 三个接口,并且要保证接口的幂等性,即多次调用对资源状态的影响是一致的,否则可能会出现数据不一致的问题。
- Saga 模型
- 原理:Saga 模型将一个长事务分解为多个本地事务,每个本地事务都有对应的补偿事务。当其中某个本地事务失败时,Saga 会按照相反的顺序调用之前已成功执行的本地事务的补偿事务,以达到回滚的目的。例如,在一个涉及订单创建、库存扣减和支付的业务流程中,如果支付失败,Saga 会先调用库存增加的补偿事务,再调用订单删除的补偿事务。
- 代码示例(以 Java 为例,简单模拟 Saga 事务):
import java.util.ArrayList;
import java.util.List;
public class SagaExample {
// 订单服务
public static class OrderService {
public void createOrder() {
System.out.println("订单创建成功");
}
public void cancelOrder() {
System.out.println("订单取消成功");
}
}
// 库存服务
public static class StockService {
public void reduceStock() {
System.out.println("库存扣减成功");
}
public void increaseStock() {
System.out.println("库存增加成功");
}
}
// 支付服务
public static class PaymentService {
public boolean makePayment() {
// 模拟支付失败
return false;
}
public void refundPayment() {
System.out.println("支付退款成功");
}
}
public static void main(String[] args) {
OrderService orderService = new OrderService();
StockService stockService = new StockService();
PaymentService paymentService = new PaymentService();
List<Runnable> sagaSteps = new ArrayList<>();
List<Runnable> compensationSteps = new ArrayList<>();
sagaSteps.add(() -> orderService.createOrder());
compensationSteps.add(0, () -> orderService.cancelOrder());
sagaSteps.add(() -> stockService.reduceStock());
compensationSteps.add(0, () -> stockService.increaseStock());
sagaSteps.add(() -> {
if (!paymentService.makePayment()) {
throw new RuntimeException("支付失败");
}
});
compensationSteps.add(0, () -> paymentService.refundPayment());
try {
for (Runnable step : sagaSteps) {
step.run();
}
} catch (Exception e) {
for (Runnable compensation : compensationSteps) {
compensation.run();
}
}
}
}
- 优缺点:Saga 模型具有较高的灵活性,适合长事务和业务流程复杂的场景,因为它不需要像 XA 模型那样长时间锁定资源,也不需要像 TCC 模型那样严格实现特定接口。它对服务的侵入性相对较小,每个服务只需要提供正常操作和补偿操作即可。但是,Saga 模型的事务一致性相对较弱,特别是在并发场景下,如果没有合理的控制,可能会出现数据不一致的情况。同时,由于是按照顺序执行本地事务和补偿事务,如果其中某个补偿事务执行失败,可能需要人工干预来处理。
- 本地消息表模型
- 原理:本地消息表模型基于可靠消息最终一致性的思想。在发起事务的服务中,将事务相关的消息记录在本地数据库的消息表中,同时进行本地业务操作。消息表通常包含消息内容、状态(如待发送、已发送、已处理等)等字段。然后通过定时任务或消息队列,将消息发送给其他服务。接收服务处理消息并完成本地业务操作后,反馈处理结果。如果消息发送失败或接收服务处理失败,可以通过重试机制保证最终一致性。例如,在订单创建后,订单服务将库存扣减消息记录在本地消息表,同时创建订单成功。然后定时任务将消息发送给库存服务,库存服务处理消息并扣减库存,反馈处理结果,订单服务根据反馈更新消息表状态。
- 代码示例(以 Java 和 MySQL 为例,简单实现本地消息表模型):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class LocalMessageTableExample {
private static final String JDBC_URL = "jdbc:mysql://localhost:3306/db";
private static final String JDBC_USER = "root";
private static final String JDBC_PASSWORD = "password";
public static void main(String[] args) {
// 模拟订单创建并插入消息表
insertMessageToTable("库存扣减消息", "待发送");
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
// 从消息表中获取待发送消息
String selectQuery = "SELECT id, message, status FROM message_table WHERE status = '待发送'";
try (PreparedStatement selectStatement = connection.prepareStatement(selectQuery);
ResultSet resultSet = selectStatement.executeQuery()) {
while (resultSet.next()) {
int id = resultSet.getInt("id");
String message = resultSet.getString("message");
// 模拟发送消息给库存服务并处理
boolean success = sendMessageToStockService(message);
if (success) {
// 更新消息表状态为已处理
String updateQuery = "UPDATE message_table SET status = '已处理', processed_time =? WHERE id =?";
try (PreparedStatement updateStatement = connection.prepareStatement(updateQuery)) {
updateStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
updateStatement.setInt(2, id);
updateStatement.executeUpdate();
}
} else {
// 可以根据情况设置重试次数等逻辑
System.out.println("消息发送失败,待重试");
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}, 0, 5, TimeUnit.SECONDS);
}
private static void insertMessageToTable(String message, String status) {
try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
String insertQuery = "INSERT INTO message_table (message, status, create_time) VALUES (?,?,?)";
try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
insertStatement.setString(1, message);
insertStatement.setString(2, status);
insertStatement.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
insertStatement.executeUpdate();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static boolean sendMessageToStockService(String message) {
// 模拟发送消息给库存服务并处理成功
System.out.println("消息已发送给库存服务并处理成功: " + message);
return true;
}
}
- 优缺点:本地消息表模型实现相对简单,对现有系统的侵入性较小,因为它主要依赖于本地数据库的事务机制。它通过重试机制能够较好地保证最终一致性。但是,这种模型依赖于定时任务或消息队列,可能会存在消息处理的延迟,不太适合对实时性要求极高的场景。同时,如果重试次数过多或重试逻辑不合理,可能会对系统性能产生一定影响。
- 可靠消息最终一致性模型(基于消息队列)
- 原理:与本地消息表模型类似,也是基于最终一致性的思想,但它借助专业的消息队列(如 Kafka、RabbitMQ 等)来传递事务相关消息。在发起事务的服务中,将业务操作和消息发送放在同一个本地事务中,确保消息发送的可靠性。消息队列保证消息的可靠投递,接收服务从消息队列获取消息并处理本地业务。例如,订单服务在创建订单成功后,在本地事务中向消息队列发送库存扣减消息。库存服务从消息队列消费消息并扣减库存。如果消息处理失败,消息队列可以提供重试机制,保证最终一致性。
- 代码示例(以 Java 和 RabbitMQ 为例,简单实现可靠消息最终一致性模型):
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
import java.io.IOException;
public class ReliableMessageExample {
private static final String QUEUE_NAME = "stock_reduction_queue";
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
public static void main(String[] args) {
// 订单服务发送消息
sendMessage("库存扣减消息");
// 库存服务消费消息
consumeMessage();
}
private static void sendMessage(String message) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF - 8"));
System.out.println("订单服务已发送消息: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void consumeMessage() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF - 8");
System.out.println("库存服务接收到消息: " + message);
// 模拟库存扣减操作
System.out.println("库存服务已处理消息: " + message);
}
});
// 防止主线程退出
while (true) {
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 优缺点:这种模型利用消息队列的特性,具有较高的可靠性和可扩展性,适合高并发场景。消息队列能够异步处理消息,提高系统的整体性能。然而,它需要引入额外的消息队列组件,增加了系统的复杂度和维护成本。同时,消息的顺序性可能会成为问题,在一些对消息顺序敏感的业务场景中,需要额外的机制来保证消息顺序。
分布式事务处理策略选择
- 业务场景与一致性要求
- 如果业务场景对数据一致性要求极高,如金融交易场景,XA 模型可能是一个较好的选择,尽管它性能开销较大,但能严格保证 ACID 特性,确保资金数据的准确和一致。
- 对于一些对一致性要求相对宽松,但对性能和响应速度要求较高的场景,如电商的一些促销活动下单场景,TCC 模型或 Saga 模型可能更合适。TCC 模型性能较好且灵活性较高,Saga 模型适合复杂业务流程的长事务处理。
- 系统架构与技术栈
- 如果系统使用的数据库都支持 XA 协议,并且系统对性能要求不是极端苛刻,XA 模型可以方便地实现分布式事务。例如,在一些传统企业级应用中,数据库多为支持 XA 的商业数据库,此时 XA 模型是可行的。
- 对于微服务架构下使用多种不同类型数据库(如关系型数据库与 NoSQL 数据库混合使用)的场景,本地消息表模型或可靠消息最终一致性模型更为合适,因为它们不依赖于数据库对特定分布式事务协议的支持,通过消息机制来保证最终一致性。
- 开发与维护成本
- XA 模型虽然实现相对简单,只需要数据库支持 XA 协议,但性能优化和故障处理可能较为复杂,维护成本较高。
- TCC 模型实现复杂,需要业务代码深度参与,每个服务都要实现 Try、Confirm 和 Cancel 接口,并且要保证接口幂等性,开发和维护成本都较高。
- Saga 模型实现相对灵活,但由于其事务一致性较弱,在并发场景下可能需要更多的测试和调优,维护成本也不容小觑。
- 本地消息表模型和可靠消息最终一致性模型实现相对简单,对现有系统侵入性小,但可能需要额外开发定时任务或消息队列相关的代码,维护成本适中。
分布式事务处理中的常见问题及解决方法
- 幂等性问题
- 问题描述:在分布式事务处理中,由于网络问题、重试机制等原因,同一个操作可能会被多次执行。如果操作不具备幂等性,可能会导致数据不一致。例如,在库存扣减操作中,如果不保证幂等性,多次扣减可能会导致库存数量错误。
- 解决方法:可以通过数据库的唯一约束来保证幂等性。例如,在库存扣减操作中,可以在数据库表中添加一个唯一索引字段,每次扣减操作时携带一个唯一标识(如订单号),如果重复执行,数据库会因为唯一约束而拒绝操作。另外,也可以在服务端通过缓存来记录已执行的操作,当接收到相同操作时,直接返回之前的结果,而不重复执行实际业务逻辑。
- 网络分区问题
- 问题描述:网络分区是指由于网络故障等原因,将一个分布式系统分割成多个彼此无法通信的子系统。在分布式事务处理中,这可能导致部分参与者无法与协调者通信,从而无法确定事务的最终状态。
- 解决方法:一种常见的方法是使用超时机制。在一定时间内,如果协调者没有收到所有参与者的反馈,就根据当前情况进行处理。例如,在两阶段提交协议中,如果在第一阶段协调者没有收到某个参与者的准备成功反馈,超时后可以判定该参与者失败,向所有参与者发送回滚请求。另外,也可以采用一些分布式一致性算法(如 Paxos、Raft 等)来处理网络分区问题,通过选举出一个领导者来协调事务,保证在网络分区恢复后,系统能够达成一致状态。
- 并发控制问题
- 问题描述:在分布式事务处理中,多个事务可能同时操作相同的资源,这可能会导致数据竞争和不一致。例如,多个订单同时扣减相同的库存。
- 解决方法:可以使用分布式锁来进行并发控制。例如,在库存扣减操作前,先获取分布式锁(如使用 Redis 实现的分布式锁),只有获取到锁的事务才能进行库存扣减操作,其他事务需要等待锁的释放。另外,也可以采用乐观锁机制,在数据更新时,通过版本号等方式来检查数据是否在更新前被其他事务修改,如果被修改则重新执行事务,以保证数据的一致性。
总结
分布式事务处理是微服务架构中的一个关键难题,不同的处理模型和策略各有优劣。在实际应用中,需要根据业务场景的特点、系统架构和技术栈以及开发与维护成本等多方面因素综合考虑,选择合适的分布式事务处理方案。同时,要关注处理过程中可能出现的幂等性、网络分区和并发控制等问题,并采取相应的解决方法,以确保微服务架构下系统的数据一致性和可靠性。通过合理地选择和应用分布式事务处理技术,能够提升微服务架构系统的整体性能和稳定性,更好地满足业务需求。