MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis与MySQL结合应对复杂业务查询需求

2022-09-164.1k 阅读

Redis 与 MySQL 基础概述

Redis 基础特性

Redis 是一个开源的、基于键值对(Key-Value)存储的内存数据库。它支持多种数据结构,如字符串(String)、哈希(Hash)、列表(List)、集合(Set)和有序集合(Sorted Set)。Redis 具有极高的读写性能,因为其数据存储在内存中,使得它非常适合用于缓存、计数器、消息队列等场景。

例如,使用 Redis 进行简单的字符串存储与读取:

import redis

# 连接 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db = 0)

# 设置键值对
r.set('name', 'John')

# 获取键对应的值
name = r.get('name')
print(name.decode('utf-8')) 

Redis 的数据结构灵活性使其在不同业务场景中有广泛应用。以哈希结构为例,假设我们要存储用户信息:

user_id = '123'
user_info = {
    'name': 'Alice',
    'age': 25,
    'email': 'alice@example.com'
}

# 使用哈希结构存储用户信息
r.hmset(f'user:{user_id}', user_info)

# 获取整个哈希结构
user = r.hgetall(f'user:{user_id}')
for key, value in user.items():
    print(f'{key.decode("utf-8")}: {value.decode("utf-8")}')

MySQL 基础特性

MySQL 是最流行的开源关系型数据库管理系统之一。它基于 SQL(Structured Query Language)语言,以表格形式存储数据,表之间可以通过外键建立关联关系。MySQL 适合存储大量结构化数据,并提供了强大的事务处理能力,确保数据的一致性和完整性。

例如,创建一个简单的用户表:

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    age INT,
    email VARCHAR(255) UNIQUE
);

插入数据:

INSERT INTO users (name, age, email) VALUES ('Bob', 30, 'bob@example.com');

查询数据:

SELECT * FROM users WHERE age > 25;

MySQL 的优势在于数据的持久化存储和复杂查询处理。它可以处理多表关联、聚合查询等复杂操作,如统计不同年龄段的用户数量:

SELECT age, COUNT(*) AS user_count
FROM users
GROUP BY age;

复杂业务查询需求场景分析

电商场景中的复杂查询

在电商系统中,有许多复杂的业务查询需求。比如,查询某个时间段内销量最高的前 N 种商品,并展示商品的详细信息。这里涉及到至少两张表,订单表(orders)记录了每笔订单的商品信息、下单时间等,商品表(products)存储了商品的详细描述、价格等。

订单表结构示例:

CREATE TABLE orders (
    id INT AUTO_INCREMENT PRIMARY KEY,
    product_id INT,
    order_time TIMESTAMP,
    quantity INT,
    FOREIGN KEY (product_id) REFERENCES products(id)
);

商品表结构示例:

CREATE TABLE products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255),
    price DECIMAL(10, 2),
    description TEXT
);

查询某个时间段内销量最高的前 10 种商品:

SELECT p.id, p.name, p.price, p.description, SUM(o.quantity) AS total_sales
FROM products p
JOIN orders o ON p.id = o.product_id
WHERE o.order_time BETWEEN '2023-01-01 00:00:00' AND '2023-12-31 23:59:59'
GROUP BY p.id, p.name, p.price, p.description
ORDER BY total_sales DESC
LIMIT 10;

这种查询在数据量较大时,执行效率可能较低,因为涉及到多表关联和聚合操作。

社交网络场景中的复杂查询

在社交网络平台,用户之间存在复杂的关系。例如,查询某个用户的二度好友列表(即好友的好友),并按共同好友数量排序。假设存在用户表(users)和好友关系表(friendships)。

用户表结构:

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255)
);

好友关系表结构:

CREATE TABLE friendships (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT,
    friend_id INT,
    FOREIGN KEY (user_id) REFERENCES users(id),
    FOREIGN KEY (friend_id) REFERENCES users(id)
);

查询用户 1 的二度好友列表并按共同好友数量排序:

WITH friend_of_friends AS (
    SELECT f1.friend_id AS second_degree_friend
    FROM friendships f1
    JOIN friendships f2 ON f1.friend_id = f2.user_id
    WHERE f1.user_id = 1 AND f2.friend_id != 1
),
common_friend_count AS (
    SELECT sdf.second_degree_friend, COUNT(*) AS common_friends
    FROM friend_of_friends sdf
    JOIN friendships f1 ON sdf.second_degree_friend = f1.user_id
    JOIN friendships f2 ON f1.friend_id = f2.user_id AND f2.user_id = 1
    GROUP BY sdf.second_degree_friend
)
SELECT cfc.second_degree_friend, cfc.common_friends
FROM common_friend_count cfc
ORDER BY cfc.common_friends DESC;

此类查询涉及递归和复杂的关联操作,在关系型数据库中处理起来可能会面临性能挑战。

Redis 与 MySQL 结合策略

缓存查询结果

将经常查询且不频繁变化的数据缓存到 Redis 中,可以大大提高查询效率。以电商场景中查询商品详情为例,假设商品信息更新频率较低。

首先,尝试从 Redis 中获取商品信息:

product_id = '1'
product_key = f'product:{product_id}'

product_info = r.get(product_key)
if product_info:
    print('从 Redis 缓存获取商品信息:', product_info.decode('utf-8'))
else:
    # 如果 Redis 中没有,从 MySQL 查询
    connection = mysql.connector.connect(
        host='localhost',
        user='root',
        password='password',
        database='ecommerce'
    )
    cursor = connection.cursor()
    query = "SELECT * FROM products WHERE id = %s"
    cursor.execute(query, (product_id,))
    product = cursor.fetchone()
    if product:
        # 将查询结果存入 Redis 缓存
        r.set(product_key, str(product))
        print('从 MySQL 获取商品信息并缓存到 Redis:', product)
    cursor.close()
    connection.close()

这样,下次查询相同商品时,如果 Redis 中有缓存,则直接从缓存获取,避免了对 MySQL 的查询,提高了响应速度。

利用 Redis 预处理数据

在一些复杂查询中,可以利用 Redis 的数据结构对数据进行预处理,减轻 MySQL 的查询压力。例如,在社交网络场景中查询二度好友时,可以利用 Redis 的集合(Set)结构来存储用户的好友关系。

当添加好友关系时,同时更新 Redis 中的集合:

user_id = '1'
friend_id = '2'

# 在 MySQL 中添加好友关系
connection = mysql.connector.connect(
    host='localhost',
    user='root',
    password='password',
    database='social_network'
)
cursor = connection.cursor()
query = "INSERT INTO friendships (user_id, friend_id) VALUES (%s, %s)"
cursor.execute(query, (user_id, friend_id))
connection.commit()
cursor.close()
connection.close()

# 在 Redis 中更新好友关系集合
r.sadd(f'friends:{user_id}', friend_id)
r.sadd(f'friends:{friend_id}', user_id)

查询二度好友时,可以先在 Redis 中进行部分计算:

user_id = '1'

# 获取用户的直接好友集合
direct_friends = r.smembers(f'friends:{user_id}')
second_degree_friends = set()
for friend in direct_friends:
    friend_friends = r.smembers(f'friends:{friend.decode("utf-8")}')
    for friend_friend in friend_friends:
        if friend_friend.decode('utf-8') != user_id and friend_friend not in direct_friends:
            second_degree_friends.add(friend_friend.decode('utf-8'))

print('通过 Redis 预处理得到的二度好友:', second_degree_friends)

通过这种方式,减少了对 MySQL 的复杂关联查询,提高了查询性能。

分布式缓存与负载均衡

在高并发场景下,可以采用分布式 Redis 缓存,并结合负载均衡策略。例如,使用 Redis Cluster 搭建分布式缓存环境。假设有三个 Redis 节点,分别在不同的服务器上。

在应用程序中配置连接到 Redis Cluster:

from rediscluster import RedisCluster

startup_nodes = [
    {"host": "192.168.1.100", "port": "7000"},
    {"host": "192.168.1.101", "port": "7001"},
    {"host": "192.168.1.102", "port": "7002"}
]

rc = RedisCluster(startup_nodes = startup_nodes, decode_responses = True)

# 设置键值对
rc.set('key', 'value')

# 获取键对应的值
value = rc.get('key')
print(value)

通过分布式缓存,数据分散存储在多个节点上,提高了缓存的容量和并发处理能力。同时,结合负载均衡器(如 Nginx),可以将请求均匀分配到不同的 Redis 节点和 MySQL 服务器上,避免单个节点压力过大。

性能优化与注意事项

缓存更新策略

在使用 Redis 缓存时,需要注意缓存更新策略。常见的策略有:

  1. 写后失效:在 MySQL 数据更新后,立即删除 Redis 中对应的缓存数据。例如,当商品信息更新时:
product_id = '1'
product_key = f'product:{product_id}'

# 更新 MySQL 中的商品信息
connection = mysql.connector.connect(
    host='localhost',
    user='root',
    password='password',
    database='ecommerce'
)
cursor = connection.cursor()
update_query = "UPDATE products SET price = %s WHERE id = %s"
cursor.execute(update_query, (new_price, product_id))
connection.commit()
cursor.close()
connection.close()

# 删除 Redis 缓存
r.delete(product_key)
  1. 写前失效:在更新 MySQL 数据前,先删除 Redis 缓存。这种策略可以避免数据不一致的短暂窗口,但可能会导致在更新过程中查询时缓存缺失。
  2. 读写锁:使用读写锁机制,在更新数据时加写锁,防止其他读操作读取到旧数据,更新完成后释放锁。在 Python 中可以使用 threading 模块的 RLock 实现简单的读写锁:
import threading

lock = threading.RLock()

def update_product(product_id, new_price):
    with lock:
        # 删除 Redis 缓存
        product_key = f'product:{product_id}'
        r.delete(product_key)

        # 更新 MySQL 中的商品信息
        connection = mysql.connector.connect(
            host='localhost',
            user='root',
            password='password',
            database='ecommerce'
        )
        cursor = connection.cursor()
        update_query = "UPDATE products SET price = %s WHERE id = %s"
        cursor.execute(update_query, (new_price, product_id))
        connection.commit()
        cursor.close()
        connection.close()

def get_product(product_id):
    with lock:
        product_key = f'product:{product_id}'
        product_info = r.get(product_key)
        if product_info:
            return product_info.decode('utf-8')
        else:
            # 从 MySQL 查询
            connection = mysql.connector.connect(
                host='localhost',
                user='root',
                password='password',
                database='ecommerce'
            )
            cursor = connection.cursor()
            query = "SELECT * FROM products WHERE id = %s"
            cursor.execute(query, (product_id,))
            product = cursor.fetchone()
            if product:
                r.set(product_key, str(product))
                return str(product)
            cursor.close()
            connection.close()

数据一致性保证

虽然 Redis 与 MySQL 结合可以提高性能,但要确保数据一致性。除了上述缓存更新策略外,还可以采用以下方法:

  1. 异步更新:在更新 MySQL 数据后,通过消息队列(如 RabbitMQ、Kafka 等)异步通知 Redis 更新缓存。这样可以减少对业务流程的阻塞,但需要处理消息队列的可靠性和消息重复问题。 例如,使用 RabbitMQ 实现异步缓存更新:
import pika

# 连接 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='cache_update')

# 发送消息
product_id = '1'
message = f'update_product:{product_id}'
channel.basic_publish(exchange='', routing_key='cache_update', body = message)

print('消息已发送到队列')
connection.close()

在另一个消费者程序中接收消息并更新 Redis 缓存:

import pika
import redis

# 连接 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='cache_update')

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db = 0)

def callback(ch, method, properties, body):
    parts = body.decode('utf-8').split(':')
    if parts[0] == 'update_product':
        product_id = parts[1]
        product_key = f'product:{product_id}'
        r.delete(product_key)
        print(f'已删除 Redis 中商品 {product_id} 的缓存')

# 消费消息
channel.basic_consume(queue='cache_update', on_message_callback = callback, auto_ack = True)

print('等待消息...')
channel.start_consuming()
  1. 版本控制:在数据库表中添加版本号字段,每次数据更新时版本号递增。在读取数据时,同时读取版本号并存储在 Redis 缓存中。当再次读取缓存数据时,比较版本号,如果不一致则从数据库重新获取数据并更新缓存。

监控与调优

  1. 监控工具:使用 Redis 自带的 redis-cli 工具可以监控 Redis 的运行状态,如 INFO 命令可以获取 Redis 服务器的各种统计信息,包括内存使用、客户端连接数、命令执行次数等。
redis-cli INFO

对于 MySQL,可以使用 SHOW STATUS 语句查看数据库的运行状态,例如查看查询缓存的使用情况:

SHOW STATUS LIKE 'Qcache%';
  1. 性能调优:根据监控数据进行性能调优。对于 Redis,合理配置内存大小、调整数据结构的使用方式等可以提高性能。例如,如果发现内存使用过高,可以考虑使用更紧凑的数据结构或者清理过期数据。

对于 MySQL,优化查询语句、调整索引策略、配置合适的缓存参数等是常见的性能调优手段。例如,通过 EXPLAIN 关键字分析查询语句的执行计划,找出性能瓶颈:

EXPLAIN SELECT * FROM products WHERE price > 100;

根据执行计划结果,添加合适的索引来提高查询效率:

CREATE INDEX idx_price ON products(price);

案例分析

案例一:新闻资讯平台

在一个新闻资讯平台中,需要频繁查询热门新闻列表(按浏览量排序)。新闻数据存储在 MySQL 中,表结构如下:

CREATE TABLE news (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(255),
    content TEXT,
    views INT,
    publish_time TIMESTAMP
);

为了提高查询热门新闻的效率,使用 Redis 缓存热门新闻列表。每小时更新一次缓存。

import redis
import mysql.connector
import schedule
import time

r = redis.Redis(host='localhost', port=6379, db = 0)

def update_hot_news_cache():
    connection = mysql.connector.connect(
        host='localhost',
        user='root',
        password='password',
        database='news_db'
    )
    cursor = connection.cursor()
    query = "SELECT id, title, views FROM news ORDER BY views DESC LIMIT 10"
    cursor.execute(query)
    hot_news = cursor.fetchall()
    cursor.close()
    connection.close()

    # 将热门新闻存储到 Redis 列表中
    r.delete('hot_news')
    for news in hot_news:
        news_data = f"{news[0]}:{news[1]}:{news[2]}"
        r.rpush('hot_news', news_data)

# 每小时执行一次更新缓存任务
schedule.every(1).hours.do(update_hot_news_cache)

while True:
    schedule.run_pending()
    time.sleep(1)

当用户请求热门新闻列表时,直接从 Redis 中获取:

hot_news = r.lrange('hot_news', 0, -1)
for news in hot_news:
    news_id, title, views = news.decode('utf-8').split(':')
    print(f'新闻 ID: {news_id}, 标题: {title}, 浏览量: {views}')

案例二:在线教育平台

在线教育平台需要查询某个课程的所有学生及其学习进度。课程信息和学生信息存储在 MySQL 中,表结构如下:

CREATE TABLE courses (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255)
);

CREATE TABLE students (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255)
);

CREATE TABLE course_students (
    id INT AUTO_INCREMENT PRIMARY KEY,
    course_id INT,
    student_id INT,
    progress DECIMAL(5, 2),
    FOREIGN KEY (course_id) REFERENCES courses(id),
    FOREIGN KEY (student_id) REFERENCES students(id)
);

为了提高查询效率,利用 Redis 的哈希结构存储课程与学生的关系及学习进度。当有新学生加入课程或学习进度更新时,同时更新 Redis 和 MySQL。

import redis
import mysql.connector

r = redis.Redis(host='localhost', port=6379, db = 0)

def add_student_to_course(course_id, student_id, progress):
    connection = mysql.connector.connect(
        host='localhost',
        user='root',
        password='password',
        database='education_db'
    )
    cursor = connection.cursor()
    query = "INSERT INTO course_students (course_id, student_id, progress) VALUES (%s, %s, %s)"
    cursor.execute(query, (course_id, student_id, progress))
    connection.commit()
    cursor.close()
    connection.close()

    # 更新 Redis 哈希
    r.hset(f'course:{course_id}','student:{student_id}', progress)

def get_students_progress(course_id):
    students_progress = r.hgetall(f'course:{course_id}')
    for student_key, progress in students_progress.items():
        student_id = student_key.decode('utf-8').split(':')[1]
        print(f'学生 ID: {student_id}, 学习进度: {progress.decode("utf-8")}')

通过这种方式,在查询某个课程学生学习进度时,直接从 Redis 中获取,大大提高了查询性能。

综上所述,Redis 与 MySQL 结合可以有效地应对复杂业务查询需求,通过合理的策略和优化手段,可以在提高系统性能的同时保证数据的一致性和可靠性。在实际应用中,需要根据具体业务场景和需求,灵活选择结合方式和优化方法。