MK
摩柯社区 - 一个极简的技术知识社区
AI 面试
消息队列的顺序消息实现
2021-04-161.8k 阅读

什么是顺序消息

在消息队列的应用场景中,顺序消息是一类特殊的消息,它们的消费顺序与生产顺序保持一致。这在许多业务场景下至关重要,例如电商订单处理流程中,订单创建、支付、发货等环节的消息必须按照特定顺序处理,否则可能导致业务逻辑错误,如未支付就发货的情况。

顺序消息主要分为两种类型:

  • 全局顺序消息:在整个消息队列中,所有消息严格按照发送顺序依次被消费。这种情况对消息队列的性能和架构设计要求较高,因为它需要全局的一致性保证。
  • 分区顺序消息:消息被划分到不同的分区(Partition)中,在每个分区内部,消息按照发送顺序依次消费。这种方式相对全局顺序消息更具灵活性和可扩展性,在实际应用中更为常见。例如,在电商系统中,可以按照订单ID对消息进行分区,同一订单的相关消息会进入同一个分区,从而保证该订单相关操作的顺序性。

顺序消息实现的挑战

实现顺序消息面临着诸多技术挑战,主要体现在以下几个方面:

  • 消息发送端:确保消息能按照期望的顺序发送到消息队列中。这需要考虑网络波动、并发发送等因素。例如,在高并发场景下,多个线程同时发送消息,可能导致消息顺序错乱。
  • 消息队列内部:消息队列需要保证消息的存储和转发过程中顺序不被打乱。例如,在分布式消息队列中,消息可能被存储在多个节点上,如何在节点间同步和传输消息时保持顺序是一个难题。
  • 消息消费端:消费端要按照消息队列中的顺序准确无误地消费消息。但消费端可能存在多个实例并行消费的情况,如何协调这些实例以保证顺序消费是关键。此外,消费过程中还可能出现消息处理失败需要重试的情况,重试时的顺序保证也不容忽视。

基于 RabbitMQ 的顺序消息实现

RabbitMQ 是一款广泛使用的开源消息队列,虽然它本身并不直接支持严格的顺序消息,但通过一些策略可以实现类似的功能。

1. 单队列单消费者模式

最简单的实现顺序消息的方式是使用单队列单消费者模式。在这种模式下,生产者将消息发送到单个队列,消费者从该队列中按顺序取出消息进行处理。

import pika

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='order_queue')

# 发送消息
for i in range(10):
    message = f'Order {i} message'
    channel.basic_publish(exchange='', routing_key='order_queue', body=message)
    print(f'Sent: {message}')

# 关闭连接
connection.close()
import pika

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='order_queue')

# 定义回调函数处理消息
def callback(ch, method, properties, body):
    print(f'Received: {body}')

# 消费消息
channel.basic_consume(queue='order_queue', on_message_callback=callback, auto_ack=True)

print('Waiting for messages...')
channel.start_consuming()

这种方式虽然简单直接,但存在性能瓶颈,因为单个消费者处理消息的速度有限,无法充分利用多核处理器的优势。

2. 基于 Topic Exchange 和 Routing Key 分区实现

通过 Topic Exchange 和合理设置 Routing Key,可以将相关消息发送到同一个队列分区,然后为每个分区设置一个消费者,从而在分区内实现顺序消费。

import pika

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明Topic Exchange
channel.exchange_declare(exchange='order_exchange', exchange_type='topic')

# 发送消息,根据订单ID作为Routing Key
order_ids = ['123', '456', '123']
for order_id in order_ids:
    message = f'Order {order_id} message'
    routing_key = f'order.{order_id}'
    channel.basic_publish(exchange='order_exchange', routing_key=routing_key, body=message)
    print(f'Sent: {message} with routing key {routing_key}')

# 关闭连接
connection.close()
import pika

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明Topic Exchange
channel.exchange_declare(exchange='order_exchange', exchange_type='topic')

# 声明队列并绑定到Exchange
queue_name = 'order_123_queue'
channel.queue_declare(queue=queue_name)
routing_key = 'order.123'
channel.queue_bind(exchange='order_exchange', queue=queue_name, routing_key=routing_key)

# 定义回调函数处理消息
def callback(ch, method, properties, body):
    print(f'Received: {body}')

# 消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print('Waiting for messages...')
channel.start_consuming()

通过这种方式,可以在一定程度上提高并发处理能力,同时保证同一分区内消息的顺序性。

基于 Kafka 的顺序消息实现

Kafka 是一款高性能的分布式消息队列,对顺序消息有较好的支持。

1. 生产者发送顺序消息

Kafka 生产者通过设置 partitioner 来确保相关消息发送到同一个分区。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaOrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 设置自定义分区器
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.OrderPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String[] orderIds = {"123", "456", "123"};
        for (String orderId : orderIds) {
            ProducerRecord<String, String> record = new ProducerRecord<>("order_topic", orderId, "Order " + orderId + " message");
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("Sent message to partition " + metadata.partition() + " at offset " + metadata.offset());
                    }
                }
            });
        }

        producer.close();
    }
}

自定义分区器 OrderPartitioner 代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(keyBytes.hashCode()) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

2. 消费者消费顺序消息

Kafka 消费者通过设置 isolation.levelread_committed,并确保每个分区只有一个消费者实例,来保证顺序消费。

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaOrderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order_group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("order_topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value() + " from partition " + record.partition() + " at offset " + record.offset());
            }
        }
    }
}

通过上述方式,Kafka 可以在分区级别实现高效的顺序消息处理。

基于 RocketMQ 的顺序消息实现

RocketMQ 是一款由阿里巴巴开源的分布式消息队列,对顺序消息有原生支持。

1. 生产者发送顺序消息

RocketMQ 生产者通过选择特定的队列来发送消息,以保证顺序。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class RocketMQOrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] orderIds = {"123", "456", "123"};
        for (String orderId : orderIds) {
            Message message = new Message("order_topic", ("Order " + orderId + " message").getBytes("UTF-8"));
            SendResult result = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int hash = ((String) arg).hashCode();
                    int index = hash % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.println("Sent message: " + message.getBody() + " to queue " + result.getMessageQueue());
        }

        producer.shutdown();
    }
}

2. 消费者消费顺序消息

RocketMQ 消费者通过顺序消费监听器来保证消息顺序处理。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketMQOrderConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("order_topic", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started");
    }
}

RocketMQ 的这种原生支持使得顺序消息的实现更加简洁和高效。

顺序消息实现的性能优化

在实现顺序消息时,性能优化是不可忽视的方面。以下是一些常见的性能优化策略:

  • 批量处理:无论是在消息发送端还是消费端,都可以采用批量处理的方式。在发送端,将多条消息批量发送到消息队列,可以减少网络开销。在消费端,批量从消息队列中拉取消息进行处理,能提高处理效率。例如,Kafka 生产者可以通过设置 batch.size 参数来控制批量发送的消息数量。
  • 异步处理:在消费端,可以采用异步处理的方式。将消息从队列中快速取出,然后放入一个异步处理线程池中进行处理,这样可以提高消费速度。但需要注意,异步处理时要保证消息处理的顺序性,通常可以通过使用队列来暂存消息,按照顺序从队列中取出消息进行异步处理。
  • 合理配置资源:根据业务量和系统资源情况,合理配置消息队列的分区数量、消费者实例数量等。例如,如果业务量较大,可以适当增加 Kafka 的分区数量,以提高并行处理能力。但同时也要注意,分区过多可能会增加管理开销。

顺序消息的应用场景

顺序消息在众多业务场景中都有广泛应用:

  • 电商订单处理:如前文所述,订单的创建、支付、发货等环节的消息需要顺序处理,以保证业务流程的正确性。
  • 金融交易:在金融系统中,交易的下单、成交、清算等消息必须按照顺序处理,否则可能导致资金错误等严重问题。
  • 日志处理:对于一些需要按照时间顺序处理的日志消息,如系统运行日志、用户操作日志等,顺序消息可以保证日志处理的准确性和完整性。

通过以上详细介绍,我们对消息队列的顺序消息实现有了全面深入的了解,包括不同消息队列产品的实现方式、面临的挑战以及性能优化和应用场景等方面。在实际应用中,需要根据具体业务需求和系统架构选择合适的消息队列及实现方案。