MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

缓存设计在高并发场景下的应用与优化

2023-01-073.1k 阅读

缓存基础概念

在深入探讨高并发场景下缓存设计的应用与优化之前,我们先来了解一些缓存的基础概念。缓存(Cache)是一种数据存储机制,它将经常访问的数据存储在离应用程序更近、访问速度更快的地方,以减少对原始数据源(如数据库)的访问次数,从而提高系统的响应速度和整体性能。

缓存通常具有以下特点:

  • 快速访问:缓存一般存储在内存中,相较于磁盘存储的数据库,其读写速度极快。例如,内存读写速度可以达到纳秒级别,而磁盘读写速度通常在毫秒级别,差距可达数千倍。
  • 数据临时性:缓存中的数据是临时存储的,它是原始数据的一个副本。当原始数据发生变化时,缓存中的数据需要相应更新或者失效,以保证数据的一致性。
  • 容量限制:由于内存成本较高,缓存的容量通常相对较小,不能存储所有的数据,这就需要合理的缓存淘汰策略来管理缓存空间。

常见的缓存类型包括:

  • 浏览器缓存:主要用于前端网页资源的缓存,如 HTML、CSS、JavaScript 文件以及图片等。浏览器根据资源的缓存策略(如 Cache - Control、Expires 等头部信息)来决定是否从缓存中加载资源,减少网络请求,加快页面加载速度。例如,一个包含多个图片和脚本的网页,如果这些资源设置了合理的缓存策略,用户再次访问该网页时,浏览器可以直接从本地缓存中加载,无需再次从服务器获取,大大提高了访问效率。
  • CDN 缓存:内容分发网络(Content Delivery Network)缓存。CDN 将内容缓存到离用户更近的边缘节点服务器上。当用户请求访问某个资源时,CDN 服务器会根据用户的地理位置等因素,从距离用户最近的缓存节点返回数据,减少数据传输的距离和时间。像大型视频网站,通过 CDN 缓存视频内容,用户在不同地区都能快速加载视频,提高了用户体验。
  • 应用层缓存:这是我们在后端开发中重点关注的缓存类型。应用层缓存位于应用程序内部,通常由应用程序自身来管理和维护。应用层缓存可以缓存数据库查询结果、计算结果等。例如,一个电商应用中,经常需要查询热门商品的信息,这些热门商品信息变化频率较低,可以将其查询结果缓存起来,当有新的请求时,直接从缓存中获取数据,而不必每次都查询数据库。

高并发场景对缓存的挑战

高并发场景是指在同一时间内,系统会收到大量的请求。在这种情况下,缓存面临着诸多挑战:

  • 缓存穿透:当大量请求查询一个在缓存和数据库中都不存在的数据时,这些请求会直接穿透缓存,到达数据库,给数据库带来巨大压力。例如,恶意攻击者可能会故意构造大量不存在的商品 ID 进行查询,如果系统没有相应的防护措施,这些请求都会打到数据库,可能导致数据库性能下降甚至崩溃。
  • 缓存雪崩:由于某种原因(如缓存服务器重启、缓存过期时间设置不合理等),大量缓存数据在同一时间过期失效,导致大量请求同时直接访问数据库,如同雪崩一样,可能使数据库不堪重负而崩溃。比如,在一个电商促销活动中,设置了大量商品缓存的过期时间为活动结束后同一时刻,如果此时缓存失效,所有关于这些商品的请求都会直接访问数据库,很容易造成数据库压力过大。
  • 缓存击穿:一个被高并发访问并且缓存失效的 key,瞬间有大量请求访问,这些请求同时穿过缓存,直接到达数据库,可能导致数据库压力过大。与缓存雪崩不同的是,缓存击穿是单个 key 的缓存失效问题,而缓存雪崩是大量 key 同时失效。例如,一个热门商品的缓存突然过期,在高并发情况下,大量请求同时尝试获取该商品信息,由于缓存中没有,都去查询数据库,可能对数据库造成较大冲击。
  • 数据一致性:在高并发环境下,数据的更新操作频繁,要保证缓存和数据库中数据的一致性变得更加困难。例如,当一个商品的库存数据在数据库中更新后,需要及时更新缓存中的库存数据,否则可能出现用户看到的库存数据不一致的情况。如果更新操作处理不当,还可能导致缓存和数据库之间的数据不一致循环问题。

缓存设计在高并发场景下的应用策略

为了应对高并发场景下的挑战,我们需要采用合适的缓存设计策略。

缓存穿透应对策略

  1. 布隆过滤器(Bloom Filter):布隆过滤器是一种空间效率很高的概率型数据结构,它可以用来判断一个元素是否在一个集合中。在缓存穿透场景中,我们可以在缓存之前使用布隆过滤器。当一个请求到来时,先通过布隆过滤器判断该数据是否存在,如果布隆过滤器判断不存在,那么可以直接返回,不再查询数据库,从而避免无效请求穿透到数据库。 以下是一个简单的布隆过滤器的 Python 实现示例:
import mmh3
from bitarray import bitarray


class BloomFilter:
    def __init__(self, capacity, error_rate):
        self.capacity = capacity
        self.error_rate = error_rate
        self.bit_array = bitarray(capacity)
        self.bit_array.setall(0)
        self.hash_count = int(-(capacity * (self.error_rate).bit_length()) / (self.capacity * (1 / self.capacity).bit_length()))

    def add(self, item):
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.capacity
            self.bit_array[index] = 1

    def check(self, item):
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.capacity
            if not self.bit_array[index]:
                return False
        return True


# 使用示例
bf = BloomFilter(capacity = 10000, error_rate = 0.01)
data = ["apple", "banana", "cherry"]
for item in data:
    bf.add(item)

print(bf.check("apple"))  
print(bf.check("date"))  
  1. 空值缓存:当查询的数据在数据库中不存在时,也将这个空值结果缓存起来,并设置一个较短的过期时间。这样,后续相同的请求就可以直接从缓存中获取空值,而不会穿透到数据库。例如:
cache = {}


def get_data(key):
    if key in cache:
        return cache[key]
    data = get_data_from_database(key)
    if data is None:
        cache[key] = None
        # 设置较短的过期时间,例如 60 秒
        cache_expiry_time[key] = time.time() + 60
    else:
        cache[key] = data
    return data


def get_data_from_database(key):
    # 实际的数据库查询逻辑,这里简单返回 None 模拟不存在的数据
    return None


缓存雪崩应对策略

  1. 随机化过期时间:避免将大量缓存设置为相同的过期时间。对于不同的缓存数据,设置一个随机的过期时间范围,例如在原本计划的过期时间基础上,上下浮动一定的时间。这样可以分散缓存过期的时间点,避免大量缓存同时失效。例如,原本计划所有商品缓存过期时间为 1 小时,可以改为在 50 分钟到 70 分钟之间随机设置过期时间。
import random
import time


cache = {}


def set_cache(key, value, base_expiry=3600):
    # 随机过期时间在 base_expiry 的 80% 到 120% 之间
    expiry = int(base_expiry * (0.8 + random.random() * 0.4))
    cache[key] = (value, time.time() + expiry)


def get_cache(key):
    if key in cache:
        value, expiry_time = cache[key]
        if time.time() < expiry_time:
            return value
        else:
            del cache[key]
    return None


  1. 多级缓存:采用多级缓存架构,例如一级缓存使用内存缓存(如 Redis),二级缓存可以使用磁盘缓存(如 Memcached 支持的磁盘持久化)或者其他较慢但容量较大的存储。当一级缓存失效时,先从二级缓存获取数据,如果二级缓存也没有,再查询数据库,并将结果同时更新到一级和二级缓存。这样可以在一定程度上缓解大量请求直接冲击数据库的问题。 以 Python 结合 Redis 和 DiskCache 为例:
import redis
import diskcache


# 初始化 Redis 客户端
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
# 初始化 DiskCache
disk_cache = diskcache.Cache('my_cache')


def get_data(key):
    data = redis_client.get(key)
    if data is not None:
        return data.decode('utf - 8')
    data = disk_cache.get(key)
    if data is not None:
        redis_client.set(key, data)
        return data
    data = get_data_from_database(key)
    if data is not None:
        redis_client.set(key, data)
        disk_cache.set(key, data)
    return data


def get_data_from_database(key):
    # 实际的数据库查询逻辑,这里简单返回一个字符串模拟数据
    return "data for " + key


缓存击穿应对策略

  1. 互斥锁:在缓存失效时,使用互斥锁(如 Redis 的 SETNX 命令实现分布式锁)来保证只有一个请求去查询数据库并更新缓存,其他请求等待。当第一个请求查询到数据并更新缓存后,其他请求可以直接从缓存中获取数据。以下是一个基于 Redis 实现互斥锁解决缓存击穿的 Python 示例:
import redis
import time


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


def get_data(key):
    data = redis_client.get(key)
    if data is not None:
        return data.decode('utf - 8')
    lock_key = "lock:" + key
    lock_acquired = redis_client.setnx(lock_key, 1)
    if lock_acquired:
        try:
            data = get_data_from_database(key)
            if data is not None:
                redis_client.set(key, data)
            return data
        finally:
            redis_client.delete(lock_key)
    else:
        # 等待一段时间后重试
        time.sleep(0.1)
        return get_data(key)


def get_data_from_database(key):
    # 实际的数据库查询逻辑,这里简单返回一个字符串模拟数据
    return "data for " + key


  1. 热点数据永不过期:对于一些热点数据,不设置过期时间,保证缓存永远有效。同时,通过后台线程定时更新缓存数据,或者在数据发生变化时主动更新缓存。这样可以避免缓存失效瞬间的高并发请求穿透到数据库。例如:
import threading
import time


cache = {}


def update_cache_periodically(key):
    while True:
        data = get_data_from_database(key)
        cache[key] = data
        time.sleep(3600)  # 每小时更新一次


def get_data(key):
    if key not in cache:
        data = get_data_from_database(key)
        cache[key] = data
        # 启动后台线程更新缓存
        threading.Thread(target = update_cache_periodically, args = (key,)).start()
    return cache[key]


def get_data_from_database(key):
    # 实际的数据库查询逻辑,这里简单返回一个字符串模拟数据
    return "data for " + key


数据一致性应对策略

  1. 读写锁:在读取数据时,允许多个线程同时读取,因为读操作不会修改数据,不会产生数据一致性问题。在写入数据时,获取写锁,保证只有一个线程可以进行写操作,避免并发写操作导致的数据不一致。例如,在 Java 中可以使用 ReentrantReadWriteLock 实现读写锁:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class CacheWithReadWriteLock {
    private final Map<String, Object> cache = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();


    public Object get(String key) {
        lock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }


    public void put(String key, Object value) {
        lock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}


  1. 先更新数据库,再删除缓存:在数据更新操作时,先更新数据库,成功后再删除缓存。这样当再次读取数据时,缓存中不存在数据,会从数据库中读取最新数据并更新到缓存。需要注意的是,在高并发情况下,可能会出现短暂的数据不一致问题。例如,在更新数据库和删除缓存之间,有其他线程读取数据,此时读取到的是旧数据。为了减少这种不一致的时间窗口,可以尽量缩短更新数据库和删除缓存之间的时间间隔,并且可以结合缓存的过期时间来处理。例如:
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


def update_data(key, new_value):
    # 更新数据库
    update_database(key, new_value)
    # 删除缓存
    redis_client.delete(key)


def get_data(key):
    data = redis_client.get(key)
    if data is not None:
        return data.decode('utf - 8')
    data = get_data_from_database(key)
    if data is not None:
        redis_client.set(key, data)
    return data


def update_database(key, new_value):
    # 实际的数据库更新逻辑,这里简单打印表示更新
    print(f"Updating database with key {key} and value {new_value}")


def get_data_from_database(key):
    # 实际的数据库查询逻辑,这里简单返回一个字符串模拟数据
    return "data for " + key


  1. 先删除缓存,再更新数据库:这种策略与先更新数据库再删除缓存相反,先删除缓存,再更新数据库。同样在高并发场景下可能出现数据不一致问题。例如,在删除缓存后,更新数据库前,有其他线程读取数据,会发现缓存中没有数据,然后从数据库读取旧数据并更新到缓存,之后数据库更新的数据就和缓存不一致了。可以通过设置缓存短时间失效,或者在更新数据库后,再次检查缓存并更新来尽量减少不一致的情况。例如:
import redis
import time


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


def update_data(key, new_value):
    # 删除缓存
    redis_client.delete(key)
    # 更新数据库
    update_database(key, new_value)
    # 等待一段时间确保数据库更新完成
    time.sleep(0.1)
    # 再次检查缓存并更新
    data = get_data_from_database(key)
    redis_client.set(key, data)


def get_data(key):
    data = redis_client.get(key)
    if data is not None:
        return data.decode('utf - 8')
    data = get_data_from_database(key)
    if data is not None:
        redis_client.set(key, data)
    return data


def update_database(key, new_value):
    # 实际的数据库更新逻辑,这里简单打印表示更新
    print(f"Updating database with key {key} and value {new_value}")


def get_data_from_database(key):
    # 实际的数据库查询逻辑,这里简单返回一个字符串模拟数据
    return "data for " + key


缓存优化技巧

除了应对高并发场景下的常见问题,我们还可以通过一些优化技巧来进一步提升缓存的性能和效率。

缓存数据结构优化

  1. 合理选择缓存数据类型:根据实际需求选择合适的缓存数据类型。例如,在 Redis 中,如果需要存储简单的键值对,使用字符串类型(String)即可;如果需要存储多个相关的数据,可以使用哈希类型(Hash)。比如存储用户信息,使用哈希类型可以将用户的各个属性(如姓名、年龄、地址等)存储在一个哈希表中,通过一个 key 进行访问,减少 key 的数量,提高缓存空间利用率。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


# 使用哈希类型存储用户信息
user_key = "user:1"
user_data = {
    "name": "John",
    "age": 30,
    "address": "123 Main St"
}
redis_client.hmset(user_key, user_data)


# 获取用户信息
result = redis_client.hgetall(user_key)
print(result)


  1. 使用集合类型(Set)和有序集合类型(Sorted Set):当需要存储不重复的数据集合,或者需要对数据进行排序时,可以分别使用集合类型和有序集合类型。例如,在一个社交应用中,可以使用集合类型存储用户的好友列表,保证好友不会重复;使用有序集合类型存储用户的积分排行榜,通过积分进行排序。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


# 使用集合类型存储好友列表
user1_key = "user:1:friends"
friends = ["Alice", "Bob", "Charlie"]
for friend in friends:
    redis_client.sadd(user1_key, friend)


# 获取好友列表
friend_list = redis_client.smembers(user1_key)
print(friend_list)


# 使用有序集合类型存储积分排行榜
rank_key = "rank:points"
users_points = {
    "Alice": 100,
    "Bob": 200,
    "Charlie": 150
}
for user, points in users_points.items():
    redis_client.zadd(rank_key, {user: points})


# 获取积分排行榜
rank_result = redis_client.zrange(rank_key, 0, -1, withscores = True)
print(rank_result)


缓存粒度优化

  1. 粗粒度缓存:将多个相关的数据组合在一起进行缓存,减少缓存的 key 数量,降低缓存管理的开销。例如,在一个新闻应用中,可以将一篇新闻的标题、正文、作者等相关信息组合成一个对象进行缓存,通过新闻 ID 作为 key。这样在获取新闻信息时,一次缓存查询就可以获取所有相关数据,提高查询效率。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


# 新闻数据
news_id = "news:1"
news_data = {
    "title": "Sample News Title",
    "content": "This is the content of the news.",
    "author": "John Doe"
}
redis_client.set(news_id, str(news_data))


# 获取新闻数据
result = redis_client.get(news_id)
print(result)


  1. 细粒度缓存:将数据按照更细的粒度进行缓存,每个 key 对应更具体的数据项。这种方式适合数据更新频率不一致的情况。例如,在一个电商应用中,商品的基本信息(如名称、描述)更新频率较低,可以进行粗粒度缓存;而商品的库存信息更新频繁,适合细粒度缓存。这样在库存更新时,只需要更新库存相关的缓存,而不影响其他商品信息的缓存。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


# 商品基本信息缓存
product_key = "product:1:info"
product_info = {
    "name": "Sample Product",
    "description": "This is a sample product."
}
redis_client.set(product_key, str(product_info))


# 商品库存缓存
stock_key = "product:1:stock"
redis_client.set(stock_key, "100")


# 获取商品基本信息
info_result = redis_client.get(product_key)
print(info_result)


# 获取商品库存
stock_result = redis_client.get(stock_key)
print(stock_result)


缓存预热

在系统启动时,提前将一些热点数据加载到缓存中,避免系统刚启动时,由于缓存中没有数据,大量请求直接访问数据库,导致数据库压力过大。可以通过脚本或者配置文件来指定需要预热的缓存数据。例如,在一个电商应用启动时,可以预先加载热门商品的信息到缓存中。

import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


# 假设热门商品 ID 列表
popular_product_ids = ["product:1", "product:2", "product:3"]


def warm_up_cache():
    for product_id in popular_product_ids:
        product_data = get_product_data_from_database(product_id)
        if product_data is not None:
            redis_client.set(product_id, str(product_data))


def get_product_data_from_database(product_id):
    # 实际的数据库查询逻辑,这里简单返回一个字典模拟数据
    return {
        "name": "Sample Product for " + product_id,
        "price": 100
    }


if __name__ == "__main__":
    warm_up_cache()


缓存监控与调优

  1. 监控缓存命中率:缓存命中率是衡量缓存性能的重要指标,它表示缓存命中的次数与总请求次数的比例。通过监控缓存命中率,可以了解缓存的使用效果。如果缓存命中率过低,可能需要调整缓存策略,如增加缓存数据、优化缓存淘汰策略等。在 Redis 中,可以通过 INFO 命令获取缓存命中和未命中的次数,计算缓存命中率。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


info = redis_client.info()
total_requests = info['total_commands_processed']
cache_hits = info['keyspace_hits']
cache_misses = info['keyspace_misses']
hit_rate = cache_hits / (cache_hits + cache_misses) if (cache_hits + cache_misses) > 0 else 0
print(f"Cache Hit Rate: {hit_rate}")


  1. 监控缓存内存使用情况:随着缓存数据的不断增加,需要监控缓存的内存使用情况,避免缓存占用过多内存导致系统性能下降。可以通过 Redis 的 INFO 命令获取内存使用相关信息,如已使用内存、内存碎片率等。如果内存使用率过高,可以考虑优化缓存数据结构,或者调整缓存淘汰策略,释放一些不必要的缓存空间。
import redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)


info = redis_client.info('memory')
used_memory = info['used_memory']
memory_fragmentation_ratio = info['mem_fragmentation_ratio']
print(f"Used Memory: {used_memory} bytes")
print(f"Memory Fragmentation Ratio: {memory_fragmentation_ratio}")


  1. 根据监控数据进行调优:根据缓存命中率、内存使用等监控数据,对缓存进行调优。例如,如果缓存命中率低且内存有剩余,可以适当增加缓存数据的数量或者调整缓存过期时间;如果内存使用率过高,可以清理一些长时间未使用的缓存数据,或者优化缓存数据结构,减少内存占用。同时,还可以根据不同时间段的请求特点,动态调整缓存策略。比如在业务高峰期,适当增加缓存容量,保证高并发请求的处理效率。

分布式缓存

在大型高并发系统中,单机缓存往往无法满足需求,需要使用分布式缓存。分布式缓存将缓存数据分布在多个节点上,通过集群的方式提供高可用、高性能的缓存服务。

分布式缓存架构

  1. 客户端分片:客户端根据一定的算法(如一致性哈希算法)将数据映射到不同的缓存节点上。客户端需要维护一个节点列表,并根据算法计算出数据应该存储的节点。这种方式的优点是架构简单,不需要额外的中间件;缺点是客户端实现复杂,并且当节点数量发生变化时,需要重新计算数据的映射关系,可能导致大量数据迁移。 以下是一个简单的基于一致性哈希算法的客户端分片 Python 示例:
import hashlib
import bisect


class ConsistentHash:
    def __init__(self, nodes, replicas = 100):
        self.replicas = replicas
        self.ring = []
        self.node_map = {}
        for node in nodes:
            for i in range(replicas):
                hash_value = self._hash(f"{node}:{i}")
                self.ring.append(hash_value)
                self.node_map[hash_value] = node
        self.ring.sort()

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_node(self, key):
        hash_value = self._hash(key)
        index = bisect.bisect(self.ring, hash_value)
        if index == len(self.ring):
            index = 0
        return self.node_map[self.ring[index]]


# 使用示例
nodes = ["node1", "node2", "node3"]
ch = ConsistentHash(nodes)
print(ch.get_node("data1"))  
print(ch.get_node("data2"))  
  1. 代理分片:在客户端和缓存节点之间引入代理层,代理层负责根据一定的算法将请求转发到相应的缓存节点。代理层可以对请求进行统一管理,如缓存穿透、缓存雪崩的处理等。常见的代理分片中间件有 Twemproxy 等。这种方式的优点是客户端实现简单,节点的添加和删除由代理层处理,对客户端透明;缺点是代理层可能成为性能瓶颈,并且增加了系统的复杂度。
  2. 服务器端分片:缓存服务器自身实现数据分片功能,如 Redis Cluster。Redis Cluster 使用哈希槽(Hash Slot)的方式将数据分布在不同的节点上,每个节点负责一部分哈希槽。客户端只需要连接任意一个节点,该节点会将请求转发到正确的节点上。这种方式的优点是高可用、可扩展性强,节点的添加和删除自动进行数据迁移;缺点是集群管理相对复杂,需要一定的运维成本。

分布式缓存的一致性问题

在分布式缓存中,由于数据分布在多个节点上,一致性问题更加复杂。除了前面提到的缓存与数据库之间的数据一致性,还存在分布式缓存节点之间的数据一致性问题。

  1. 最终一致性:允许不同节点上的数据在一段时间内存在不一致,但最终会达到一致。例如,在使用 Redis Cluster 时,当一个节点的数据发生更新,其他节点不会立即同步,而是通过 gossip 协议在一段时间内逐渐同步数据。这种方式适合对一致性要求不是特别高,但对性能和可用性要求较高的场景。
  2. 强一致性:要求所有节点上的数据在任何时刻都保持一致。实现强一致性通常需要更多的同步机制和协调开销,会降低系统的性能和可用性。例如,使用分布式事务来保证缓存更新的一致性,但这会增加系统的复杂度和延迟。在实际应用中,需要根据业务需求来选择合适的一致性模型。

总结

在高并发场景下,缓存设计的应用与优化是提升系统性能和稳定性的关键。通过合理的缓存策略应对缓存穿透、缓存雪崩、缓存击穿等问题,优化缓存数据结构、缓存粒度,进行缓存预热、监控与调优,以及在必要时采用分布式缓存架构,可以有效地提高系统的响应速度,减轻数据库压力,提升用户体验。同时,要根据具体的业务场景和需求,灵活选择和调整缓存设计方案,以达到最佳的性能和成本效益。在实际开发中,不断地实践和总结经验,将有助于构建更加高效、稳定的后端系统。