RabbitMQ实战入门
什么是 RabbitMQ
RabbitMQ 是一个开源的消息代理和队列服务器,它实现了高级消息队列协议(AMQP)。消息队列在后端开发中扮演着至关重要的角色,常用于解耦应用程序的不同部分,处理异步任务,以及实现可靠的消息传递。RabbitMQ 因其可靠性、灵活性和广泛的语言支持而被广泛采用。
核心概念
- 生产者(Producer):发送消息的应用程序。生产者创建消息并将其发送到 RabbitMQ 服务器。
- 消费者(Consumer):接收消息的应用程序。消费者订阅队列,并从队列中获取消息进行处理。
- 队列(Queue):存储消息的地方。队列是 RabbitMQ 服务器上的一种数据结构,它可以存储多个消息,直到消费者将它们取出。
- 交换机(Exchange):接收生产者发送的消息,并根据路由规则将消息发送到一个或多个队列。交换机根据不同的类型(如 direct、topic、fanout 等)来决定如何路由消息。
- 绑定(Binding):定义了交换机和队列之间的关系,即交换机如何将消息路由到队列。绑定可以包含一个可选的路由键(routing key),用于更精确的路由。
安装 RabbitMQ
在 Linux 上安装
- 添加 RabbitMQ 官方仓库: 对于 Debian 或 Ubuntu 系统,可以使用以下命令添加 RabbitMQ 官方仓库:
wget -O - https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey | sudo apt-key add -
sudo apt - get install apt - transport - https
echo "deb https://packagecloud.io/rabbitmq/rabbitmq - server/ubuntu/ $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
- 安装 RabbitMQ: 添加仓库后,使用以下命令安装 RabbitMQ 服务器:
sudo apt - get update
sudo apt - get install rabbitmq - server
- 启动 RabbitMQ: 安装完成后,可以使用以下命令启动 RabbitMQ 服务:
sudo systemctl start rabbitmq - server
- 验证安装: 使用以下命令检查 RabbitMQ 服务状态:
sudo systemctl status rabbitmq - server
在 Windows 上安装
- 下载安装包: 从 RabbitMQ 官方网站(https://www.rabbitmq.com/download.html)下载 Windows 安装包。
- 安装 Erlang: RabbitMQ 依赖于 Erlang,因此需要先安装 Erlang。从 Erlang 官方网站(https://www.erlang.org/downloads)下载并安装 Erlang。安装过程中注意设置环境变量。
- 安装 RabbitMQ: 运行 RabbitMQ 安装包,按照提示完成安装。
- 启动 RabbitMQ: 安装完成后,可以在 Windows 服务中找到 RabbitMQ 服务并启动它。
简单消息队列示例
使用 Python 和 pika 库
- 安装 pika: 使用 pip 安装 pika 库:
pip install pika
- 生产者代码:
import pika
# 建立到 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='hello')
# 发送消息
message = 'Hello, RabbitMQ!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(" [x] Sent 'Hello, RabbitMQ!'")
# 关闭连接
connection.close()
- 消费者代码:
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 建立到 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='hello')
# 消费消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,生产者通过 pika.BlockingConnection
建立到本地 RabbitMQ 服务器的连接,声明一个名为 hello
的队列,并向该队列发送一条消息。消费者同样建立连接,声明相同的队列,并设置一个回调函数 callback
来处理接收到的消息。basic_consume
方法用于指定队列和回调函数,auto_ack=True
表示消息被接收后自动确认,无需手动确认。
交换机类型
Fanout 交换机
Fanout 交换机将接收到的消息广播到所有与之绑定的队列,而不考虑路由键。
- 生产者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 fanout 类型的交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = 'This is a fanout message'
# 发送消息到交换机,不指定路由键
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent 'This is a fanout message'")
connection.close()
- 消费者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 fanout 类型的交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 创建一个随机命名的队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 将队列绑定到交换机
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在这个示例中,生产者声明一个 fanout
类型的交换机 logs
,并向其发送消息,不指定路由键。消费者声明相同的交换机,并创建一个随机命名的队列,然后将该队列绑定到交换机。这样,无论有多少个消费者,它们都会接收到生产者发送到 logs
交换机的所有消息。
Direct 交换机
Direct 交换机根据消息的路由键将消息发送到特定的队列。只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被发送到该队列。
- 生产者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 direct 类型的交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = 'error'
message = 'This is an error message'
# 发送消息到交换机,指定路由键
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent '%s:%s'" % (severity, message))
connection.close()
- 消费者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 direct 类型的交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 创建一个队列
result = channel.queue_declare(queue='')
queue_name = result.method.queue
severities = ['error']
for severity in severities:
# 将队列绑定到交换机,并指定路由键
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
这里生产者声明一个 direct
类型的交换机 direct_logs
,并根据消息的严重程度(如 error
)作为路由键发送消息。消费者声明相同的交换机,创建一个队列,并将队列绑定到交换机,同时指定要接收的路由键(如 error
)。只有当消息的路由键与消费者绑定的路由键匹配时,消费者才会接收到消息。
Topic 交换机
Topic 交换机允许更灵活的路由,它根据路由键的模式匹配将消息发送到队列。路由键和绑定键都是由点分隔的单词组成,支持通配符。*
匹配一个单词,#
匹配零个或多个单词。
- 生产者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = 'kern.critical'
message = 'A critical kernel error'
# 发送消息到交换机,指定路由键
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent '%s:%s'" % (routing_key, message))
connection.close()
- 消费者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 创建一个队列
result = channel.queue_declare(queue='')
queue_name = result.method.queue
binding_keys = ['*.critical']
for binding_key in binding_keys:
# 将队列绑定到交换机,并指定绑定键
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在这个例子中,生产者声明一个 topic
类型的交换机 topic_logs
,并使用一个包含模式的路由键(如 kern.critical
)发送消息。消费者声明相同的交换机,创建队列,并使用通配符模式(如 *.critical
)将队列绑定到交换机。这样,只要消息的路由键与消费者的绑定键模式匹配,消费者就会接收到消息。
消息确认机制
手动确认
默认情况下,RabbitMQ 在消息被发送到消费者后就认为消息已经被处理。但有时候,我们需要确保消息被消费者真正处理后才确认。这可以通过手动确认来实现。
- 消费者代码:
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 手动确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 设置为手动确认,auto_ack=False
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
在上述代码中,basic_consume
方法的 auto_ack
参数设置为 False
,表示关闭自动确认。在回调函数 callback
中,使用 ch.basic_ack(delivery_tag=method.delivery_tag)
手动确认消息。这样,只有当消费者调用 basic_ack
方法后,RabbitMQ 才会认为消息已被成功处理,并从队列中删除。
持久化
为了确保 RabbitMQ 在重启后消息不会丢失,我们可以将队列和消息设置为持久化。
- 生产者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个持久化队列
channel.queue_declare(queue='task_queue', durable=True)
message = 'This is a durable message'
# 发送一个持久化消息
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
))
print(" [x] Sent 'This is a durable message'")
connection.close()
- 消费者代码:
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个持久化队列
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
在生产者代码中,queue_declare
方法的 durable=True
参数使队列持久化,basic_publish
方法中的 properties=pika.BasicProperties(delivery_mode=2)
使消息持久化。消费者同样声明持久化队列,这样即使 RabbitMQ 服务器重启,持久化的队列和消息也不会丢失。
工作队列模式
工作队列模式用于在多个消费者之间分配任务。多个消费者可以同时从一个队列中获取消息并处理,从而提高任务处理的效率。
- 生产者代码:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
# 发送消息
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
))
print(" [x] Sent %r" % message)
connection.close()
- 消费者代码:
import pika
import time
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 设置每个消费者在同一时间最多处理一条消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
在这个模式中,生产者向 task_queue
队列发送任务消息。消费者从队列中获取消息并处理,basic_qos(prefetch_count=1)
方法设置每个消费者在同一时间最多处理一条消息,这样可以确保任务在多个消费者之间更均衡地分配。当消费者处理完一条消息后,通过 basic_ack
方法手动确认消息。
发布/订阅模式
发布/订阅模式使用 fanout
交换机,允许生产者将消息发送到多个消费者。
- 生产者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = 'This is a pub - sub message'
# 发送消息到交换机
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent 'This is a pub - sub message'")
connection.close()
- 消费者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 创建一个随机命名的队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 将队列绑定到交换机
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在发布/订阅模式中,生产者将消息发送到 fanout
交换机 logs
,不指定路由键。消费者声明相同的交换机,并创建一个随机命名的队列,然后将队列绑定到交换机。这样,所有绑定到该交换机的消费者都会接收到生产者发送的消息。
RabbitMQ 集群
集群搭建
- 环境准备:
假设有三台服务器,IP 分别为
192.168.1.10
、192.168.1.11
和192.168.1.12
,并且都安装了 RabbitMQ。 - 配置文件修改:
在每台服务器的
/etc/rabbitmq/rabbitmq.conf
文件中添加以下配置:
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes = ['rabbit@192.168.1.10', 'rabbit@192.168.1.11', 'rabbit@192.168.1.12']
- 启动集群: 在第一台服务器上启动 RabbitMQ:
sudo rabbitmq - server start
在第二台和第三台服务器上,先停止 RabbitMQ 服务,然后加入集群:
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@192.168.1.10
sudo rabbitmqctl start_app
这样就搭建好了一个 RabbitMQ 集群,集群中的节点可以共享队列和交换机等资源,提高了系统的可靠性和性能。
集群中的数据复制
RabbitMQ 集群支持队列的镜像模式,即可以将队列复制到多个节点上,以提高可用性。
- 设置镜像队列: 使用以下命令设置镜像队列:
sudo rabbitmqctl set_policy HA ".*" '{"ha - mode":"all"}'
上述命令将匹配所有队列(.*
),并将它们设置为镜像队列,ha - mode":"all"
表示将队列复制到集群中的所有节点。
RabbitMQ 的监控与管理
管理界面
RabbitMQ 提供了一个基于 Web 的管理界面,可以方便地监控和管理 RabbitMQ 服务器。
- 启用管理插件: 使用以下命令启用管理插件:
sudo rabbitmq - plugins enable rabbitmq_management
- 访问管理界面:
在浏览器中输入
http://服务器IP:15672
,默认用户名和密码是guest
。登录后可以查看队列、交换机、连接等信息,还可以进行创建、删除等操作。
命令行管理
- 查看队列信息: 使用以下命令查看队列信息:
sudo rabbitmqctl list_queues
- 查看交换机信息:
sudo rabbitmqctl list_exchanges
- 查看连接信息:
sudo rabbitmqctl list_connections
通过命令行工具,可以对 RabbitMQ 进行更灵活的管理和监控,满足不同场景下的需求。
通过以上内容,我们对 RabbitMQ 的实战应用有了较为全面的了解,从基础概念到各种应用模式,再到集群搭建和管理,希望能帮助你在后端开发中更好地使用 RabbitMQ 来构建高效、可靠的消息传递系统。