MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis固定窗口限流与业务需求的适配技巧

2022-01-302.6k 阅读

Redis固定窗口限流概述

在现代软件开发中,尤其是在高并发的场景下,限流是一项至关重要的技术手段。它用于控制对资源的访问频率,防止系统因过载而崩溃。Redis作为一款高性能的键值数据库,提供了丰富的数据结构和命令,使其成为实现限流的理想选择。固定窗口限流是限流策略中的一种基础方式,它将时间划分为固定大小的窗口,在每个窗口内对请求进行计数。当请求数达到设定的阈值时,后续请求将被限流。

例如,我们设定一个1分钟的固定窗口,允许在这1分钟内最多有100个请求。在窗口开始时,计数器清零,每收到一个请求,计数器加1。如果在1分钟内计数器达到100,之后的请求就会被判定为超出限流,需要进行相应处理,比如返回错误信息或进行排队等待。

Redis实现固定窗口限流的原理

Redis实现固定窗口限流主要依赖其原子操作和数据存储功能。我们通常使用Redis的字符串类型来存储请求计数,通过INCR命令实现原子性的计数增加。INCR命令会将指定键的值原子性地增加1,如果键不存在,则会先创建该键并将其值设为1。

同时,利用Redis的键过期机制来控制窗口的时间范围。当窗口开始时,设置一个带有过期时间的键,过期时间即为窗口的长度。当键过期时,相当于窗口结束,此时可以重新开始新窗口的计数。

代码示例(Python + Redis)

下面以Python结合Redis为例,展示如何实现固定窗口限流。首先确保安装了redis - py库,可以通过pip install redis进行安装。

import redis


class FixedWindowLimiter:
    def __init__(self, redis_client, key, limit, window_seconds):
        self.redis_client = redis_client
        self.key = key
        self.limit = limit
        self.window_seconds = window_seconds

    def is_allowed(self):
        current_count = self.redis_client.incr(self.key)
        if current_count == 1:
            self.redis_client.expire(self.key, self.window_seconds)
        return current_count <= self.limit


# 使用示例
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
limiter = FixedWindowLimiter(redis_client, 'example_limit_key', 10, 60)
if limiter.is_allowed():
    print("请求被允许")
else:
    print("请求超出限流")

在上述代码中,FixedWindowLimiter类封装了固定窗口限流的逻辑。__init__方法接收Redis客户端实例、限流键、限流阈值和窗口时间。is_allowed方法通过incr命令增加计数,并判断当前计数是否超过阈值。如果是第一个请求,设置键的过期时间。

与业务需求适配的考虑因素

  1. 阈值设定:阈值的设定需要根据业务系统的处理能力来确定。如果阈值过高,可能无法有效限流,导致系统过载;阈值过低,则可能影响正常业务流量。例如,对于一个API接口,需要通过性能测试确定其在高并发下稳定处理的请求数量,以此作为阈值的参考。假设一个API在压测时,每秒处理1000个请求时开始出现响应延迟增大的情况,那么可以将阈值设定在800左右,以预留一定的缓冲空间。
  2. 窗口大小:窗口大小的选择与业务场景紧密相关。对于一些对实时性要求较高的业务,如秒杀活动,窗口可以设置得较小,比如1秒或10秒,以更精确地控制短时间内的请求频率。而对于一些相对低频但持续时间较长的业务,如用户每日的文件上传功能,窗口可以设置为1天。

复杂业务场景下的适配技巧

  1. 动态调整阈值:在某些业务场景中,系统的处理能力可能会随着时间或其他因素发生变化。例如,在电商大促期间,服务器资源会增加,系统的处理能力也会提高。此时可以根据监控数据动态调整限流阈值。可以通过定时任务或者事件驱动的方式,根据系统的CPU使用率、内存使用率、响应时间等指标来调整阈值。
import redis
import time


class DynamicFixedWindowLimiter:
    def __init__(self, redis_client, key, base_limit, window_seconds):
        self.redis_client = redis_client
        self.key = key
        self.base_limit = base_limit
        self.window_seconds = window_seconds

    def get_current_limit(self):
        # 模拟根据CPU使用率调整阈值
        cpu_usage = get_cpu_usage()
        if cpu_usage < 50:
            return self.base_limit * 1.5
        elif cpu_usage < 80:
            return self.base_limit
        else:
            return self.base_limit * 0.8

    def is_allowed(self):
        current_limit = self.get_current_limit()
        current_count = self.redis_client.incr(self.key)
        if current_count == 1:
            self.redis_client.expire(self.key, self.window_seconds)
        return current_count <= current_limit


def get_cpu_usage():
    # 这里需要实现获取CPU使用率的逻辑
    # 例如使用psutil库在Linux系统下获取
    # 简单模拟返回一个随机数
    import random
    return random.randint(0, 100)


# 使用示例
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
limiter = DynamicFixedWindowLimiter(redis_client, 'dynamic_example_limit_key', 10, 60)
if limiter.is_allowed():
    print("请求被允许")
else:
    print("请求超出限流")

在上述代码中,DynamicFixedWindowLimiter类增加了动态获取阈值的功能。get_current_limit方法根据模拟的CPU使用率来调整阈值,is_allowed方法在判断请求是否允许时使用动态获取的阈值。

  1. 多维度限流:在一些复杂业务中,可能需要从多个维度进行限流。例如,不仅要限制每个用户的请求频率,还要限制整个系统的总请求频率。可以通过在Redis中使用不同的键来分别存储不同维度的计数。
import redis


class MultiDimensionLimiter:
    def __init__(self, redis_client, user_key_prefix, global_key, user_limit, global_limit, window_seconds):
        self.redis_client = redis_client
        self.user_key_prefix = user_key_prefix
        self.global_key = global_key
        self.user_limit = user_limit
        self.global_limit = global_limit
        self.window_seconds = window_seconds

    def is_allowed(self, user_id):
        user_key = f"{self.user_key_prefix}:{user_id}"
        user_count = self.redis_client.incr(user_key)
        if user_count == 1:
            self.redis_client.expire(user_key, self.window_seconds)
        global_count = self.redis_client.incr(self.global_key)
        if global_count == 1:
            self.redis_client.expire(self.global_key, self.window_seconds)
        return user_count <= self.user_limit and global_count <= self.global_limit


# 使用示例
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
limiter = MultiDimensionLimiter(redis_client, 'user_limit', 'global_limit', 5, 100, 60)
user_id = "12345"
if limiter.is_allowed(user_id):
    print("请求被允许")
else:
    print("请求超出限流")

在上述代码中,MultiDimensionLimiter类实现了用户维度和全局维度的限流。is_allowed方法同时检查用户维度和全局维度的计数是否超过阈值。

  1. 限流策略融合:有时候单一的固定窗口限流可能无法满足复杂业务需求,需要与其他限流策略融合。比如结合滑动窗口限流,在固定窗口限流的基础上,更精确地控制请求频率。滑动窗口限流可以看作是多个固定窗口的叠加,通过记录每个小窗口内的请求数,来实现更细粒度的限流控制。
import redis
import time


class HybridLimiter:
    def __init__(self, redis_client, key, fixed_limit, sliding_limit, window_seconds, sliding_window_count):
        self.redis_client = redis_client
        self.key = key
        self.fixed_limit = fixed_limit
        self.sliding_limit = sliding_limit
        self.window_seconds = window_seconds
        self.sliding_window_count = sliding_window_count
        self.sliding_window_size = window_seconds // sliding_window_count

    def is_allowed(self):
        current_time = int(time.time())
        fixed_count = self.redis_client.incr(self.key)
        if fixed_count == 1:
            self.redis_client.expire(self.key, self.window_seconds)

        sliding_key_prefix = f"{self.key}:sliding"
        total_sliding_count = 0
        for i in range(self.sliding_window_count):
            window_start = current_time - (i + 1) * self.sliding_window_size
            window_key = f"{sliding_key_prefix}:{window_start}"
            window_count = self.redis_client.get(window_key)
            if window_count:
                total_sliding_count += int(window_count)
            self.redis_client.setex(window_key, self.sliding_window_size, 1)

        return fixed_count <= self.fixed_limit and total_sliding_count <= self.sliding_limit


# 使用示例
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
limiter = HybridLimiter(redis_client, 'hybrid_example_limit_key', 10, 5, 60, 6)
if limiter.is_allowed():
    print("请求被允许")
else:
    print("请求超出限流")

在上述代码中,HybridLimiter类结合了固定窗口限流和滑动窗口限流。is_allowed方法首先检查固定窗口的计数,然后计算滑动窗口内的总计数,只有两者都不超过阈值时,请求才被允许。

性能优化

  1. 批量操作:在处理多个请求的限流判断时,可以使用Redis的管道(Pipeline)技术。管道可以将多个命令一次性发送到Redis服务器,减少网络开销。例如,在批量处理用户请求时,可以将多个INCR命令和EXPIRE命令通过管道发送。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
pipeline = redis_client.pipeline()
user_ids = ["1", "2", "3"]
for user_id in user_ids:
    key = f"user_limit:{user_id}"
    pipeline.incr(key)
    pipeline.expire(key, 60)
results = pipeline.execute()
  1. 缓存预热:对于一些固定的限流配置,可以在系统启动时进行缓存预热。例如,预先设置好一些常用的限流键及其初始值和过期时间,避免在请求到达时才创建键,从而提高首次请求的响应速度。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
keys = ["user_limit:1", "user_limit:2"]
for key in keys:
    redis_client.setex(key, 3600, 0)

异常处理

  1. Redis连接异常:在使用Redis进行限流时,可能会遇到Redis连接失败的情况。可以通过设置重试机制来处理这种异常。例如,使用try - except语句捕获连接异常,在一定次数内进行重试。
import redis
import time


def try_connect_redis():
    max_retries = 3
    retry_delay = 1
    for i in range(max_retries):
        try:
            redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
            return redis_client
        except redis.ConnectionError:
            if i < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                raise


redis_client = try_connect_redis()
  1. 数据一致性问题:在高并发场景下,可能会出现数据一致性问题。例如,在多个进程同时进行限流计数时,可能会因为并发操作导致计数不准确。可以使用Redis的事务(Transaction)来保证数据的一致性。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
pipe = redis_client.pipeline()
pipe.multi()
key = "example_limit_key"
pipe.incr(key)
pipe.expire(key, 60)
try:
    pipe.execute()
except redis.WatchError:
    # 处理事务执行失败的情况
    pass

与其他组件的集成

  1. 与Web框架集成:在Web开发中,可以将Redis固定窗口限流与常用的Web框架如Flask、Django集成。以Flask为例,可以通过装饰器的方式将限流逻辑应用到视图函数上。
from flask import Flask
import redis


app = Flask(__name__)
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


def limit_request(key, limit, window_seconds):
    def decorator(func):
        def wrapper(*args, **kwargs):
            current_count = redis_client.incr(key)
            if current_count == 1:
                redis_client.expire(key, window_seconds)
            if current_count > limit:
                return "请求超出限流", 429
            return func(*args, **kwargs)

        return wrapper

    return decorator


@app.route('/')
@limit_request('flask_example_limit_key', 10, 60)
def index():
    return "Hello, World!"


if __name__ == '__main__':
    app.run(debug=True)
  1. 与负载均衡器集成:在分布式系统中,可以将Redis固定窗口限流与负载均衡器(如Nginx)集成。Nginx可以通过Lua脚本调用Redis接口进行限流判断,对请求进行统一的限流处理,提高系统的整体性能和稳定性。

总结

Redis固定窗口限流为业务系统提供了一种简单有效的限流方式。通过合理设定阈值、窗口大小,并结合动态调整、多维度限流等技巧,可以很好地适配各种复杂的业务需求。在实际应用中,还需要注意性能优化、异常处理以及与其他组件的集成,以确保系统的高可用性和稳定性。希望通过本文的介绍和代码示例,能帮助开发者更好地理解和应用Redis固定窗口限流技术。