MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RabbitMQ实战入门

2023-11-032.3k 阅读

什么是 RabbitMQ

RabbitMQ 是一个开源的消息代理和队列服务器,它实现了高级消息队列协议(AMQP)。消息队列在后端开发中扮演着至关重要的角色,常用于解耦应用程序的不同部分,处理异步任务,以及实现可靠的消息传递。RabbitMQ 因其可靠性、灵活性和广泛的语言支持而被广泛采用。

核心概念

  1. 生产者(Producer):发送消息的应用程序。生产者创建消息并将其发送到 RabbitMQ 服务器。
  2. 消费者(Consumer):接收消息的应用程序。消费者订阅队列,并从队列中获取消息进行处理。
  3. 队列(Queue):存储消息的地方。队列是 RabbitMQ 服务器上的一种数据结构,它可以存储多个消息,直到消费者将它们取出。
  4. 交换机(Exchange):接收生产者发送的消息,并根据路由规则将消息发送到一个或多个队列。交换机根据不同的类型(如 direct、topic、fanout 等)来决定如何路由消息。
  5. 绑定(Binding):定义了交换机和队列之间的关系,即交换机如何将消息路由到队列。绑定可以包含一个可选的路由键(routing key),用于更精确的路由。

安装 RabbitMQ

在 Linux 上安装

  1. 添加 RabbitMQ 官方仓库: 对于 Debian 或 Ubuntu 系统,可以使用以下命令添加 RabbitMQ 官方仓库:
wget -O - https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey | sudo apt-key add -
sudo apt - get install apt - transport - https
echo "deb https://packagecloud.io/rabbitmq/rabbitmq - server/ubuntu/ $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
  1. 安装 RabbitMQ: 添加仓库后,使用以下命令安装 RabbitMQ 服务器:
sudo apt - get update
sudo apt - get install rabbitmq - server
  1. 启动 RabbitMQ: 安装完成后,可以使用以下命令启动 RabbitMQ 服务:
sudo systemctl start rabbitmq - server
  1. 验证安装: 使用以下命令检查 RabbitMQ 服务状态:
sudo systemctl status rabbitmq - server

在 Windows 上安装

  1. 下载安装包: 从 RabbitMQ 官方网站(https://www.rabbitmq.com/download.html)下载 Windows 安装包。
  2. 安装 Erlang: RabbitMQ 依赖于 Erlang,因此需要先安装 Erlang。从 Erlang 官方网站(https://www.erlang.org/downloads)下载并安装 Erlang。安装过程中注意设置环境变量。
  3. 安装 RabbitMQ: 运行 RabbitMQ 安装包,按照提示完成安装。
  4. 启动 RabbitMQ: 安装完成后,可以在 Windows 服务中找到 RabbitMQ 服务并启动它。

简单消息队列示例

使用 Python 和 pika 库

  1. 安装 pika: 使用 pip 安装 pika 库:
pip install pika
  1. 生产者代码
import pika

# 建立到 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='hello')

# 发送消息
message = 'Hello, RabbitMQ!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(" [x] Sent 'Hello, RabbitMQ!'")

# 关闭连接
connection.close()
  1. 消费者代码
import pika


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


# 建立到 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='hello')

# 消费消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,生产者通过 pika.BlockingConnection 建立到本地 RabbitMQ 服务器的连接,声明一个名为 hello 的队列,并向该队列发送一条消息。消费者同样建立连接,声明相同的队列,并设置一个回调函数 callback 来处理接收到的消息。basic_consume 方法用于指定队列和回调函数,auto_ack=True 表示消息被接收后自动确认,无需手动确认。

交换机类型

Fanout 交换机

Fanout 交换机将接收到的消息广播到所有与之绑定的队列,而不考虑路由键。

  1. 生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 fanout 类型的交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = 'This is a fanout message'
# 发送消息到交换机,不指定路由键
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent 'This is a fanout message'")

connection.close()
  1. 消费者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 fanout 类型的交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 创建一个随机命名的队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 将队列绑定到交换机
channel.queue_bind(exchange='logs', queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在这个示例中,生产者声明一个 fanout 类型的交换机 logs,并向其发送消息,不指定路由键。消费者声明相同的交换机,并创建一个随机命名的队列,然后将该队列绑定到交换机。这样,无论有多少个消费者,它们都会接收到生产者发送到 logs 交换机的所有消息。

Direct 交换机

Direct 交换机根据消息的路由键将消息发送到特定的队列。只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被发送到该队列。

  1. 生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 direct 类型的交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = 'error'
message = 'This is an error message'
# 发送消息到交换机,指定路由键
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent '%s:%s'" % (severity, message))

connection.close()
  1. 消费者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 direct 类型的交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 创建一个队列
result = channel.queue_declare(queue='')
queue_name = result.method.queue

severities = ['error']
for severity in severities:
    # 将队列绑定到交换机,并指定路由键
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

这里生产者声明一个 direct 类型的交换机 direct_logs,并根据消息的严重程度(如 error)作为路由键发送消息。消费者声明相同的交换机,创建一个队列,并将队列绑定到交换机,同时指定要接收的路由键(如 error)。只有当消息的路由键与消费者绑定的路由键匹配时,消费者才会接收到消息。

Topic 交换机

Topic 交换机允许更灵活的路由,它根据路由键的模式匹配将消息发送到队列。路由键和绑定键都是由点分隔的单词组成,支持通配符。* 匹配一个单词,# 匹配零个或多个单词。

  1. 生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = 'kern.critical'
message = 'A critical kernel error'
# 发送消息到交换机,指定路由键
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent '%s:%s'" % (routing_key, message))

connection.close()
  1. 消费者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 创建一个队列
result = channel.queue_declare(queue='')
queue_name = result.method.queue

binding_keys = ['*.critical']
for binding_key in binding_keys:
    # 将队列绑定到交换机,并指定绑定键
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在这个例子中,生产者声明一个 topic 类型的交换机 topic_logs,并使用一个包含模式的路由键(如 kern.critical)发送消息。消费者声明相同的交换机,创建队列,并使用通配符模式(如 *.critical)将队列绑定到交换机。这样,只要消息的路由键与消费者的绑定键模式匹配,消费者就会接收到消息。

消息确认机制

手动确认

默认情况下,RabbitMQ 在消息被发送到消费者后就认为消息已经被处理。但有时候,我们需要确保消息被消费者真正处理后才确认。这可以通过手动确认来实现。

  1. 消费者代码
import pika


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 手动确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

# 设置为手动确认,auto_ack=False
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

channel.start_consuming()

在上述代码中,basic_consume 方法的 auto_ack 参数设置为 False,表示关闭自动确认。在回调函数 callback 中,使用 ch.basic_ack(delivery_tag=method.delivery_tag) 手动确认消息。这样,只有当消费者调用 basic_ack 方法后,RabbitMQ 才会认为消息已被成功处理,并从队列中删除。

持久化

为了确保 RabbitMQ 在重启后消息不会丢失,我们可以将队列和消息设置为持久化。

  1. 生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个持久化队列
channel.queue_declare(queue='task_queue', durable=True)

message = 'This is a durable message'
# 发送一个持久化消息
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 使消息持久化
                      ))
print(" [x] Sent 'This is a durable message'")

connection.close()
  1. 消费者代码
import pika


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个持久化队列
channel.queue_declare(queue='task_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

channel.start_consuming()

在生产者代码中,queue_declare 方法的 durable=True 参数使队列持久化,basic_publish 方法中的 properties=pika.BasicProperties(delivery_mode=2) 使消息持久化。消费者同样声明持久化队列,这样即使 RabbitMQ 服务器重启,持久化的队列和消息也不会丢失。

工作队列模式

工作队列模式用于在多个消费者之间分配任务。多个消费者可以同时从一个队列中获取消息并处理,从而提高任务处理的效率。

  1. 生产者代码
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
# 发送消息
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 使消息持久化
                      ))
print(" [x] Sent %r" % message)

connection.close()
  1. 消费者代码
import pika
import time


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='task_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

# 设置每个消费者在同一时间最多处理一条消息
channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

channel.start_consuming()

在这个模式中,生产者向 task_queue 队列发送任务消息。消费者从队列中获取消息并处理,basic_qos(prefetch_count=1) 方法设置每个消费者在同一时间最多处理一条消息,这样可以确保任务在多个消费者之间更均衡地分配。当消费者处理完一条消息后,通过 basic_ack 方法手动确认消息。

发布/订阅模式

发布/订阅模式使用 fanout 交换机,允许生产者将消息发送到多个消费者。

  1. 生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = 'This is a pub - sub message'
# 发送消息到交换机
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent 'This is a pub - sub message'")

connection.close()
  1. 消费者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 创建一个随机命名的队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 将队列绑定到交换机
channel.queue_bind(exchange='logs', queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在发布/订阅模式中,生产者将消息发送到 fanout 交换机 logs,不指定路由键。消费者声明相同的交换机,并创建一个随机命名的队列,然后将队列绑定到交换机。这样,所有绑定到该交换机的消费者都会接收到生产者发送的消息。

RabbitMQ 集群

集群搭建

  1. 环境准备: 假设有三台服务器,IP 分别为 192.168.1.10192.168.1.11192.168.1.12,并且都安装了 RabbitMQ。
  2. 配置文件修改: 在每台服务器的 /etc/rabbitmq/rabbitmq.conf 文件中添加以下配置:
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes = ['rabbit@192.168.1.10', 'rabbit@192.168.1.11', 'rabbit@192.168.1.12']
  1. 启动集群: 在第一台服务器上启动 RabbitMQ:
sudo rabbitmq - server start

在第二台和第三台服务器上,先停止 RabbitMQ 服务,然后加入集群:

sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@192.168.1.10
sudo rabbitmqctl start_app

这样就搭建好了一个 RabbitMQ 集群,集群中的节点可以共享队列和交换机等资源,提高了系统的可靠性和性能。

集群中的数据复制

RabbitMQ 集群支持队列的镜像模式,即可以将队列复制到多个节点上,以提高可用性。

  1. 设置镜像队列: 使用以下命令设置镜像队列:
sudo rabbitmqctl set_policy HA ".*" '{"ha - mode":"all"}'

上述命令将匹配所有队列(.*),并将它们设置为镜像队列,ha - mode":"all" 表示将队列复制到集群中的所有节点。

RabbitMQ 的监控与管理

管理界面

RabbitMQ 提供了一个基于 Web 的管理界面,可以方便地监控和管理 RabbitMQ 服务器。

  1. 启用管理插件: 使用以下命令启用管理插件:
sudo rabbitmq - plugins enable rabbitmq_management
  1. 访问管理界面: 在浏览器中输入 http://服务器IP:15672,默认用户名和密码是 guest。登录后可以查看队列、交换机、连接等信息,还可以进行创建、删除等操作。

命令行管理

  1. 查看队列信息: 使用以下命令查看队列信息:
sudo rabbitmqctl list_queues
  1. 查看交换机信息
sudo rabbitmqctl list_exchanges
  1. 查看连接信息
sudo rabbitmqctl list_connections

通过命令行工具,可以对 RabbitMQ 进行更灵活的管理和监控,满足不同场景下的需求。

通过以上内容,我们对 RabbitMQ 的实战应用有了较为全面的了解,从基础概念到各种应用模式,再到集群搭建和管理,希望能帮助你在后端开发中更好地使用 RabbitMQ 来构建高效、可靠的消息传递系统。