MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis消息队列在MySQL评论处理系统中的应用

2022-11-135.8k 阅读

1. Redis 消息队列基础

Redis 是一个基于内存的高性能键值对存储数据库,除了常规的缓存使用场景外,它还可以用来构建消息队列。Redis 提供了多种数据结构用于实现消息队列,其中最常用的是 List(列表) 结构和 Stream(流) 结构。

1.1 List 结构实现消息队列

List 结构在 Redis 中可以当作栈或者队列使用。在消息队列场景下,我们主要利用其队列特性。Redis 提供了 LPUSH(从列表左边插入元素)和 RPOP(从列表右边弹出元素)两个命令,这两个命令组合起来就可以实现一个简单的消息队列。

  • 生产者(发送消息)

    import redis
    
    r = redis.Redis(host='localhost', port=6379, db=0)
    
    def produce_message(message):
        r.lpush('comment_queue', message)
    

    上述 Python 代码使用 redis - py 库连接到 Redis 服务器,并定义了一个 produce_message 函数,该函数使用 LPUSH 命令将消息插入到名为 comment_queue 的列表左边。

  • 消费者(接收消息)

    def consume_message():
        result = r.rpop('comment_queue')
        if result:
            return result.decode('utf - 8')
        return None
    

    consume_message 函数使用 RPOP 命令从 comment_queue 列表右边弹出一个元素。如果弹出成功,将字节类型的结果解码为字符串并返回;如果列表为空,返回 None

这种基于 List 结构的消息队列实现简单,但存在一些局限性。例如,当队列为空时,消费者需要不断轮询 RPOP 命令,这会浪费大量 CPU 资源。为了解决这个问题,Redis 提供了 BRPOP 命令,即阻塞式 RPOPBRPOP 会在列表为空时阻塞连接,直到有新元素加入列表才返回,从而避免了无效的轮询。

def consume_message_blocking():
    result = r.brpop('comment_queue', timeout=0)
    if result:
        return result[1].decode('utf - 8')
    return None

在上述代码中,brpop 函数的第一个参数是队列名称,第二个参数 timeout 设置为 0 表示无限期阻塞,直到有消息可用。如果设置为其他正整数,则表示阻塞指定的秒数。当有消息时,brpop 返回一个包含队列名称和消息内容的元组,我们只需要取出消息内容并解码返回。

1.2 Stream 结构实现消息队列

Redis 从 5.0 版本开始引入了 Stream 数据结构,它专门用于消息队列场景,相比 List 结构具有更多高级特性。

Stream 结构以 键值对 的形式存储消息,每个消息都有一个唯一的 ID。生产者使用 XADD 命令向 Stream 中添加消息,消费者使用 XREAD 命令读取消息。

  • 生产者(发送消息)

    def produce_message_stream(message):
        r.xadd('comment_stream', {'message': message})
    

    这里使用 xadd 命令向名为 comment_stream 的 Stream 中添加消息,消息内容是一个键值对,键为 message,值为实际的消息。

  • 消费者组:Stream 结构支持消费者组的概念,这使得多个消费者可以共同处理一个消息队列,实现负载均衡。首先需要使用 XGROUP CREATE 命令创建一个消费者组。

    def create_consumer_group():
        r.xgroup_create('comment_stream', 'comment_group', id='$')
    

    上述代码创建了一个名为 comment_group 的消费者组,id='$' 表示从 Stream 的尾部开始读取消息,即只处理新产生的消息。

  • 消费者(接收消息)

    def consume_message_stream():
        result = r.xreadgroup('comment_group', 'consumer_1', { 'comment_stream': '>' }, count = 1)
        if result:
            message_id = result[0][1][0][0]
            message = result[0][1][0][1]['message'].decode('utf - 8')
            r.xack('comment_stream', 'comment_group', message_id)
            return message
        return None
    

    xreadgroup 函数的第一个参数是消费者组名称,第二个参数是消费者名称,第三个参数是一个字典,指定要读取的 Stream 及其起始 ID。'>' 表示从 Stream 中尚未被消费的最新消息开始读取。count = 1 表示每次只读取一条消息。读取消息后,需要使用 xack 命令向 Stream 确认消息已被处理,这样该消息就不会被再次读取。

Stream 结构还支持消息持久化、消息回溯、按条件过滤消息等高级功能,使其在复杂的消息队列场景中表现更为出色。

2. MySQL 评论处理系统概述

MySQL 是一款广泛使用的关系型数据库,在评论处理系统中,通常会使用 MySQL 来存储评论数据。评论数据一般包含评论内容、评论者信息、评论时间等字段。例如,我们可以创建一个 comments 表来存储评论数据:

CREATE TABLE comments (
    id INT AUTO_INCREMENT PRIMARY KEY,
    content TEXT NOT NULL,
    user_id INT NOT NULL,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

上述 SQL 语句创建了一个 comments 表,id 是自增主键,content 存储评论内容,user_id 关联评论者在 users 表中的 ID,create_time 记录评论创建时间,并设置默认值为当前时间。

在实际的评论处理系统中,除了存储评论数据,还可能涉及到对评论的审核、回复处理、统计分析等功能。例如,审核功能可能需要判断评论是否包含敏感词汇,回复处理需要关联评论和回复之间的关系等。

3. Redis 消息队列在 MySQL 评论处理系统中的应用场景

3.1 异步评论存储

在高并发的评论场景下,如果直接将评论数据写入 MySQL,可能会导致数据库压力过大,响应时间变长。使用 Redis 消息队列可以将评论消息先发送到队列中,然后由后台的消费者异步地从队列中读取消息并写入 MySQL。这样可以有效降低数据库的瞬时负载,提高系统的响应速度。

3.2 评论审核流程

评论审核通常是一个复杂的过程,可能需要调用多个服务进行敏感词检测、违规内容识别等。通过 Redis 消息队列,可以将待审核的评论消息发送到队列中,由专门的审核服务从队列中读取消息进行审核。审核通过的评论可以写入 MySQL,不通过的则进行相应处理(如标记为违规、通知用户等)。

3.3 评论回复处理

当有用户对评论进行回复时,同样可以将回复消息发送到 Redis 消息队列。然后,消费者从队列中读取回复消息,根据评论 ID 找到对应的评论,并在 MySQL 中建立回复与评论之间的关联关系。

4. 基于 Redis 消息队列的 MySQL 评论处理系统实现

4.1 项目架构

我们构建一个简单的基于 Redis 消息队列的 MySQL 评论处理系统,其架构主要包含以下几个部分:

  • Web 应用层:接收用户提交的评论请求,将评论消息发送到 Redis 消息队列。
  • Redis 消息队列:暂存评论消息,等待消费者处理。
  • 后台处理服务:从 Redis 消息队列中读取评论消息,进行相应的处理(如审核、存储到 MySQL 等)。
  • MySQL 数据库:存储最终处理后的评论数据。

4.2 代码实现

下面以 Python 为例,详细展示如何实现这个系统。

Web 应用层(Flask 示例)

from flask import Flask, request
import redis

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)


@app.route('/comment', methods=['POST'])
def submit_comment():
    content = request.form.get('content')
    user_id = request.form.get('user_id')
    message = f'user_id: {user_id}, content: {content}'
    r.lpush('comment_queue', message)
    return 'Comment submitted successfully', 200


if __name__ == '__main__':
    app.run(debug=True)

上述代码使用 Flask 框架搭建了一个简单的 Web 应用,监听 /comment 路由。当收到 POST 请求时,从请求表单中获取评论内容和用户 ID,构造消息并发送到 Redis 的 comment_queue 队列中。

后台处理服务

import redis
import mysql.connector

r = redis.Redis(host='localhost', port=6379, db=0)
cnx = mysql.connector.connect(user='root', password='password',
                              host='127.0.0.1',
                              database='comments_db')
cursor = cnx.cursor()


def process_comment():
    message = r.brpop('comment_queue', timeout=0)
    if message:
        message = message[1].decode('utf - 8')
        user_id, content = message.split(', ')[0].split(': ')[1], message.split(', ')[1].split(': ')[1]
        add_comment = ("INSERT INTO comments (content, user_id) VALUES (%s, %s)")
        data_comment = (content, int(user_id))
        cursor.execute(add_comment, data_comment)
        cnx.commit()
        print(f'Comment {content} from user {user_id} inserted into MySQL')


while True:
    process_comment()

这段代码在后台持续运行,从 Redis 的 comment_queue 队列中阻塞读取评论消息。读取到消息后,解析出用户 ID 和评论内容,然后将其插入到 MySQL 的 comments 表中。

如果涉及评论审核功能,我们可以在后台处理服务中添加审核逻辑。例如,使用第三方敏感词检测库进行敏感词检测:

import redis
import mysql.connector
from sensitive_word_detector import detect_sensitive_words

r = redis.Redis(host='localhost', port=6379, db=0)
cnx = mysql.connector.connect(user='root', password='password',
                              host='127.0.0.1',
                              database='comments_db')
cursor = cnx.cursor()


def process_comment():
    message = r.brpop('comment_queue', timeout=0)
    if message:
        message = message[1].decode('utf - 8')
        user_id, content = message.split(', ')[0].split(': ')[1], message.split(', ')[1].split(': ')[1]
        is_sensitive = detect_sensitive_words(content)
        if not is_sensitive:
            add_comment = ("INSERT INTO comments (content, user_id) VALUES (%s, %s)")
            data_comment = (content, int(user_id))
            cursor.execute(add_comment, data_comment)
            cnx.commit()
            print(f'Comment {content} from user {user_id} inserted into MySQL')
        else:
            print(f'Comment {content} from user {user_id} contains sensitive words, not inserted')


while True:
    process_comment()

上述代码中,引入了 detect_sensitive_words 函数来检测评论内容是否包含敏感词。如果不包含敏感词,则将评论插入到 MySQL;否则,打印提示信息,不进行插入操作。

4.3 性能优化与注意事项

  • 批量处理:在从 Redis 队列读取消息并写入 MySQL 时,可以采用批量处理的方式,减少数据库交互次数。例如,每次从队列中读取多条消息,然后一次性执行多个插入操作。
def process_comments_batch():
    messages = r.brpoplpush('comment_queue', 'processing_queue', count = 10)
    if messages:
        comment_data = []
        for message in messages:
            message = message.decode('utf - 8')
            user_id, content = message.split(', ')[0].split(': ')[1], message.split(', ')[1].split(': ')[1]
            comment_data.append((content, int(user_id)))
        add_comment = ("INSERT INTO comments (content, user_id) VALUES (%s, %s)")
        cursor.executemany(add_comment, comment_data)
        cnx.commit()
        print(f'{len(comment_data)} comments inserted into MySQL')
        for _ in range(len(comment_data)):
            r.lpop('processing_queue')


while True:
    process_comments_batch()

上述代码中,brpoplpush 命令从 comment_queue 队列中弹出最多 10 条消息,并将它们推送到 processing_queue 队列中。然后,解析这些消息,将数据整理成适合批量插入的格式,使用 executemany 方法一次性插入到 MySQL。最后,从 processing_queue 队列中移除已处理的消息。

  • 消息持久化:为了防止 Redis 重启导致消息丢失,可以开启 Redis 的持久化功能(如 AOF 或 RDB)。对于 Stream 结构,它本身就具有一定的持久化特性,因为消息是以日志的形式追加写入的。
  • 错误处理:在从 Redis 读取消息和写入 MySQL 的过程中,要充分考虑各种可能的错误情况,如 Redis 连接失败、MySQL 插入错误等。对于 Redis 连接失败,可以设置重试机制;对于 MySQL 插入错误,需要根据错误类型进行相应处理,例如重复键错误可能需要更新数据而不是插入。
import redis
import mysql.connector
import time

r = redis.Redis(host='localhost', port=6379, db=0)
cnx = mysql.connector.connect(user='root', password='password',
                              host='127.0.0.1',
                              database='comments_db')
cursor = cnx.cursor()


def process_comment():
    max_retries = 3
    for retry in range(max_retries):
        try:
            message = r.brpop('comment_queue', timeout=0)
            if message:
                message = message[1].decode('utf - 8')
                user_id, content = message.split(', ')[0].split(': ')[1], message.split(', ')[1].split(': ')[1]
                add_comment = ("INSERT INTO comments (content, user_id) VALUES (%s, %s)")
                data_comment = (content, int(user_id))
                cursor.execute(add_comment, data_comment)
                cnx.commit()
                print(f'Comment {content} from user {user_id} inserted into MySQL')
                break
        except redis.RedisError as e:
            print(f'Redis error: {e}, retrying in 5 seconds...')
            time.sleep(5)
        except mysql.connector.Error as e:
            if e.errno == mysql.connector.errorcode.ER_DUP_ENTRY:
                # 处理重复键错误,例如更新数据
                update_comment = ("UPDATE comments SET content = %s WHERE user_id = %s")
                data_update = (content, int(user_id))
                cursor.execute(update_comment, data_update)
                cnx.commit()
                print(f'Comment {content} from user {user_id} updated in MySQL')
            else:
                print(f'MySQL error: {e}, retrying in 5 seconds...')
                time.sleep(5)


while True:
    process_comment()

上述代码中,对于 Redis 错误和 MySQL 错误分别进行了处理。对于 Redis 错误,设置了最大重试次数为 3 次,每次重试间隔 5 秒。对于 MySQL 的重复键错误,进行了数据更新操作;对于其他 MySQL 错误,同样设置了重试机制。

5. 总结 Redis 消息队列在 MySQL 评论处理系统中的优势

  • 提高系统响应速度:通过异步处理评论,Web 应用层可以快速响应用户请求,而不需要等待评论数据完全写入 MySQL。用户提交评论后,系统可以立即返回提交成功的消息,提升用户体验。
  • 削峰填谷:在高并发场景下,Redis 消息队列可以暂存大量的评论消息,避免 MySQL 瞬间承受过高的负载。当流量高峰过去后,后台处理服务可以按照一定的速度从队列中读取消息并处理,使 MySQL 的负载保持在一个较为稳定的水平。
  • 解耦系统组件:将评论的提交、审核和存储等功能通过 Redis 消息队列进行解耦。各个组件可以独立开发、部署和扩展,提高了系统的可维护性和可扩展性。例如,如果需要更换审核服务,只需要修改从队列读取消息后的处理逻辑,而不会影响到 Web 应用层和 MySQL 存储部分。
  • 保证数据可靠性:通过合理配置 Redis 的持久化机制以及在后台处理服务中进行适当的错误处理和重试机制,可以保证评论消息不会丢失,确保数据的可靠性。

综上所述,Redis 消息队列在 MySQL 评论处理系统中具有重要的应用价值,可以有效提升系统的性能、稳定性和可扩展性。在实际应用中,需要根据具体的业务需求和系统规模,合理选择 Redis 消息队列的实现方式(List 或 Stream),并进行相应的优化和配置。