MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis发布/订阅命令PUBLISH与SUBSCRIBE详解

2022-04-257.9k 阅读

Redis 发布/订阅模型概述

Redis 的发布/订阅(Publish/Subscribe)是一种消息通信模式,它允许用户向特定的频道(channel)发布消息,而多个客户端可以订阅一个或多个频道来接收这些消息。这种模式在构建实时应用,如聊天应用、实时数据更新、通知系统等场景中具有广泛应用。

在发布/订阅模型中,存在三个主要角色:发布者(Publisher)、订阅者(Subscriber)和频道(Channel)。发布者负责向频道发送消息,订阅者通过订阅频道来接收发布者发送到该频道的消息。一个频道可以有多个订阅者,同时一个订阅者也可以订阅多个频道。

PUBLISH 命令详解

命令格式

PUBLISH channel message

其中,channel 是要发布消息的频道名称,message 是要发布的具体消息内容。

命令作用

PUBLISH 命令用于将 message 发送到指定的 channel。一旦消息发布,所有订阅了该 channel 的客户端都会接收到此消息。

返回值

PUBLISH 命令返回一个整数,表示接收到消息的订阅者数量。如果没有订阅者,返回值为 0。

示例代码(Python 与 Redis - PyRedis 库)

import redis

# 连接 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db = 0)

# 发布消息
count = r.publish('news_channel', 'New technology breakthrough!')
print(f"Number of subscribers who received the message: {count}")

在上述代码中,我们首先使用 redis.Redis 连接到本地的 Redis 服务器。然后通过 r.publish 方法向 news_channel 频道发布了一条消息。publish 方法返回接收到消息的订阅者数量,并打印出来。

示例代码(Java 与 Redis - Jedis 库)

import redis.clients.jedis.Jedis;

public class RedisPublisher {
    public static void main(String[] args) {
        // 连接 Redis 服务器
        Jedis jedis = new Jedis("localhost", 6379);

        // 发布消息
        Long count = jedis.publish("news_channel", "New technology breakthrough!");
        System.out.println("Number of subscribers who received the message: " + count);

        // 关闭连接
        jedis.close();
    }
}

在这段 Java 代码中,使用 Jedis 连接到 Redis 服务器,通过 jedis.publish 方法向 news_channel 频道发布消息,并打印接收到消息的订阅者数量。最后关闭了 Jedis 连接。

SUBSCRIBE 命令详解

命令格式

SUBSCRIBE channel [channel ...]

其中,channel 是要订阅的频道名称,可以同时指定多个频道。

命令作用

SUBSCRIBE 命令用于订阅指定的频道。一旦客户端执行此命令,它将进入订阅状态,开始接收发送到所订阅频道的消息。

接收消息格式

当客户端订阅频道后,接收到的消息格式如下:

  1. 订阅确认消息:当客户端成功订阅频道时,会收到一个订阅确认消息。格式为 ["subscribe", "channel_name", subscribed_channel_count]。其中,subscribe 表示这是一个订阅确认消息,channel_name 是刚刚订阅的频道名称,subscribed_channel_count 是当前客户端订阅的频道总数。
  2. 实际消息:当有消息发布到订阅的频道时,客户端会收到格式为 ["message", "channel_name", "message_content"] 的消息。其中,message 表示这是一条实际的消息,channel_name 是消息发布的频道名称,message_content 是消息的具体内容。

示例代码(Python 与 Redis - PyRedis 库)

import redis

# 连接 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db = 0)

# 创建发布订阅对象
pubsub = r.pubsub()

# 订阅频道
pubsub.subscribe('news_channel')

# 接收消息
for message in pubsub.listen():
    if message['type'] =='subscribe':
        print(f"Subscribed to {message['channel'].decode('utf-8')}")
    elif message['type'] =='message':
        print(f"Received message from {message['channel'].decode('utf-8')}: {message['data'].decode('utf-8')}")

在上述 Python 代码中,我们首先创建了一个 Redis 连接,并通过 r.pubsub() 创建了一个发布订阅对象。然后使用 pubsub.subscribe('news_channel') 订阅了 news_channel 频道。接着通过 pubsub.listen() 进入消息监听循环,根据消息类型分别处理订阅确认消息和实际消息。

示例代码(Java 与 Redis - Jedis 库)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisSubscriber {
    public static void main(String[] args) {
        // 连接 Redis 服务器
        Jedis jedis = new Jedis("localhost", 6379);

        // 创建 JedisPubSub 实例
        JedisPubSub jedisPubSub = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("Received message from " + channel + ": " + message);
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("Subscribed to " + channel);
            }
        };

        // 订阅频道
        jedis.subscribe(jedisPubSub, "news_channel");

        // 关闭连接
        jedis.close();
    }
}

在这段 Java 代码中,我们创建了一个 Jedis 连接,并定义了一个 JedisPubSub 的匿名子类,重写了 onMessageonSubscribe 方法来处理接收到的消息和订阅确认。然后通过 jedis.subscribe 方法订阅了 news_channel 频道。最后关闭了 Jedis 连接。

发布/订阅的内部实现原理

Redis 的发布/订阅功能是基于事件驱动机制实现的。当一个客户端执行 SUBSCRIBE 命令时,Redis 会在内部维护一个数据结构,将该客户端与所订阅的频道进行关联。这个数据结构通常是一个字典,其中键为频道名称,值为订阅该频道的客户端列表。

当另一个客户端执行 PUBLISH 命令时,Redis 首先查找该频道对应的订阅者列表。如果找到了订阅者,就会将消息发送给列表中的每个客户端。这里的消息发送过程并不是阻塞式的,而是通过 Redis 的事件循环机制,将消息发送任务加入到事件队列中,在合适的时机进行处理。

在 Redis 的底层实现中,发布/订阅功能依赖于文件事件处理器。文件事件处理器负责监听客户端的套接字,当有订阅或发布操作时,会触发相应的事件。对于订阅操作,会将客户端信息记录到相应的数据结构中;对于发布操作,会遍历订阅者列表,将消息发送给每个订阅者。

这种实现方式使得 Redis 的发布/订阅功能能够高效地处理大量的订阅者和频繁的消息发布,同时保证了 Redis 单线程模型下的高性能。

发布/订阅的优缺点

优点

  1. 简单易用:Redis 的发布/订阅命令非常简单直观,开发者可以很容易地实现消息的发布和订阅功能,快速搭建实时通信系统。
  2. 高性能:基于 Redis 的单线程模型和事件驱动机制,发布/订阅功能能够高效地处理大量的消息和订阅者,适用于高并发的场景。
  3. 灵活性:一个客户端可以订阅多个频道,一个频道也可以有多个订阅者,这种灵活的订阅关系使得系统可以根据实际需求进行灵活配置。

缺点

  1. 消息可靠性:Redis 的发布/订阅是一种“发后即忘”的模式,即发布者发布消息后,不会关心订阅者是否成功接收。如果在消息发布和订阅者接收之间发生网络故障等问题,消息可能会丢失。
  2. 无持久化:Redis 默认不会持久化发布/订阅的消息。如果 Redis 服务器重启,之前发布但未被接收的消息将会丢失。
  3. 无消息队列:与专业的消息队列系统(如 Kafka、RabbitMQ)不同,Redis 的发布/订阅没有消息队列的概念,无法对消息进行排队处理,不适合需要严格顺序处理消息的场景。

发布/订阅的应用场景

实时聊天应用

在实时聊天应用中,每个聊天房间可以看作是一个频道。当一个用户发送消息时,就相当于向对应的频道发布消息,而其他订阅了该频道(即加入该聊天房间)的用户会收到这条消息。通过 Redis 的发布/订阅功能,可以很方便地实现这种实时聊天的功能。

实时数据更新

对于一些需要实时更新数据的应用,如股票行情、天气预报等,可以将数据变化作为消息发布到特定频道,而前端应用或其他需要实时获取数据的组件订阅该频道,从而及时获取最新数据。

通知系统

在各种应用中,通知系统是常见的需求。例如,当用户有新的订单、新的消息等情况时,可以通过 Redis 的发布/订阅功能,将通知消息发布到用户对应的频道(可以根据用户 ID 等标识创建频道),用户客户端订阅该频道来接收通知。

扩展阅读:Pattern Subscribe

除了基本的 SUBSCRIBE 命令,Redis 还提供了 PSUBSCRIBE 命令,即模式订阅。

命令格式

PSUBSCRIBE pattern [pattern ...]

其中,pattern 是一个模式字符串,可以包含通配符。例如,news.* 表示匹配所有以 news. 开头的频道。

命令作用

PSUBSCRIBE 命令允许客户端订阅符合指定模式的频道。当有消息发布到匹配模式的频道时,订阅该模式的客户端都会收到消息。

接收消息格式

SUBSCRIBE 类似,订阅确认消息格式为 ["psubscribe", "pattern", subscribed_pattern_count],实际消息格式为 ["pmessage", "pattern", "channel_name", "message_content"]。其中,psubscribepmessage 分别表示模式订阅确认和模式匹配消息,pattern 是订阅时的模式字符串,channel_name 是实际发布消息的频道名称。

示例代码(Python 与 Redis - PyRedis 库)

import redis

# 连接 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db = 0)

# 创建发布订阅对象
pubsub = r.pubsub()

# 模式订阅
pubsub.psubscribe('news.*')

# 接收消息
for message in pubsub.listen():
    if message['type'] == 'psubscribe':
        print(f"Pattern subscribed: {message['pattern'].decode('utf-8')}")
    elif message['type'] == 'pmessage':
        print(f"Received pattern message from {message['channel'].decode('utf-8')} (matching {message['pattern'].decode('utf-8')}): {message['data'].decode('utf-8')}")

在上述 Python 代码中,通过 pubsub.psubscribe('news.*') 订阅了所有以 news. 开头的频道,并在消息监听循环中处理模式订阅确认和模式匹配消息。

示例代码(Java 与 Redis - Jedis 库)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisPatternSubscriber {
    public static void main(String[] args) {
        // 连接 Redis 服务器
        Jedis jedis = new Jedis("localhost", 6379);

        // 创建 JedisPubSub 实例
        JedisPubSub jedisPubSub = new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                System.out.println("Received pattern message from " + channel + " (matching " + pattern + "): " + message);
            }

            @Override
            public void onPSubscribe(String pattern, int subscribedChannels) {
                System.out.println("Pattern subscribed: " + pattern);
            }
        };

        // 模式订阅
        jedis.psubscribe(jedisPubSub, "news.*");

        // 关闭连接
        jedis.close();
    }
}

在这段 Java 代码中,定义了一个 JedisPubSub 的匿名子类来处理模式订阅相关的消息,通过 jedis.psubscribe 方法进行模式订阅,并在相应的方法中打印接收到的消息和订阅确认信息。

模式订阅为 Redis 的发布/订阅功能增加了更灵活的订阅方式,在一些需要批量订阅相关频道的场景中非常有用。

与其他消息队列系统的比较

与 Kafka 的比较

  1. 消息持久化:Kafka 具有强大的消息持久化机制,消息会被持久化到磁盘,保证即使 Kafka 集群重启,消息也不会丢失。而 Redis 的发布/订阅默认不持久化消息,除非进行额外配置。
  2. 性能:Kafka 设计用于处理高吞吐量的消息,尤其适用于大数据场景下的消息处理,能够支持每秒百万级别的消息写入。Redis 的发布/订阅虽然也具有较高性能,但在大规模数据处理能力上不如 Kafka。
  3. 应用场景:Kafka 常用于大数据领域,如日志收集、数据传输等场景。Redis 的发布/订阅更侧重于实时性要求较高、数据量相对较小的实时通信场景,如聊天应用、实时通知等。

与 RabbitMQ 的比较

  1. 消息可靠性:RabbitMQ 提供了丰富的消息确认机制,确保消息能够可靠地发送和接收,通过事务、确认机制等可以有效避免消息丢失。而 Redis 的发布/订阅在消息可靠性方面相对较弱。
  2. 功能特性:RabbitMQ 支持多种消息模型,如简单队列、工作队列、发布/订阅、路由、主题等,功能非常丰富。Redis 的发布/订阅则是相对简单直接的消息通信模式。
  3. 性能:在高并发场景下,Redis 的发布/订阅由于基于单线程和事件驱动机制,在处理速度上可能更快一些,但 RabbitMQ 通过多线程和集群化部署也能提供较高的性能。

总结 Redis 发布/订阅的使用要点

  1. 消息可靠性处理:如果在应用中需要保证消息的可靠性,需要在应用层进行额外的处理,如消息确认机制、重试机制等。
  2. 持久化配置:如果希望消息能够持久化,需要对 Redis 进行相关配置,如开启 AOF 或 RDB 持久化,并根据实际需求调整配置参数。
  3. 模式订阅的合理使用:在需要批量订阅相关频道时,充分利用模式订阅功能,可以简化代码并提高订阅效率。
  4. 与其他系统结合:根据应用场景的需求,可以将 Redis 的发布/订阅与其他消息队列系统(如 Kafka、RabbitMQ)结合使用,发挥各自的优势。

通过深入理解 Redis 的发布/订阅命令 PUBLISHSUBSCRIBE,以及相关的扩展功能和与其他消息系统的比较,开发者可以更好地在实际项目中选择和应用 Redis 的发布/订阅功能,构建出高效、可靠的实时通信系统。