Saga 模式在社交分布式系统中的应用
社交分布式系统面临的挑战
在当今社交网络飞速发展的时代,社交分布式系统承载着海量用户的交互数据与复杂业务逻辑。这些系统通常需要处理诸如用户注册、好友添加、动态发布、消息推送等众多操作,并且要保证数据的一致性与系统的高可用性。然而,传统单体架构在面对如此大规模与高并发场景时,显得力不从心。
分布式事务问题
以社交系统中发布动态为例,不仅要在“动态表”中插入新记录,还可能涉及更新用户活跃度、向粉丝推送通知等多个操作。这些操作可能分布在不同数据库甚至不同服务节点上,要保证要么所有操作成功,要么所有操作回滚,这就是典型的分布式事务难题。若采用传统的两阶段提交(2PC)协议,虽然能保证强一致性,但存在协调者单点故障、性能瓶颈等问题。在高并发环境下,2PC 的同步阻塞特性会严重影响系统的吞吐量。
系统扩展性
随着用户数量与业务功能的不断增长,社交系统需要具备良好的扩展性。但传统单体架构难以通过简单的水平扩展来应对负载增加。当某个功能模块出现性能瓶颈时,很难将其独立拆分并进行针对性优化。而分布式系统虽可以通过增加节点实现水平扩展,但各节点间的交互与数据一致性维护变得更加复杂。
Saga 模式概述
Saga 模式是应对分布式系统中分布式事务问题的有效解决方案。它最早由 Hector Garcia - Molina 和 Kenneth Salem 在 1987 年发表的论文“ Sagas ”中提出。
Saga 模式定义
Saga 是由一系列本地事务组成的序列,这些本地事务被称为 Saga 的步骤。每个 Saga 步骤都有对应的补偿步骤,用于在 Saga 执行过程中出现故障时进行回滚操作。Saga 模式通过按顺序执行这些本地事务步骤来完成一个复杂的业务操作,若其中某一步骤失败,则从失败步骤开始反向执行相应的补偿步骤,以确保数据的一致性。
Saga 模式的执行流程
- 正向执行阶段:Saga 从第一个步骤开始依次执行各个本地事务步骤。例如,在社交系统的用户注册流程中,第一步可能是在“用户表”中插入用户基本信息,第二步是为用户创建默认的个性化设置等。
- 反向补偿阶段:若在正向执行过程中某一步骤失败,Saga 会从失败步骤开始,反向执行相应的补偿步骤。比如,若创建默认个性化设置失败,就需要回滚之前插入用户基本信息的操作(通过在“用户表”中删除该用户记录来实现补偿)。
Saga 模式在社交分布式系统中的应用场景
用户注册流程
- 正向步骤
- 插入用户基本信息:在“用户表”中插入用户名、密码、邮箱等基本信息。假设使用 SQL 数据库,代码示例如下(以 Python 和 SQLAlchemy 为例):
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('sqlite:///social.db')
Session = sessionmaker(bind = engine)
session = Session()
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key = True)
username = Column(String)
password = Column(String)
email = Column(String)
def insert_user(username, password, email):
new_user = User(username = username, password = password, email = email)
session.add(new_user)
session.commit()
- **创建默认个性化设置**:为新注册用户创建默认的主题、通知设置等。假设存储在 Redis 中,代码示例如下(以 Python 和 redis - py 库为例):
import redis
r = redis.Redis(host = 'localhost', port = 6379, db = 0)
def create_default_settings(user_id):
default_settings = {
'theme': 'default',
'notifications': {
'new_friend': True,
'new_message': True
}
}
r.hmset(f'user:{user_id}:settings', default_settings)
- 补偿步骤
- 删除用户基本信息:若创建默认个性化设置失败,需要删除已插入的用户基本信息。代码如下:
def delete_user(user_id):
user = session.query(User).filter(User.id == user_id).first()
if user:
session.delete(user)
session.commit()
- **删除默认个性化设置**:若之前已成功创建默认个性化设置,在回滚时需删除。代码如下:
def delete_default_settings(user_id):
r.delete(f'user:{user_id}:settings')
好友添加流程
- 正向步骤
- 在好友关系表中添加记录:假设使用关系型数据库,代码如下(以 Java 和 JDBC 为例):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class FriendshipDao {
private static final String INSERT_FRIENDSHIP = "INSERT INTO friendships (user_id, friend_id) VALUES (?,?)";
public void addFriendship(int userId, int friendId) {
try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/social", "root", "password");
PreparedStatement statement = connection.prepareStatement(INSERT_FRIENDSHIP)) {
statement.setInt(1, userId);
statement.setInt(2, friendId);
statement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
- **向双方推送好友添加通知**:假设使用消息队列(如 RabbitMQ)来实现通知推送,以 Python 和 pika 库为例:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue = 'friendship_notification')
def send_friendship_notification(user_id, friend_id):
message = f'User {user_id} has added {friend_id} as a friend'
channel.basic_publish(exchange = '', routing_key = 'friendship_notification', body = message)
print(f" [x] Sent '{message}'")
connection.close()
- 补偿步骤
- 删除好友关系记录:若推送通知失败,需要删除已添加的好友关系记录。代码如下(以 Java 和 JDBC 为例):
private static final String DELETE_FRIENDSHIP = "DELETE FROM friendships WHERE user_id =? AND friend_id =?";
public void deleteFriendship(int userId, int friendId) {
try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/social", "root", "password");
PreparedStatement statement = connection.prepareStatement(DELETE_FRIENDSHIP)) {
statement.setInt(1, userId);
statement.setInt(2, friendId);
statement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
- **撤回通知**:实际操作中,若通知已发送成功,撤回可能比较困难。但可以记录通知状态,若发现异常,后续采取一些补救措施,如发送撤回通知等。
动态发布流程
- 正向步骤
- 插入动态记录:在“动态表”中插入用户发布的动态内容、发布时间等信息。以 Node.js 和 MySQL 为例,代码如下:
const mysql = require('mysql2');
const connection = mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database:'social'
});
connection.connect();
function insert_post(user_id, content) {
const sql = 'INSERT INTO posts (user_id, content, publish_time) VALUES (?,?, NOW())';
connection.query(sql, [user_id, content], (err, results, fields) => {
if (err) throw err;
console.log('Post inserted successfully');
});
}
- **更新用户活跃度**:假设活跃度存储在 Redis 中,每次发布动态增加活跃度值。代码如下(以 Node.js 和 ioredis 库为例):
const Redis = require('ioredis');
const redis = new Redis();
async function update_user_activity(user_id) {
await redis.incr(`user:${user_id}:activity`);
}
- **向粉丝推送动态通知**:同样使用消息队列(如 Kafka)来推送通知。以 Java 和 Kafka 客户端为例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class PostNotificationProducer {
private static final String TOPIC = "post_notification";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
String message = "User has posted a new dynamic";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
producer.send(record);
}
}
}
- 补偿步骤
- 删除动态记录:若更新用户活跃度或推送通知失败,删除已插入的动态记录。代码如下(以 Node.js 和 MySQL 为例):
function delete_post(post_id) {
const sql = 'DELETE FROM posts WHERE id =?';
connection.query(sql, [post_id], (err, results, fields) => {
if (err) throw err;
console.log('Post deleted successfully');
});
}
- **降低用户活跃度**:若之前已成功更新用户活跃度,回滚时降低活跃度值。代码如下(以 Node.js 和 ioredis 库为例):
async function reduce_user_activity(user_id) {
await redis.decr(`user:${user_id}:activity`);
}
- **撤回粉丝动态通知**:类似于好友添加通知的撤回,实际操作较复杂。可在通知中设置可撤回标识,若需要撤回,重新发送撤回通知给粉丝。
Saga 模式在社交分布式系统中的优势
提高系统可用性
Saga 模式采用的是本地事务序列执行方式,每个本地事务相对独立。即使某个 Saga 步骤所在节点出现故障,其他步骤仍可继续执行(在一定时间窗口内)。与 2PC 等强一致性协议相比,Saga 模式不会因单点故障导致整个分布式事务长时间阻塞。例如,在用户注册流程中,若创建默认个性化设置的服务暂时不可用,可先回滚插入用户基本信息的操作,而不会影响其他用户的注册流程。
增强系统扩展性
由于 Saga 模式将复杂业务操作拆分为多个本地事务步骤,每个步骤可以独立部署在不同的服务节点上。这使得社交系统在扩展时更加灵活,可根据不同业务模块的负载情况进行针对性的水平扩展。比如,当好友添加功能请求量剧增时,可以增加处理好友关系表操作的服务节点,而不影响其他如动态发布等功能模块。
简化分布式事务处理
Saga 模式通过定义正向步骤和补偿步骤,将分布式事务处理简化为一系列本地事务的组合。开发人员只需关注每个本地事务的实现以及对应的补偿逻辑,无需处理像 2PC 那样复杂的全局协调与同步机制。这降低了开发难度,提高了开发效率。
Saga 模式在社交分布式系统中的实现方式
编排式 Saga
- 实现原理 编排式 Saga 通过一个中央协调器来控制 Saga 的执行流程。协调器负责按顺序调用各个 Saga 步骤,并在出现故障时调用相应的补偿步骤。在社交分布式系统中,例如用户注册流程,协调器会先调用插入用户基本信息的服务,成功后再调用创建默认个性化设置的服务。若创建默认个性化设置失败,协调器会调用删除用户基本信息的补偿服务。
- 代码示例(以 Python 和 Flask 框架实现简单协调器为例)
from flask import Flask, request
import requests
app = Flask(__name__)
@app.route('/register_user', methods = ['POST'])
def register_user():
data = request.get_json()
username = data.get('username')
password = data.get('password')
email = data.get('email')
# 调用插入用户基本信息服务
user_insert_response = requests.post('http://user - service/insert_user', json = {
'username': username,
'password': password,
'email': email
})
if user_insert_response.status_code!= 200:
return 'User insertion failed', 500
user_id = user_insert_response.json().get('user_id')
# 调用创建默认个性化设置服务
settings_response = requests.post('http://settings - service/create_default_settings', json = {
'user_id': user_id
})
if settings_response.status_code!= 200:
# 调用删除用户基本信息补偿服务
requests.post('http://user - service/delete_user', json = {
'user_id': user_id
})
return 'Settings creation failed and user rolled back', 500
return 'User registered successfully', 200
if __name__ == '__main__':
app.run(debug = True, port = 5000)
- 优缺点
- 优点:流程清晰,易于理解和维护。所有 Saga 步骤的执行逻辑集中在协调器,开发人员可以在一个地方清晰地看到整个业务流程。
- 缺点:协调器成为单点故障点。若协调器出现故障,整个 Saga 执行流程可能中断。并且随着系统规模扩大,协调器的负载会逐渐增加,可能成为性能瓶颈。
协同式 Saga
- 实现原理 协同式 Saga 没有中央协调器,每个 Saga 步骤在完成自身任务后,负责调用下一个 Saga 步骤。每个步骤都知道后续步骤和补偿步骤。例如在好友添加流程中,插入好友关系记录的服务在成功执行后,直接调用推送好友添加通知的服务。若推送通知失败,推送通知服务会调用删除好友关系记录的补偿服务。
- 代码示例(以 Java 和 Spring Boot 实现简单协同式 Saga 为例)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
@RestController
public class FriendshipController {
@Autowired
private RestTemplate restTemplate;
@PostMapping("/add_friendship")
public String addFriendship(@RequestBody FriendshipRequest request) {
int userId = request.getUserId();
int friendId = request.getFriendId();
// 调用插入好友关系记录服务
String friendshipUrl = "http://friendship - service/add_friendship";
Friendship friendship = new Friendship(userId, friendId);
restTemplate.postForObject(friendshipUrl, friendship, Void.class);
// 调用推送好友添加通知服务
String notificationUrl = "http://notification - service/send_friendship_notification";
Notification notification = new Notification(userId, friendId);
try {
restTemplate.postForObject(notificationUrl, notification, Void.class);
} catch (Exception e) {
// 调用删除好友关系记录补偿服务
String deleteUrl = "http://friendship - service/delete_friendship";
restTemplate.postForObject(deleteUrl, friendship, Void.class);
return "Notification failed and friendship rolled back";
}
return "Friendship added successfully";
}
}
class FriendshipRequest {
private int userId;
private int friendId;
// getters and setters
}
class Friendship {
private int userId;
private int friendId;
public Friendship(int userId, int friendId) {
this.userId = userId;
this.friendId = friendId;
}
// getters and setters
}
class Notification {
private int userId;
private int friendId;
public Notification(int userId, int friendId) {
this.userId = userId;
this.friendId = friendId;
}
// getters and setters
}
- 优缺点
- 优点:不存在单点故障问题,每个服务相对独立,系统的可靠性更高。并且各服务之间的耦合度相对较低,更易于扩展。
- 缺点:流程逻辑分散在各个服务中,维护和理解整个 Saga 流程相对困难。当 Saga 步骤发生变化时,可能需要修改多个服务的代码。
Saga 模式在社交分布式系统中的挑战与应对策略
补偿操作的幂等性
- 问题描述 在 Saga 模式中,由于网络故障、系统重试等原因,补偿操作可能会被多次调用。若补偿操作不具备幂等性,可能会导致数据不一致等问题。例如,在动态发布流程中,若删除动态记录的补偿操作被多次调用,可能会误删其他正常的动态记录。
- 应对策略
- 使用唯一标识:在每个 Saga 步骤执行时,生成一个唯一标识(如 UUID),并将其作为参数传递给补偿操作。补偿操作在执行前先检查该唯一标识是否已处理过,若已处理过则直接返回成功,不再重复执行实际操作。
- 数据库操作优化:对于数据库的删除、更新等操作,采用条件判断方式。例如,在删除动态记录时,不仅根据动态 ID,还可以增加一些额外条件,如发布时间等,确保每次删除的是正确的记录,并且多次执行不会产生额外影响。
长时间运行的 Saga
- 问题描述 有些社交业务操作对应的 Saga 可能会长时间运行,例如一个复杂的用户资料迁移操作,可能涉及多个系统间的数据同步,耗时较长。在长时间运行过程中,可能会遇到数据变化、系统升级等情况,导致 Saga 执行出现异常。
- 应对策略
- 定期检查与重试:设置定时任务,定期检查长时间运行 Saga 的执行状态。若发现某个步骤执行失败或处于异常状态,根据情况进行重试。例如,对于数据同步失败的步骤,可以重新发起同步请求。
- 版本控制:对参与 Saga 的数据和服务进行版本控制。当系统升级或数据结构发生变化时,根据版本信息调整 Saga 的执行逻辑,确保 Saga 能够在不同版本环境下正确执行。
并发控制
- 问题描述 在社交分布式系统高并发环境下,多个 Saga 可能同时对相同的数据进行操作,例如多个用户同时添加同一个好友,可能会导致好友关系表数据不一致。
- 应对策略
- 锁机制:在对共享数据进行操作前,获取相应的锁。例如,在好友添加流程中,对好友关系表的插入操作可以先获取表级锁或行级锁,确保同一时间只有一个 Saga 步骤能对该表进行写操作。
- 乐观并发控制:采用乐观锁机制,在数据更新时检查数据版本。例如,在更新用户活跃度时,先读取当前活跃度值及版本号,更新时带上版本号,若版本号一致则更新成功,否则重新读取数据并尝试更新。