MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis与MySQL数据同步的一致性校验方法

2024-03-307.0k 阅读

Redis 与 MySQL 数据同步基础

数据同步的背景

在现代应用开发中,MySQL 作为传统的关系型数据库,以其强大的数据持久化和复杂查询能力被广泛应用。而 Redis 作为高性能的非关系型数据库,在缓存、高并发读写场景下表现卓越。为了兼顾数据的持久化存储以及高效的读写性能,常常需要将 MySQL 中的数据同步到 Redis 中。例如,电商应用中商品信息存储在 MySQL 以保证数据的可靠性,同时同步到 Redis 用于快速展示商品列表,提升用户体验。

然而,在数据同步过程中,由于网络延迟、系统故障、并发操作等原因,很容易出现 Redis 与 MySQL 数据不一致的情况。比如,在商品库存更新时,MySQL 中的库存数量已经更新,但由于网络抖动,Redis 中的库存数据未能及时同步,导致前端展示的库存数量与实际库存不符,给业务带来严重影响。因此,实现 Redis 与 MySQL 数据同步的一致性校验至关重要。

常见的数据同步方式

  1. 应用层同步 应用程序在对 MySQL 数据进行增删改操作时,同时对 Redis 进行相应的操作。以 Python 为例,使用 pymysql 连接 MySQL,redis - py 连接 Redis,代码如下:
import pymysql
import redis

# 连接 MySQL
mysql_conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
mysql_cursor = mysql_conn.cursor()

# 连接 Redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)

# 假设要插入一条用户数据到 MySQL 并同步到 Redis
user_id = 1
username = 'test_user'
sql = "INSERT INTO users (id, username) VALUES (%s, %s)"
mysql_cursor.execute(sql, (user_id, username))
mysql_conn.commit()

# 同步到 Redis
redis_client.set(f'user:{user_id}', username)

这种方式实现简单,但在高并发场景下,应用程序负担较重,容易出现同步遗漏或不一致的问题。

  1. 基于 Binlog 的同步 MySQL 的 Binlog(二进制日志)记录了数据库的所有变更操作。通过解析 Binlog,可以捕获数据的变化并同步到 Redis。以 Canal 为例,它模拟 MySQL 从库,通过解析 Binlog 实现数据同步。首先,需要在 MySQL 配置文件中开启 Binlog 功能:
[mysqld]
log - bin = /var/lib/mysql/mysql - bin.log
server - id = 1

然后,下载并启动 Canal 服务。Canal 会将解析到的 Binlog 数据发送到客户端,客户端再将数据同步到 Redis。以下是一个简单的 Canal 客户端处理数据同步到 Redis 的示例代码(基于 Java):

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;

public class CanalRedisSync {
    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");
        Jedis jedis = new Jedis("localhost", 6379);
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
                                    // 处理插入操作同步到 Redis
                                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                        if ("id".equals(column.getName())) {
                                            String key = "user:" + column.getValue();
                                            String value = "";
                                            for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
                                                if ("username".equals(col.getName())) {
                                                    value = col.getValue();
                                                    break;
                                                }
                                            }
                                            jedis.set(key, value);
                                        }
                                    }
                                } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                                    // 处理更新操作同步到 Redis
                                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                        if ("id".equals(column.getName())) {
                                            String key = "user:" + column.getValue();
                                            String value = "";
                                            for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
                                                if ("username".equals(col.getName())) {
                                                    value = col.getValue();
                                                    break;
                                                }
                                            }
                                            jedis.set(key, value);
                                        }
                                    }
                                } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                                    // 处理删除操作同步到 Redis
                                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                                        if ("id".equals(column.getName())) {
                                            String key = "user:" + column.getValue();
                                            jedis.del(key);
                                        }
                                    }
                                }
                            }
                        }
                    }
                    connector.ack(batchId);
                }
            }
        } finally {
            connector.disconnect();
            jedis.close();
        }
    }
}

这种方式对应用程序侵入性小,能够实时捕获数据变化,但配置和维护相对复杂。

数据一致性校验原理

数据一致性的概念

数据一致性指的是 Redis 中的数据与 MySQL 中的数据在内容上保持一致。在数据同步过程中,一致性可分为强一致性和最终一致性。强一致性要求在任何时刻,Redis 和 MySQL 中的数据都完全相同,对数据的任何操作都能立即反映在两个数据库中。而最终一致性则允许在一定时间内存在数据不一致的情况,但经过一段时间后,数据最终会达到一致状态。在实际应用中,由于网络、性能等因素的影响,通常采用最终一致性的方案。

校验的关键因素

  1. 数据版本标识 为了准确判断数据是否一致,需要为数据引入版本标识。在 MySQL 中,可以通过增加一个 version 字段来记录数据的版本号。每次数据更新时,version 字段自增。在 Redis 中,可以将版本号与数据一同存储。例如,存储用户信息时,在 Redis 中可以这样设置:
# Python 示例
user_id = 1
username = 'test_user'
version = 1
redis_client.hmset(f'user:{user_id}', {'username': username,'version': version})

当进行一致性校验时,对比 MySQL 和 Redis 中数据的版本号,如果版本号相同,则认为数据可能一致;如果版本号不同,则需要进一步检查数据内容。

  1. 数据校验范围 确定需要校验的数据范围至关重要。对于一些高频更新的数据,应更频繁地进行校验;而对于低频更新的数据,可以适当降低校验频率。例如,电商应用中商品的价格可能高频更新,库存相对低频更新。可以根据业务需求,将商品价格数据设定为每 5 分钟校验一次,库存数据每小时校验一次。

  2. 校验时间点 选择合适的校验时间点能有效减少对系统性能的影响。可以在系统负载较低的时间段进行全面校验,如凌晨 2 - 4 点。同时,在数据同步操作完成后,也应进行局部校验,确保本次同步的数据已经正确。

一致性校验方法实现

全量校验

  1. 原理 全量校验是指定期对 Redis 和 MySQL 中的所有数据进行逐一对比,以确保数据的一致性。这种方法可以全面检测出数据不一致的情况,但由于涉及的数据量较大,对系统性能影响也较大。

  2. 实现步骤

    • 连接数据库:使用相应的数据库连接库分别连接 MySQL 和 Redis。例如,在 Python 中:
import pymysql
import redis

mysql_conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
mysql_cursor = mysql_conn.cursor()
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
- **获取 MySQL 数据**:从 MySQL 中查询所有需要校验的数据,例如查询所有用户数据:
sql = "SELECT id, username, version FROM users"
mysql_cursor.execute(sql)
mysql_data = mysql_cursor.fetchall()
- **获取 Redis 数据**:遍历 Redis 中对应的数据,在这个例子中是用户数据:
redis_data = {}
keys = redis_client.keys('user:*')
for key in keys:
    user_info = redis_client.hgetall(key)
    user_id = key.decode('utf - 8').split(':')[1]
    redis_data[user_id] = {
        'username': user_info[b'username'].decode('utf - 8'),
      'version': int(user_info[b'version'])
    }
- **对比数据**:逐一对比 MySQL 和 Redis 中的数据:
for row in mysql_data:
    user_id = str(row[0])
    mysql_username = row[1]
    mysql_version = row[2]
    if user_id in redis_data:
        redis_username = redis_data[user_id]['username']
        redis_version = redis_data[user_id]['version']
        if mysql_version != redis_version or mysql_username != redis_username:
            print(f"User {user_id} data is inconsistent. MySQL: {mysql_username}, {mysql_version}; Redis: {redis_username}, {redis_version}")
    else:
        print(f"User {user_id} exists in MySQL but not in Redis.")

增量校验

  1. 原理 增量校验只对发生变化的数据进行校验,通过记录数据的变更日志(如 MySQL 的 Binlog),获取新增、更新和删除的数据,然后只对这些数据在 Redis 和 MySQL 之间进行对比。这种方法效率较高,对系统性能影响较小。

  2. 实现步骤

    • 获取变更数据:以基于 Binlog 的同步为例,通过 Canal 解析 Binlog 获取变更数据。在前面的 Canal 客户端代码基础上,修改处理逻辑来记录变更数据:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public class CanalRedisSync {
    private static List<String> changedKeys = new ArrayList<>();

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");
        Jedis jedis = new Jedis("localhost", 6379);
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                if (rowChange.getEventType() == CanalEntry.EventType.INSERT ||
                                        rowChange.getEventType() == CanalEntry.EventType.UPDATE ||
                                        rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                        if ("id".equals(column.getName())) {
                                            String key = "user:" + column.getValue();
                                            changedKeys.add(key);
                                            break;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    connector.ack(batchId);
                }
            }
        } finally {
            connector.disconnect();
            jedis.close();
        }
    }

    public static List<String> getChangedKeys() {
        return changedKeys;
    }
}
- **连接数据库**:与全量校验类似,连接 MySQL 和 Redis。
- **对比数据**:根据获取到的变更数据的键,分别从 MySQL 和 Redis 中获取数据并对比:
import pymysql
import redis

mysql_conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
mysql_cursor = mysql_conn.cursor()
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)

changed_keys = CanalRedisSync.getChangedKeys()
for key in changed_keys:
    user_id = key.split(':')[1]
    sql = "SELECT username, version FROM users WHERE id = %s"
    mysql_cursor.execute(sql, (user_id,))
    mysql_result = mysql_cursor.fetchone()
    if mysql_result:
        mysql_username = mysql_result[0]
        mysql_version = mysql_result[1]
        redis_info = redis_client.hgetall(key)
        if redis_info:
            redis_username = redis_info[b'username'].decode('utf - 8')
            redis_version = int(redis_info[b'version'])
            if mysql_version != redis_version or mysql_username != redis_username:
                print(f"User {user_id} data is inconsistent. MySQL: {mysql_username}, {mysql_version}; Redis: {redis_username}, {redis_version}")
        else:
            print(f"User {user_id} exists in MySQL but not in Redis.")
    else:
        print(f"User {user_id} exists in Redis but not in MySQL.")

基于哈希值的校验

  1. 原理 计算数据的哈希值,通过对比 Redis 和 MySQL 中相同数据的哈希值来判断数据是否一致。哈希值是对数据内容的一种摘要,只要数据内容发生变化,哈希值就会改变。常见的哈希算法有 MD5、SHA - 256 等。这种方法可以快速判断数据是否一致,减少数据对比的工作量。

  2. 实现步骤

    • 计算 MySQL 数据哈希值:在查询 MySQL 数据时,计算数据的哈希值。以 Python 为例,使用 hashlib 库计算 SHA - 256 哈希值:
import pymysql
import hashlib

mysql_conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
mysql_cursor = mysql_conn.cursor()
sql = "SELECT id, username, version FROM users"
mysql_cursor.execute(sql)
mysql_data = mysql_cursor.fetchall()

mysql_hash_data = {}
for row in mysql_data:
    user_id = row[0]
    data_str = f"{row[1]}{row[2]}"
    hash_value = hashlib.sha256(data_str.encode()).hexdigest()
    mysql_hash_data[user_id] = hash_value
- **计算 Redis 数据哈希值**:从 Redis 中获取数据并计算哈希值:
import redis
import hashlib

redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
redis_hash_data = {}
keys = redis_client.keys('user:*')
for key in keys:
    user_id = key.decode('utf - 8').split(':')[1]
    user_info = redis_client.hgetall(key)
    username = user_info[b'username'].decode('utf - 8')
    version = user_info[b'version'].decode('utf - 8')
    data_str = f"{username}{version}"
    hash_value = hashlib.sha256(data_str.encode()).hexdigest()
    redis_hash_data[user_id] = hash_value
- **对比哈希值**:对比 MySQL 和 Redis 中相同数据的哈希值:
for user_id, mysql_hash in mysql_hash_data.items():
    if user_id in redis_hash_data:
        redis_hash = redis_hash_data[user_id]
        if mysql_hash != redis_hash:
            print(f"User {user_id} data is inconsistent. MySQL hash: {mysql_hash}; Redis hash: {redis_hash}")
    else:
        print(f"User {user_id} exists in MySQL but not in Redis.")

优化与注意事项

性能优化

  1. 批量操作 在进行数据查询和对比时,尽量采用批量操作。例如,在查询 MySQL 数据时,可以使用 LIMIT 进行分页查询,每次查询一定数量的数据,减少单次查询的压力。在 Redis 操作中,也可以使用批量命令,如 mgetmset 等。
  2. 异步处理 将一致性校验任务放到异步线程或消息队列中处理,避免阻塞主线程,影响系统的正常业务操作。例如,使用 Python 的 threading 模块创建异步线程:
import threading

def check_consistency():
    # 一致性校验代码
    pass

thread = threading.Thread(target = check_consistency)
thread.start()

注意事项

  1. 数据类型兼容性 在对比数据时,要注意 MySQL 和 Redis 数据类型的兼容性。例如,MySQL 中的日期时间类型在 Redis 中存储时可能需要转换为字符串类型。在进行一致性校验时,需要进行相应的类型转换和对比。
  2. 网络问题 网络波动可能导致数据同步失败或校验数据获取失败。在实现中要加入网络重试机制,确保数据同步和校验的可靠性。例如,在连接 MySQL 和 Redis 时,设置重试次数和重试间隔:
import time

max_retries = 3
retry_interval = 5
for retry in range(max_retries):
    try:
        mysql_conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
        break
    except pymysql.Error as e:
        if retry < max_retries - 1:
            time.sleep(retry_interval)
        else:
            raise e
  1. 事务处理 在数据同步和校验过程中,涉及到多个数据库操作,要注意事务的处理。确保在数据同步或校验过程中,如果出现错误,能够回滚到操作前的状态,避免数据不一致的情况进一步恶化。例如,在 MySQL 中使用事务:
try:
    mysql_conn.begin()
    # 数据操作代码
    mysql_conn.commit()
except pymysql.Error as e:
    mysql_conn.rollback()
    raise e