MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis发布/订阅模式在消息传递中的应用实践

2022-06-187.5k 阅读

Redis发布/订阅模式基础

Redis的发布/订阅(Publish/Subscribe)模式是一种消息通信模式,其中发送者(发布者)不直接将消息发送给特定的接收者(订阅者),而是将消息发布到一个或多个频道(channel)。订阅了这些频道的所有客户端(订阅者)都会收到该消息。这种模式实现了消息的广播机制,在解耦消息生产者和消费者方面有着显著的优势。

基本概念

  1. 频道(Channel):频道是消息的载体,发布者将消息发布到频道,订阅者通过订阅频道来接收消息。频道可以是任何符合Redis键命名规范的字符串,例如 "news:sports"、"chat:room1" 等。
  2. 发布者(Publisher):发布者负责将消息发送到指定的频道。它不需要知道有哪些订阅者会接收消息,只关心将消息发布到正确的频道。
  3. 订阅者(Subscriber):订阅者通过订阅一个或多个频道来接收发布者发布到这些频道的消息。订阅者只关心自己所订阅频道的消息。

工作流程

  1. 订阅者使用 SUBSCRIBE 命令订阅一个或多个频道。例如,在Redis客户端中执行 SUBSCRIBE news:sports,这表示该客户端订阅了 news:sports 频道。
  2. 发布者使用 PUBLISH 命令向指定频道发布消息。例如,执行 PUBLISH news:sports "Lakers won the game",这样所有订阅了 news:sports 频道的客户端都会收到这条消息。
  3. 当有新消息发布到频道时,Redis服务器会将消息推送给所有订阅该频道的客户端。

应用场景

实时通信

  1. 聊天应用:在多人聊天应用中,每个聊天房间可以看作是一个频道。当一个用户发送消息时,该消息作为发布内容发布到对应的聊天房间频道,其他订阅了该频道的用户(即同在该聊天房间的用户)就能实时收到消息。
  2. 实时通知:例如股票交易平台,当某只股票价格达到特定阈值、新闻网站有新的重大新闻发布等场景下,可以通过Redis的发布/订阅模式向订阅了相关频道的用户推送实时通知。

分布式系统中的消息传递

  1. 微服务间通信:在微服务架构中,不同的微服务可能需要进行异步通信。通过Redis的发布/订阅模式,一个微服务可以作为发布者发布事件消息,其他感兴趣的微服务作为订阅者接收这些消息,从而实现微服务之间的松耦合通信。例如,订单服务在创建新订单后,可以发布一条 "new_order" 消息到相应频道,库存服务订阅该频道,接收到消息后进行库存扣减操作。
  2. 分布式缓存更新:在分布式缓存系统中,当某个缓存数据发生变化时,可以通过发布/订阅模式通知其他节点更新相应的缓存数据。比如,一个商品信息在数据库中被修改,更新该商品信息缓存的服务可以发布一条消息到 "product_cache_update" 频道,其他使用该商品缓存数据的服务订阅此频道,接收到消息后重新从数据库加载最新数据并更新缓存。

代码示例

Python示例

  1. 安装Redis库: 首先需要安装 redis - py 库,可以使用 pip install redis 命令进行安装。
  2. 订阅者代码
import redis

r = redis.Redis(host='localhost', port=6379, db = 0)


def subscribe_to_channel():
    pubsub = r.pubsub()
    pubsub.subscribe('test_channel')
    for message in pubsub.listen():
        if message['type'] =='message':
            print(f"Received message: {message['data'].decode('utf - 8')}")


if __name__ == '__main__':
    subscribe_to_channel()


在上述代码中,首先创建了一个Redis连接对象 r。然后,通过 pubsub() 方法创建了一个发布/订阅对象 pubsub。接着使用 subscribe 方法订阅了 test_channel 频道。listen 方法是一个阻塞方法,它会持续监听频道上的消息,当有消息到达时,会根据消息类型进行处理,如果是 message 类型的消息,则打印出消息内容。 3. 发布者代码

import redis

r = redis.Redis(host='localhost', port=6379, db = 0)


def publish_message():
    message = "Hello, this is a test message"
    r.publish('test_channel', message)


if __name__ == '__main__':
    publish_message()


发布者代码中,同样先创建了Redis连接对象 r,然后使用 publish 方法向 test_channel 频道发布了一条消息。当运行发布者代码后,订阅了 test_channel 频道的订阅者就会收到这条消息。

Java示例

  1. 添加依赖: 在Maven项目的 pom.xml 文件中添加Jedis依赖:
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.6.0</version>
</dependency>
  1. 订阅者代码
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisSubscriber {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        JedisPubSub jedisPubSub = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("Received message from channel " + channel + ": " + message);
            }
        };
        jedis.subscribe(jedisPubSub, "test_channel");
    }
}

在上述Java代码中,首先创建了一个Jedis对象连接到本地Redis服务器。然后定义了一个 JedisPubSub 匿名类,重写了 onMessage 方法,当接收到消息时,会打印出消息所在频道和消息内容。最后使用 subscribe 方法订阅了 test_channel 频道。 3. 发布者代码

import redis.clients.jedis.Jedis;

public class RedisPublisher {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        String message = "Hello from Java publisher";
        jedis.publish("test_channel", message);
    }
}

发布者代码通过Jedis对象连接到Redis服务器,然后使用 publish 方法向 test_channel 频道发布了一条消息。

Node.js示例

  1. 安装Redis库: 使用 npm install redis 命令安装 ioredis 库。
  2. 订阅者代码
const Redis = require('ioredis');
const redis = new Redis();

redis.subscribe('test_channel', (err, count) => {
    if (err) {
        console.error(err);
        return;
    }
    console.log(`Subscribed to ${count} channels`);
});

redis.on('message', (channel, message) => {
    console.log(`Received message from channel ${channel}: ${message}`);
});

在上述Node.js代码中,首先引入 ioredis 库并创建了一个Redis实例。然后使用 subscribe 方法订阅了 test_channel 频道,并在订阅成功时打印出订阅的频道数量。on('message') 事件监听器用于接收频道上的消息,并打印出消息所在频道和消息内容。 3. 发布者代码

const Redis = require('ioredis');
const redis = new Redis();

const message = 'Hello from Node.js publisher';
redis.publish('test_channel', message);

发布者代码通过 publish 方法向 test_channel 频道发布了一条消息。

Redis发布/订阅模式的特性与注意事项

消息的即时性

  1. 优点:Redis的发布/订阅模式具有很强的即时性。一旦发布者发布消息到频道,Redis服务器会立即将消息推送给所有订阅该频道的订阅者。这使得它非常适合需要实时响应的场景,如实时聊天、实时监控等。
  2. 缺点:由于消息是即时推送的,如果订阅者在消息发布时处于离线状态,那么它将错过该消息。Redis本身不会存储历史消息,所以对于需要保存历史消息以供后续查看的场景,单纯的发布/订阅模式无法满足,需要结合其他机制,如将消息存储到数据库中。

广播特性

  1. 优点:发布/订阅模式采用广播机制,一个消息可以被多个订阅者接收。这在需要向多个客户端发送相同信息的场景下非常高效,如向所有在线用户推送系统公告等。
  2. 缺点:广播特性可能会导致消息冗余。如果一个频道有大量的订阅者,发布一条消息会占用较多的网络带宽和服务器资源。同时,对于一些只需要部分订阅者接收消息的场景,可能需要更精细的消息分发策略,而单纯的发布/订阅模式无法很好地满足这种需求。

可靠性

  1. 优点:Redis本身是一个高性能的内存数据库,在正常情况下,发布/订阅功能能够稳定运行。并且Redis支持主从复制和集群模式,可以在一定程度上提高系统的可用性和可靠性。
  2. 缺点:Redis发布/订阅模式不是完全可靠的。如果Redis服务器发生故障或者网络中断,可能会导致消息丢失。特别是在发布者发布消息后但还未成功推送给所有订阅者时发生故障,部分订阅者可能无法收到消息。为了提高可靠性,可以采用一些额外的措施,如在发布者端记录已发布的消息,订阅者定期检查是否有遗漏的消息等。

频道管理

  1. 动态订阅与取消订阅:订阅者可以在运行时动态地订阅或取消订阅频道。这使得应用程序能够根据运行时的需求灵活地调整消息接收范围。例如,在一个用户可以自由加入和退出不同聊天房间的聊天应用中,用户对应的客户端可以根据用户的操作动态地订阅或取消订阅相应的聊天房间频道。
  2. 频道命名规范:由于频道名称是字符串,应用程序需要定义良好的频道命名规范。合理的命名规范有助于组织和管理频道,提高代码的可读性和可维护性。例如,可以采用分层命名的方式,如 "category:subcategory:channel_name",这样能够更清晰地表示频道的用途和层次关系。

高级应用与扩展

通配符订阅

  1. 概念:Redis支持通配符订阅,通过使用 PSUBSCRIBE 命令,订阅者可以订阅匹配特定通配符模式的多个频道。通配符有两种:* 匹配任意数量的字符,? 匹配单个字符。
  2. 示例: 假设我们有多个体育新闻频道,命名规则为 "news:sports:football"、"news:sports:basketball"、"news:sports:tennis" 等。如果我们想要订阅所有体育新闻频道,可以使用 PSUBSCRIBE news:sports:* 命令。在代码中,以Python为例:
import redis

r = redis.Redis(host='localhost', port=6379, db = 0)


def psubscribe_to_channels():
    pubsub = r.pubsub()
    pubsub.psubscribe('news:sports:*')
    for message in pubsub.listen():
        if message['type'] == 'pmessage':
            print(f"Received message from pattern {message['pattern'].decode('utf - 8')} "
                  f"on channel {message['channel'].decode('utf - 8')}: "
                  f"{message['data'].decode('utf - 8')}")


if __name__ == '__main__':
    psubscribe_to_channels()


在上述代码中,使用 psubscribe 方法订阅了所有以 "news:sports:" 开头的频道。listen 方法监听消息,当接收到通配符匹配的消息(typepmessage)时,打印出匹配的模式、频道以及消息内容。

多频道发布

  1. 操作:发布者可以一次性向多个频道发布相同的消息。在Redis客户端中,可以通过多次调用 PUBLISH 命令来实现,但这样效率较低。一些Redis客户端库提供了批量发布的方法。以Java的Jedis为例:
import redis.clients.jedis.Jedis;

public class MultiChannelPublisher {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        String message = "This is a multi - channel message";
        String[] channels = {"channel1", "channel2", "channel3"};
        for (String channel : channels) {
            jedis.publish(channel, message);
        }
    }
}

上述代码中,通过循环向多个频道发布了同一条消息。在实际应用中,如果频道数量较多,这种方式性能较低。一些高级客户端库可能提供更高效的批量发布方法,如通过一次网络请求发布到多个频道。

结合其他Redis功能

  1. 与Redis List结合:可以将发布/订阅模式与Redis的List数据结构结合使用,以实现消息的持久化和可靠传递。例如,发布者在发布消息到频道的同时,将消息也添加到一个List中。订阅者在订阅频道接收消息的同时,也从List中读取消息,这样即使在订阅者离线期间有消息发布,也可以从List中获取历史消息。
  2. 与Redis事务结合:在某些场景下,可能需要保证发布消息的原子性。可以将发布操作放在Redis事务中执行。例如,在一个电商系统中,当订单状态发生变化时,可能需要同时发布订单状态更新消息到多个频道,并且要保证这些发布操作要么全部成功,要么全部失败。以Python的 redis - py 库为例:
import redis

r = redis.Redis(host='localhost', port=6379, db = 0)


def publish_with_transaction():
    pipe = r.pipeline()
    pipe.multi()
    pipe.publish('order_status_channel1', 'Order status updated')
    pipe.publish('order_status_channel2', 'Order status updated')
    pipe.execute()


if __name__ == '__main__':
    publish_with_transaction()


在上述代码中,通过 pipeline 创建了一个管道,并使用 multi 开启事务,然后在事务中执行多个 publish 操作,最后通过 execute 方法提交事务,保证了多个发布操作的原子性。

性能优化

减少频道数量

  1. 原因:过多的频道会增加Redis服务器的管理负担。每个频道都需要维护自己的订阅者列表等信息,频道数量过多会占用较多的内存和CPU资源。
  2. 优化方法:在设计应用程序时,尽量合理地合并频道。例如,在一个物联网设备监控系统中,如果每个设备都对应一个单独的频道来接收控制指令,当设备数量较多时,频道数量会急剧增加。可以按照设备类型或者区域等维度进行频道合并,如 "iot:device_type:control" 或者 "iot:region:control",通过在消息中添加设备标识等信息来区分不同设备的指令。

批量处理消息

  1. 发布者端:对于发布者,如果需要频繁发布消息,可以考虑批量发布。如前文提到的Java代码中,通过循环逐个发布消息性能较低。一些客户端库支持批量发布操作,这样可以减少网络通信次数,提高发布效率。例如,在 redis - py 库中,可以将多个 publish 操作放在一个管道中执行:
import redis

r = redis.Redis(host='localhost', port=6379, db = 0)


def batch_publish():
    pipe = r.pipeline()
    messages = [("channel1", "message1"), ("channel2", "message2"), ("channel3", "message3")]
    for channel, message in messages:
        pipe.publish(channel, message)
    pipe.execute()


if __name__ == '__main__':
    batch_publish()


  1. 订阅者端:对于订阅者,如果接收到的消息处理逻辑较为复杂,且消息量较大,可以考虑批量处理消息。例如,在Python中,可以设置一个缓冲区,当缓冲区中的消息达到一定数量或者经过一定时间后,再批量处理这些消息,而不是接收到一条消息就处理一次。
import redis
import time

r = redis.Redis(host='localhost', port=6379, db = 0)
message_buffer = []


def subscribe_and_batch_process():
    pubsub = r.pubsub()
    pubsub.subscribe('test_channel')
    start_time = time.time()
    for message in pubsub.listen():
        if message['type'] =='message':
            message_buffer.append(message['data'].decode('utf - 8'))
        if len(message_buffer) >= 10 or (time.time() - start_time) >= 5:
            # 处理缓冲区中的消息
            for msg in message_buffer:
                print(f"Processing message: {msg}")
            message_buffer = []
            start_time = time.time()


if __name__ == '__main__':
    subscribe_and_batch_process()


在上述代码中,设置了缓冲区 message_buffer,当缓冲区中的消息数量达到10条或者距离上次处理消息的时间超过5秒时,就批量处理缓冲区中的消息。

合理配置Redis服务器

  1. 内存配置:根据应用程序的消息量和频道数量,合理分配Redis服务器的内存。如果内存不足,可能会导致消息丢失或者性能下降。可以通过修改Redis配置文件中的 maxmemory 参数来设置最大内存限制,并根据实际情况选择合适的内存淘汰策略,如 volatile - lru(在设置了过期时间的键中,使用最近最少使用算法淘汰键)、allkeys - lru(在所有键中,使用最近最少使用算法淘汰键)等。
  2. 网络配置:优化Redis服务器的网络配置,如调整TCP缓冲区大小等。合理的网络配置可以减少网络延迟,提高消息发布和订阅的效率。可以通过修改操作系统的网络参数,如 /etc/sysctl.conf 中的 net.core.rmem_max(接收缓冲区最大大小)和 net.core.wmem_max(发送缓冲区最大大小)等参数来优化网络性能。

安全考虑

身份验证

  1. 设置密码:为了防止未经授权的客户端连接到Redis服务器并进行发布/订阅操作,应该为Redis设置密码。在Redis配置文件中,通过设置 requirepass 参数来指定密码。客户端在连接Redis服务器时,需要提供正确的密码才能进行操作。以Python的 redis - py 库为例:
import redis

r = redis.Redis(host='localhost', port=6379, db = 0, password='your_password')


  1. 用户认证:从Redis 6.0开始,支持基于用户名和密码的用户认证。可以通过配置文件或者 ACL(Access Control List)命令来创建用户并设置权限。例如,创建一个只具有发布/订阅权限的用户:
redis - cli
ACL SETUSER pubsub_user on >password ~* +SUBSCRIBE +PUBLISH

然后客户端在连接时需要指定用户名和密码:

import redis

r = redis.Redis(host='localhost', port=6379, db = 0, username='pubsub_user', password='password')


数据加密

  1. 传输加密:如果Redis服务器与客户端之间的数据传输需要保密,如在公网环境中使用,建议使用SSL/TLS加密传输。一些Redis客户端库支持SSL连接,如Java的Jedis库可以通过以下方式创建SSL连接:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.security.NoSuchAlgorithmException;

public class SSLJedisExample {
    public static void main(String[] args) throws NoSuchAlgorithmException {
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(null, null, null);
        SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();

        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        JedisPool jedisPool = new JedisPool(jedisPoolConfig, "localhost", Protocol.DEFAULT_SSL_PORT, 0, null, 0, sslSocketFactory);
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.publish("test_channel", "Hello over SSL");
        }
    }
}
  1. 存储加密:如果对数据的保密性要求极高,还可以考虑对存储在Redis中的消息进行加密。可以在发布者端对消息进行加密后再发布,订阅者接收到消息后进行解密。常用的加密算法如AES(高级加密标准)等,可以使用相关的加密库来实现。例如,在Python中使用 pycryptodome 库进行AES加密和解密:
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import redis
import base64

r = redis.Redis(host='localhost', port=6379, db = 0)
key = b'16 - byte - key - 1234567890123456'
cipher = AES.new(key, AES.MODE_CBC)


def encrypt_and_publish(message):
    padded_message = pad(message.encode('utf - 8'), AES.block_size)
    encrypted_message = cipher.encrypt(padded_message)
    iv_and_encrypted = cipher.iv + encrypted_message
    encoded_message = base64.b64encode(iv_and_encrypted).decode('utf - 8')
    r.publish('encrypted_channel', encoded_message)


def subscribe_and_decrypt():
    pubsub = r.pubsub()
    pubsub.subscribe('encrypted_channel')
    for message in pubsub.listen():
        if message['type'] =='message':
            decoded_message = base64.b64decode(message['data'].decode('utf - 8'))
            iv = decoded_message[:AES.block_size]
            encrypted_message = decoded_message[AES.block_size:]
            decrypt_cipher = AES.new(key, AES.MODE_CBC, iv)
            decrypted_message = unpad(decrypt_cipher.decrypt(encrypted_message), AES.block_size)
            print(f"Decrypted message: {decrypted_message.decode('utf - 8')}")


if __name__ == '__main__':
    encrypt_and_publish("This is a secret message")
    subscribe_and_decrypt()


在上述代码中,发布者使用AES加密消息后发布到频道,订阅者接收到消息后进行解密。

通过以上对Redis发布/订阅模式在消息传递中的应用实践的详细阐述,包括基础概念、应用场景、代码示例、特性与注意事项、高级应用与扩展、性能优化以及安全考虑等方面,希望能帮助开发者更好地理解和应用这一强大的消息通信模式,构建出高效、可靠、安全的消息传递系统。