MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

消息队列在支付系统中的设计

2021-08-122.3k 阅读

消息队列在支付系统中的设计基础

支付系统概述

支付系统作为现代金融体系的核心组成部分,负责处理各类交易的资金流转。它需要具备高可用性、高性能、强一致性以及安全性等诸多特性。在一个完整的支付流程中,通常涉及到多个参与方,如用户、商家、支付机构、银行等。从用户发起支付请求开始,到资金成功结算到商家账户,期间要经历支付请求验证、风险评估、资金扣减、资金清算等一系列复杂的操作。

支付系统面临的挑战也是多方面的。高并发场景下,大量的支付请求同时涌入,系统需要能够快速响应并处理,避免出现卡顿或超时。数据一致性方面,资金的流转必须准确无误,任何差错都可能导致用户或商家的经济损失。此外,安全性至关重要,要防止支付信息泄露、恶意攻击等情况发生。

消息队列简介

消息队列是一种异步通信机制,它基于生产者 - 消费者模型。生产者将消息发送到队列中,而消费者从队列中获取消息进行处理。消息队列具有以下几个关键特性:

  1. 解耦:生产者和消费者不需要直接相互依赖。例如,在支付系统中,支付请求的生成(生产者)和支付处理逻辑(消费者)可以通过消息队列解耦,使得它们的开发、维护和扩展更加独立。
  2. 异步处理:生产者发送消息后无需等待消费者处理完成,继续执行后续操作。这在支付系统中尤为重要,比如在处理支付请求时,系统可以快速返回支付受理成功的响应给用户,而将复杂的支付处理任务异步交给消息队列的消费者处理。
  3. 削峰填谷:在高并发场景下,消息队列可以缓存大量的消息,避免瞬间高流量对系统造成冲击。例如,在促销活动期间,大量的支付请求涌入,消息队列可以暂存这些请求,然后消费者按照系统能够承受的速率逐步处理。

常见的消息队列中间件有 RabbitMQ、Kafka、RocketMQ 等。RabbitMQ 以其可靠性和灵活性著称,支持多种消息协议;Kafka 擅长处理高吞吐量的日志数据,具有良好的扩展性;RocketMQ 在分布式事务消息处理方面表现出色,适合大型分布式系统。

消息队列在支付系统中的应用场景

支付异步处理

  1. 支付请求接收与响应:当用户发起支付请求时,支付系统的前端首先接收该请求,并将其发送到消息队列。系统立即返回一个支付受理成功的响应给用户,告知用户支付请求已收到,正在处理中。这样可以提升用户体验,避免用户长时间等待。例如,在电商平台的支付场景中,用户在点击支付按钮后,很快就能看到支付受理成功的提示,而不必等待支付处理的全过程完成。
  2. 支付处理逻辑:消息队列的消费者从队列中取出支付请求消息,开始执行具体的支付处理逻辑。这包括验证支付信息的合法性,如银行卡号、密码、支付金额等是否正确;进行风险评估,判断该支付是否存在欺诈风险;然后进行资金扣减操作,从用户账户中扣除相应的金额。在这一系列操作中,每个步骤都可能涉及到与外部系统的交互,如与银行系统进行账户余额查询和资金划转等。例如,假设使用 Python 的 Celery 结合 RabbitMQ 实现支付异步处理。首先安装必要的库:
pip install celery pika

然后定义 Celery 任务:

from celery import Celery

app = Celery('payment_tasks', broker='amqp://guest:guest@localhost:5672//')

@app.task
def process_payment(payment_info):
    # 验证支付信息
    if not validate_payment_info(payment_info):
        return 'Payment information is invalid'
    # 风险评估
    if is_high_risk(payment_info):
        return 'Payment is high risk'
    # 资金扣减
    if not deduct_funds(payment_info):
        return 'Failed to deduct funds'
    return 'Payment processed successfully'

在支付请求发送端:

from payment_tasks import process_payment

payment_info = {
    'card_number': '1234567890123456',
    'amount': 100.0,
    # 其他支付信息
}
result = process_payment.delay(payment_info)
print(result.get())
  1. 支付结果通知:支付处理完成后,需要将支付结果通知给相关方。消费者可以将支付结果消息再次发送到另一个消息队列,该队列的消费者负责将结果通知给用户和商家。例如,通过短信、邮件或者应用内推送等方式告知用户支付成功或失败的信息,同时通知商家订单的支付状态,以便商家进行相应的后续操作,如发货等。

支付系统与其他系统的解耦

  1. 与订单系统的解耦:在电商场景中,支付系统与订单系统紧密相关。当用户完成支付后,需要更新订单状态为已支付。如果没有消息队列,支付系统需要直接调用订单系统的接口来更新订单状态。但这样会导致两个系统之间耦合度较高,一方的变动可能影响到另一方。通过引入消息队列,支付系统在支付成功后将订单更新消息发送到队列中,订单系统作为消费者从队列中获取消息并更新订单状态。这样,即使订单系统出现故障或者进行升级维护,支付系统仍然可以正常处理支付业务,而不会因为依赖订单系统的接口而受到影响。例如,在 Java 项目中使用 RabbitMQ 实现支付系统与订单系统的解耦。首先引入 RabbitMQ 的依赖:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

支付系统发送消息:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PaymentSystem {
    private static final String QUEUE_NAME = "order_update_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Order with id 12345 is paid";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF - 8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

订单系统接收消息:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class OrderSystem {
    private static final String QUEUE_NAME = "order_update_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF - 8");
            System.out.println(" [x] Received '" + message + "'");
            // 更新订单状态逻辑
        };
        channel.basicConsume(QUEUE_NAME, true, "", deliverCallback, consumerTag -> { });
    }
}
  1. 与清算系统的解耦:支付系统在完成一笔支付后,需要将支付数据发送给清算系统进行资金清算。清算系统负责计算各方应得的资金,并进行最终的资金结算。通过消息队列,支付系统将支付成功的交易数据发送到队列中,清算系统从队列中获取数据进行清算处理。这样可以使两个系统独立运行,各自根据自身的节奏进行处理。例如,在大型支付平台中,每天可能有海量的支付交易,清算系统可能无法实时处理所有交易。消息队列可以暂存这些交易数据,待清算系统有能力处理时再逐步消费。

削峰填谷应对高并发支付请求

  1. 高并发场景分析:在一些特殊的时间节点,如电商的促销活动(如双十一、618 等),支付请求会瞬间爆发,达到极高的并发量。以双十一为例,仅在开场几分钟内,就可能产生数百万甚至数千万的支付请求。如果支付系统直接处理这些请求,可能会因为系统资源耗尽而崩溃,如 CPU 使用率过高、内存溢出等问题。
  2. 消息队列的削峰作用:消息队列可以作为一个缓冲区,在高并发时接收并暂存大量的支付请求消息。支付系统的消费者从队列中按照系统能够承受的速率取出消息进行处理。例如,假设支付系统每秒最多能处理 1000 笔支付请求,而在促销活动开场时,每秒可能有 10000 笔支付请求涌入。消息队列可以在短时间内缓存这 9000 笔请求,然后消费者以每秒 1000 笔的速度逐步处理,避免系统因瞬间高流量而崩溃。
  3. 填谷效果:当高并发过后,系统的请求量会逐渐减少。此时,消息队列中暂存的消息可以继续被消费者处理,保证系统资源的充分利用,不会出现资源闲置的情况。例如,在促销活动结束后的一段时间内,虽然支付请求量大幅下降,但消息队列中的消息可以持续被处理,使支付系统的处理能力得到持续发挥。

消息队列在支付系统中的设计要点

消息可靠性保证

  1. 消息持久化:在支付系统中,消息丢失可能会导致严重的后果,如支付请求未被处理,资金流转出现差错等。消息队列需要支持消息持久化,将消息保存到磁盘上,即使消息队列服务器重启,消息也不会丢失。例如,在 RabbitMQ 中,可以通过设置消息的 deliveryMode2 来实现消息持久化。
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF - 8"));
  1. 确认机制:生产者需要确认消息是否成功发送到消息队列,消费者需要确认消息是否被成功处理。在 RabbitMQ 中,生产者可以通过 confirm 机制来确认消息的发送。
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Message sent successfully");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Message sent failed");
    }
});

消费者可以采用手动确认模式,在处理完消息后向消息队列发送确认消息。

channel.basicConsume(QUEUE_NAME, false, "", deliverCallback, consumerTag -> { });
// 处理消息逻辑
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  1. 重试机制:如果消息发送失败或者处理失败,需要有重试机制。对于发送失败的消息,生产者可以按照一定的策略进行重试,如指数退避策略,即每次重试的间隔时间逐渐增大。对于处理失败的消息,消费者可以将消息重新放回队列,等待下一次处理,或者发送到一个专门的死信队列(Dead Letter Queue),由人工进行处理。例如,在 Python 的 Celery 中,可以通过设置 retry 参数来实现任务重试。
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def process_payment(self, payment_info):
    try:
        # 支付处理逻辑
        pass
    except SomeException as exc:
        self.retry(exc=exc)

消息顺序性保证

  1. 支付业务对消息顺序的要求:在某些支付场景中,消息的顺序非常重要。例如,在处理用户的一系列支付操作时,先进行预授权操作,然后进行实际支付操作,如果这两个操作的消息顺序颠倒,可能会导致支付失败或者资金异常。再比如,在处理退款流程时,退款消息需要按照正确的顺序处理,否则可能会出现重复退款或者退款金额错误等问题。
  2. 实现消息顺序性的方法:一种常见的方法是使用分区队列。将相关的消息发送到同一个分区(队列)中,消费者按照顺序从该分区中获取消息进行处理。例如,在 Kafka 中,可以通过设置消息的 partition 来实现。假设根据用户 ID 来分区,代码如下:
ProducerRecord<String, String> record = new ProducerRecord<>("payment_topic", userId, paymentMessage);
producer.send(record);

消费者从对应的分区中消费消息,保证了同一用户的支付相关消息的顺序性。另外,在 RabbitMQ 中,可以通过使用单队列,并确保生产者和消费者都以单线程的方式操作该队列来保证消息顺序性。但这种方式在高并发场景下性能较低,因此可以结合一些分布式锁机制来优化,确保在同一时间只有一个线程从队列中获取消息进行处理。

消息队列的性能优化

  1. 队列设计优化:合理设计队列的数量和类型。对于高吞吐量的支付请求队列,可以采用无持久化的队列,以提高消息的处理速度,因为持久化操作会涉及磁盘 I/O,相对较慢。但对于一些关键的消息,如涉及资金变动的消息,必须采用持久化队列。同时,根据业务场景划分不同的队列,如支付请求队列、支付结果通知队列等,避免不同类型的消息相互干扰。
  2. 消费者性能优化:增加消费者的数量可以提高消息的处理速度,但需要注意系统资源的限制。可以采用多线程或者分布式的方式部署消费者。例如,在 Python 的 Celery 中,可以通过设置 concurrency 参数来指定消费者的并发数。
celery -A payment_tasks worker -l info -c 10

另外,优化消费者的处理逻辑,减少不必要的计算和 I/O 操作,提高单个消费者的处理效率。例如,在处理支付请求时,可以批量查询用户账户信息,而不是每次处理一个请求都进行一次查询。 3. 消息压缩:对于一些数据量较大的消息,如包含详细支付信息的消息,可以进行压缩后再发送到消息队列。这样可以减少网络传输开销,提高消息的发送和接收速度。例如,在 Java 中可以使用 Gzip 进行消息压缩。

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class MessageCompressor {
    public static byte[] compress(String message) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bos);
        gzip.write(message.getBytes());
        gzip.close();
        return bos.toByteArray();
    }
}

在发送端进行压缩:

byte[] compressedMessage = MessageCompressor.compress(paymentMessage);
channel.basicPublish("", QUEUE_NAME, null, compressedMessage);

在接收端进行解压缩:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class MessageDecompressor {
    public static String decompress(byte[] compressedMessage) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(compressedMessage);
        GZIPInputStream gis = new GZIPInputStream(bis);
        byte[] buffer = new byte[1024];
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        int len;
        while ((len = gis.read(buffer)) != -1) {
            bos.write(buffer, 0, len);
        }
        gis.close();
        return new String(bos.toByteArray());
    }
}
String decompressedMessage = MessageDecompressor.decompress(delivery.getBody());

消息队列在支付系统中的事务处理

支付系统中的事务需求

  1. 资金一致性事务:在支付过程中,涉及到用户账户资金的扣减和商家账户资金的增加,这两个操作必须要么全部成功,要么全部失败,以保证资金的一致性。例如,当用户支付 100 元给商家时,用户账户余额减少 100 元的同时,商家账户余额必须增加 100 元。如果只完成了用户账户资金扣减而商家账户资金未增加,或者反之,都会导致资金不一致,给用户或商家带来损失。
  2. 业务流程事务:除了资金事务,支付系统还涉及到一系列业务流程的事务处理。比如,在支付成功后,不仅要更新用户和商家的账户余额,还要更新订单状态、记录支付日志等操作。这些操作也需要保证原子性,即要么全部完成,要么全部回滚,以确保业务流程的完整性。

基于消息队列的事务实现方式

  1. 本地事务 + 消息队列:一种常见的实现方式是先在本地数据库中执行支付相关的操作,如扣减用户账户余额、更新订单状态等,然后将支付成功的消息发送到消息队列。如果本地事务执行成功,再发送消息;如果本地事务执行失败,则不发送消息。例如,在 Java 中使用 Spring Boot 和 RabbitMQ 实现。首先配置好 Spring Boot 与 RabbitMQ 的集成。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring - boot - starter - amqp</artifactId>
</dependency>

在支付服务中:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class PaymentService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private PaymentRepository paymentRepository;

    @Transactional
    public void processPayment(Payment payment) {
        // 执行本地数据库操作,如扣减用户账户余额
        paymentRepository.save(payment);
        // 发送支付成功消息到队列
        rabbitTemplate.convertAndSend("payment_success_queue", payment);
    }
}
  1. 分布式事务解决方案:在分布式支付系统中,可能涉及多个服务之间的事务处理,如支付服务、账户服务、订单服务等。可以采用两阶段提交(2PC)、三阶段提交(3PC)或者 TCC(Try - Confirm - Cancel)等分布式事务解决方案结合消息队列来实现事务一致性。以 TCC 为例,支付服务先执行 Try 操作,即预扣用户账户资金,然后将消息发送到消息队列通知其他服务进行准备操作。如果所有服务的准备操作都成功,再执行 Confirm 操作,完成最终的资金划转和业务状态更新;如果有任何一个服务的准备操作失败,则执行 Cancel 操作,回滚之前的预扣资金等操作。例如,在一个微服务架构的支付系统中,使用 Spring Cloud Alibaba 的 Seata 框架实现 TCC 模式的分布式事务与消息队列结合。首先引入 Seata 的依赖:
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata - spring - boot - starter</artifactId>
    <version>1.4.2</version>
</dependency>

定义支付服务的 Try、Confirm 和 Cancel 方法:

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PaymentService {
    @Autowired
    private AccountService accountService;

    @Autowired
    private OrderService orderService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GlobalTransactional
    public void processPayment(Payment payment) {
        // Try 操作
        boolean tryResult = accountService.tryDeduct(payment.getUserId(), payment.getAmount());
        if (!tryResult) {
            throw new RuntimeException("Try operation failed");
        }
        // 发送消息到队列
        rabbitTemplate.convertAndSend("payment_prepare_queue", payment);
        // Confirm 操作
        accountService.confirmDeduct(payment.getUserId(), payment.getAmount());
        orderService.updateOrderStatus(payment.getOrderId(), "PAID");
    }

    public void cancelPayment(Payment payment) {
        // Cancel 操作
        accountService.cancelDeduct(payment.getUserId(), payment.getAmount());
    }
}

其他服务监听消息队列进行相应的准备操作:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PaymentPrepareListener {
    @Autowired
    private OtherService otherService;

    @RabbitListener(queues = "payment_prepare_queue")
    public void handlePaymentPrepare(Payment payment) {
        otherService.prepare(payment);
    }
}

消息队列与支付系统的监控与运维

消息队列监控指标

  1. 消息堆积量:消息堆积量是指消息队列中未被处理的消息数量。在支付系统中,如果消息堆积量持续增加,可能意味着消费者处理速度过慢,或者生产者发送消息的速度过快,超过了系统的处理能力。需要密切关注消息堆积量,当堆积量达到一定阈值时,及时采取措施,如增加消费者数量、优化消费者处理逻辑等。例如,在 RabbitMQ 中,可以通过管理界面或者 API 获取队列的消息堆积量。
curl -u guest:guest http://localhost:15672/api/queues/%2F/payment_request_queue | grep messages
  1. 消息处理延迟:消息处理延迟是指从消息发送到消息被处理完成所经历的时间。在支付系统中,消息处理延迟直接影响用户体验和业务流程的及时性。可以通过在消息中添加时间戳,在消费者处理消息时计算时间差来获取消息处理延迟。对于处理延迟较长的消息,需要分析原因,可能是网络问题、系统资源瓶颈或者业务逻辑复杂等。例如,在 Kafka 中,可以在生产者发送消息时添加时间戳:
ProducerRecord<String, String> record = new ProducerRecord<>("payment_topic", System.currentTimeMillis() + ":" + paymentMessage);
producer.send(record);

在消费者端计算延迟:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    String[] parts = record.value().split(":", 2);
    long sendTime = Long.parseLong(parts[0]);
    long receiveTime = System.currentTimeMillis();
    long delay = receiveTime - sendTime;
    System.out.println("Message delay: " + delay + " ms");
}
  1. 消息发送和接收成功率:消息发送成功率是指成功发送到消息队列的消息数量与总发送消息数量的比例,消息接收成功率是指成功从消息队列接收并处理的消息数量与总接收消息数量的比例。如果发送成功率较低,可能是网络故障、消息队列服务器问题等;如果接收成功率较低,可能是消费者处理消息时出现错误、消息格式不正确等。通过统计这些成功率指标,可以及时发现系统中的问题并进行修复。例如,在 RabbitMQ 中,可以通过生产者的 confirm 机制和消费者的确认机制来统计发送和接收成功率。

消息队列的运维策略

  1. 定期备份:对消息队列的数据进行定期备份,以防止数据丢失。特别是对于持久化的消息队列,备份尤为重要。可以使用消息队列提供的备份工具或者结合操作系统的备份机制进行备份。例如,在 RabbitMQ 中,可以使用 rabbitmqctl 命令进行数据备份:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl stop_app
cp -r /var/lib/rabbitmq/mnesia/rabbit@node1 /backup/rabbitmq_backup
rabbitmqctl start_app
  1. 升级与维护:定期对消息队列中间件进行升级,以获取新的功能、性能优化和安全修复。在升级前,需要进行充分的测试,确保升级不会影响支付系统的正常运行。同时,对消息队列服务器进行日常维护,如检查系统资源使用情况、清理过期的消息等。例如,在升级 Kafka 时,先在测试环境中进行升级测试,验证消息的收发、处理等功能是否正常,然后再逐步在生产环境中进行升级。
  2. 故障处理:当消息队列出现故障时,如服务器宕机、网络中断等,需要有快速的故障恢复机制。对于主从架构的消息队列,可以切换到从服务器继续提供服务。同时,对故障原因进行深入分析,采取相应的措施防止故障再次发生。例如,在 RabbitMQ 集群中,如果主节点宕机,从节点会自动接管成为主节点,保证消息队列的可用性。但需要分析主节点宕机的原因,如硬件故障、软件 bug 等,进行修复和预防。

在设计和实现基于消息队列的支付系统时,需要充分考虑以上各个方面,从应用场景、设计要点、事务处理到监控运维,确保支付系统的高效、稳定、安全运行。通过合理运用消息队列技术,可以提升支付系统的性能、扩展性和可靠性,满足日益增长的支付业务需求。