MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

缓存预热策略在高并发场景中的应用

2021-05-206.1k 阅读

缓存预热策略概述

在高并发场景下,后端系统面临着巨大的压力。缓存作为一种有效的性能优化手段,能够显著提升系统的响应速度和吞吐量。然而,若缓存未经过预热,在高并发请求瞬间到达时,可能会出现大量缓存穿透、缓存雪崩等问题,导致系统性能急剧下降甚至崩溃。缓存预热策略就是为了解决这些问题而设计的。

缓存预热,简单来说,就是在系统上线或业务高峰来临之前,提前将部分热点数据加载到缓存中。这样当高并发请求到来时,大部分请求可以直接从缓存中获取数据,避免了直接查询数据库等慢速数据源带来的性能瓶颈。

高并发场景下未预热缓存的问题

  1. 缓存穿透:当客户端持续请求一个在缓存和数据库中都不存在的数据时,由于缓存中没有该数据,每次请求都会直接穿透到数据库,若这种请求量巨大,数据库很可能不堪重负而崩溃。例如,恶意攻击者故意构造大量不存在的商品 ID 进行请求,如果系统没有合适的防护措施,数据库将被大量无效查询淹没。
  2. 缓存雪崩:在缓存集中过期的情况下,大量原本应该从缓存获取数据的请求会同时落到数据库上,如同雪崩一般,对数据库造成巨大压力。假设电商系统中大量商品的缓存设置了相同的过期时间,当这个过期时间一到,所有这些商品的缓存都失效,瞬间大量请求涌到数据库,数据库很可能无法承受而宕机。
  3. 缓存击穿:某个热点数据在缓存中过期的瞬间,恰好有大量并发请求同时访问该数据,这些请求会直接打到数据库上。比如在秒杀活动中,某个热门商品的缓存过期,而此时大量用户同时请求该商品信息,数据库可能因为无法及时处理这些请求而出现性能问题。

缓存预热策略分类

  1. 启动时预热:在应用程序启动阶段,通过程序逻辑从数据库或其他数据源加载热点数据到缓存中。这种方式简单直接,适用于数据量不是特别大且热点数据相对固定的场景。例如,一个小型的新闻网站,每天的热门新闻数量有限且相对固定,在网站应用启动时,可以将前一天的热门新闻加载到缓存中。
  2. 定时预热:按照一定的时间周期,定时从数据源获取数据更新缓存。这种策略适用于数据有一定时效性,需要定期更新的场景。比如天气预报网站,每小时从气象局获取最新的天气数据并更新到缓存中,保证用户获取到的天气信息是较新的。
  3. 实时预热:当有新数据产生或者数据发生变化时,立即更新缓存。常用于对数据实时性要求极高的场景,如股票交易系统,股票价格实时变动,一旦价格有变化,相关的缓存数据必须立即更新。

缓存预热策略的实现方式

  1. 基于代码实现:在应用程序代码中直接编写加载数据到缓存的逻辑。以 Java 为例,使用 Spring Boot 框架和 Redis 缓存:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class CachePreloader implements CommandLineRunner {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 假设这是获取热点数据的服务
    @Autowired
    private HotDataService hotDataService;

    @Override
    public void run(String... args) throws Exception {
        // 从数据库获取热点数据
        Object hotData = hotDataService.getHotData();
        // 将热点数据存入 Redis 缓存
        redisTemplate.opsForValue().set("hot_data_key", hotData);
    }
}

上述代码中,CachePreloader 类实现了 CommandLineRunner 接口,在应用程序启动时会执行 run 方法,从 HotDataService 获取热点数据并存储到 Redis 缓存中,这是启动时预热的一种实现方式。

  1. 使用脚本实现:可以编写独立的脚本,定时执行来更新缓存。以 Python 脚本结合 Redis 为例:
import redis
import pymysql

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 连接 MySQL 数据库
conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
cursor = conn.cursor()

# 执行 SQL 查询获取热点数据
cursor.execute("SELECT * FROM hot_data_table")
hot_data = cursor.fetchall()

# 将热点数据存入 Redis
for data in hot_data:
    r.set(f"hot_data_{data[0]}", str(data))

# 关闭数据库连接
cursor.close()
conn.close()

然后可以使用 Linux 的 crontab 工具来定时执行这个 Python 脚本,实现定时预热缓存。例如,设置每天凌晨 2 点执行:

0 2 * * * /usr/bin/python3 /path/to/your/script.py

热点数据的识别

  1. 基于历史数据统计:通过分析历史业务数据,找出访问频率高的数据。例如,电商系统可以统计过去一周内每个商品的浏览量、购买量等指标,将浏览量或购买量排名靠前的商品作为热点数据。数据库查询语句可以如下:
SELECT product_id, COUNT(*) AS view_count
FROM product_view_log
GROUP BY product_id
ORDER BY view_count DESC
LIMIT 100;

上述 SQL 语句从商品浏览日志表 product_view_log 中统计每个商品的浏览次数,并按浏览次数降序排列,取前 100 个商品作为热点数据。

  1. 实时数据分析:利用实时数据处理框架,如 Apache Kafka 结合 Flink,实时统计数据的访问频率。例如,当用户请求商品信息时,将请求信息发送到 Kafka 主题,Flink 从 Kafka 消费数据并实时统计每个商品的请求次数,动态识别热点商品。以下是一个简单的 Flink 代码示例:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HotProductDetector {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从 Kafka 读取商品请求数据
        DataStreamSource<String> stream = env.addSource(new KafkaSource());

        stream
           .map(line -> {
                String[] fields = line.split(",");
                return new ProductRequest(fields[0], Long.parseLong(fields[1]));
            })
           .keyBy(ProductRequest::getProductId)
           .timeWindow(Time.minutes(5))
           .process(new ProcessWindowFunction<ProductRequest, HotProduct, String, TimeWindow>() {
                @Override
                public void process(String key, Context context, Iterable<ProductRequest> elements, Collector<HotProduct> out) throws Exception {
                    long count = 0;
                    for (ProductRequest element : elements) {
                        count++;
                    }
                    out.collect(new HotProduct(key, count, context.window().getEnd()));
                }
            })
           .print();

        env.execute("Hot Product Detector");
    }
}

class ProductRequest {
    private String productId;
    private long timestamp;

    public ProductRequest() {}

    public ProductRequest(String productId, long timestamp) {
        this.productId = productId;
        this.timestamp = timestamp;
    }

    public String getProductId() {
        return productId;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

class HotProduct {
    private String productId;
    private long count;
    private long windowEnd;

    public HotProduct(String productId, long count, long windowEnd) {
        this.productId = productId;
        this.count = count;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "HotProduct{" +
                "productId='" + productId + '\'' +
                ", count=" + count +
                ", windowEnd=" + windowEnd +
                '}';
    }
}

上述代码中,Flink 从 Kafka 读取商品请求数据,按商品 ID 进行分组,在 5 分钟的时间窗口内统计每个商品的请求次数,识别出热点商品。

缓存预热与其他性能优化策略的结合

  1. 与负载均衡结合:在高并发场景下,负载均衡器将请求均匀分配到多个后端服务器上。缓存预热可以在每个后端服务器启动时进行,确保每个服务器的缓存中都有热点数据。同时,负载均衡器可以根据服务器的缓存状态进行请求分配,优先将请求发送到缓存命中率高的服务器上。例如,Nginx 可以通过配置实现根据服务器的缓存状态进行负载均衡:
upstream backend {
    server backend1.example.com;
    server backend2.example.com;
    server backend3.example.com;

    least_conn;
    # 可以通过自定义模块获取服务器缓存命中率,根据命中率调整权重
    # 例如:server backend1.example.com weight=cache_hit_rate1;
}

server {
    listen 80;
    server_name example.com;

    location / {
        proxy_pass http://backend;
    }
}
  1. 与数据库优化结合:缓存预热虽然减少了数据库的直接访问,但数据库仍然是数据的最终来源。在进行缓存预热时,可以同时对数据库进行优化,如合理创建索引、优化查询语句等。对于那些被频繁预热到缓存中的数据对应的数据库查询,要确保其执行效率。例如,在电商系统中,对于热门商品的查询,在数据库表 products 上对商品 ID 和分类等经常用于查询的字段创建索引:
CREATE INDEX idx_product_id ON products(product_id);
CREATE INDEX idx_category ON products(category);

这样,当缓存预热从数据库获取热门商品数据时,查询效率会得到提升。

  1. 与异步处理结合:在缓存预热过程中,特别是加载大量数据时,可能会阻塞应用程序的启动或正常业务流程。可以采用异步处理的方式,将缓存预热任务放到单独的线程或线程池中执行。以 Java 为例,使用 CompletableFuture 实现异步缓存预热:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Component
public class AsyncCachePreloader {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 假设这是获取热点数据的服务
    @Autowired
    private HotDataService hotDataService;

    public void preloadCacheAsync() {
        CompletableFuture.runAsync(() -> {
            // 从数据库获取热点数据
            Object hotData = hotDataService.getHotData();
            // 将热点数据存入 Redis 缓存
            redisTemplate.opsForValue().set("hot_data_key", hotData);
        });
    }
}

在应用程序启动时,可以调用 preloadCacheAsync 方法,让缓存预热在后台异步执行,不影响应用程序的正常启动流程。

缓存预热策略的监控与调优

  1. 缓存命中率监控:缓存命中率是衡量缓存预热效果的重要指标,计算公式为:缓存命中率 = 缓存命中次数 / (缓存命中次数 + 缓存未命中次数)。可以通过在应用程序中添加监控代码来统计缓存命中和未命中次数。以 Python Flask 应用结合 Redis 缓存为例:
from flask import Flask
import redis

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)

cache_hit_count = 0
cache_miss_count = 0

@app.route('/data')
def get_data():
    global cache_hit_count, cache_miss_count
    data = r.get('data_key')
    if data:
        cache_hit_count += 1
        return data.decode('utf-8')
    else:
        # 从数据库获取数据并更新缓存
        db_data = get_data_from_db()
        r.set('data_key', db_data)
        cache_miss_count += 1
        return db_data

def get_cache_hit_rate():
    total = cache_hit_count + cache_miss_count
    if total == 0:
        return 0
    return cache_hit_count / total

if __name__ == '__main__':
    app.run(debug=True)

通过定期查看缓存命中率,如果命中率较低,可能需要调整缓存预热策略,如扩大预热数据范围、优化热点数据识别等。

  1. 缓存容量监控:要监控缓存的使用容量,避免缓存溢出。不同的缓存系统有不同的监控方式。对于 Redis,可以使用 INFO 命令获取缓存的相关信息,包括已使用内存等。在 Python 中可以这样获取 Redis 缓存容量信息:
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
info = r.info()
used_memory = info['used_memory']
print(f"Redis已使用内存: {used_memory} 字节")

如果缓存容量接近上限,可能需要调整缓存淘汰策略,或者增加缓存服务器资源,同时也需要评估缓存预热的数据量是否过大。

  1. 缓存更新时间监控:对于定时预热和实时预热策略,要监控缓存数据的更新时间,确保数据的时效性。可以在缓存数据中添加时间戳字段,每次获取缓存数据时检查时间戳。以 Java 操作 Redis 为例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
public class CacheService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public Object getData(String key) {
        Object data = redisTemplate.opsForValue().get(key);
        if (data != null) {
            long timestamp = (long) redisTemplate.opsForValue().get(key + "_timestamp");
            Date updateTime = new Date(timestamp);
            // 检查数据是否过期,这里假设过期时间为 1 小时
            if (System.currentTimeMillis() - updateTime.getTime() > 60 * 60 * 1000) {
                // 数据过期,重新加载数据
                data = loadDataFromSource(key);
                redisTemplate.opsForValue().set(key, data);
                redisTemplate.opsForValue().set(key + "_timestamp", System.currentTimeMillis());
            }
        }
        return data;
    }

    private Object loadDataFromSource(String key) {
        // 从数据源加载数据的逻辑
        return null;
    }
}

通过监控缓存更新时间,及时发现数据更新不及时的问题,调整预热策略的时间周期或优化实时预热的逻辑。

不同业务场景下的缓存预热策略应用案例

  1. 电商业务场景
    • 商品详情页:在电商平台中,商品详情页是用户经常访问的页面。可以采用启动时预热和定时预热相结合的策略。在应用程序启动时,将热门商品的详情数据加载到缓存中,同时每天凌晨定时更新缓存,确保商品信息的准确性。例如,使用 Java 和 Redis 实现:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ProductCachePreloader implements CommandLineRunner {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 假设这是获取热门商品数据的服务
    @Autowired
    private HotProductService hotProductService;

    @Override
    public void run(String... args) throws Exception {
        preloadHotProductCache();
    }

    @Scheduled(cron = "0 0 2 * * *") // 每天凌晨 2 点执行
    public void updateHotProductCache() {
        preloadHotProductCache();
    }

    private void preloadHotProductCache() {
        // 从数据库获取热门商品数据
        Object hotProducts = hotProductService.getHotProducts();
        // 将热门商品数据存入 Redis 缓存
        redisTemplate.opsForValue().set("hot_products", hotProducts);
    }
}
- **购物车**:购物车数据对实时性要求较高,适合采用实时预热策略。当用户添加或修改购物车中的商品时,立即更新缓存中的购物车数据。以 Python Django 框架结合 Redis 为例:
import redis
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

r = redis.Redis(host='localhost', port=6379, db=0)

@csrf_exempt
def add_to_cart(request):
    if request.method == 'POST':
        product_id = request.POST.get('product_id')
        user_id = request.session['user_id']
        # 更新购物车缓存
        cart_key = f"cart_{user_id}"
        if r.exists(cart_key):
            cart = r.hgetall(cart_key)
            if product_id in cart:
                cart[product_id] = int(cart[product_id]) + 1
            else:
                cart[product_id] = 1
        else:
            cart = {product_id: 1}
        r.hmset(cart_key, cart)
        return JsonResponse({'message': '商品已添加到购物车'})
    return JsonResponse({'error': '无效请求'})
  1. 新闻资讯业务场景
    • 热门新闻:新闻网站的热门新闻可以采用定时预热策略,每隔一定时间从数据库获取最新的热门新闻更新缓存。例如,每 30 分钟更新一次。使用 Node.js 和 Redis 实现:
const redis = require('redis');
const { promisify } = require('util');
const mysql = require('mysql2');

const redisClient = redis.createClient();
const getAsync = promisify(redisClient.get).bind(redisClient);
const setAsync = promisify(redisClient.set).bind(redisClient);

const connection = mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'news_db'
});

async function preloadHotNews() {
    const query = 'SELECT * FROM news WHERE is_hot = 1';
    connection.query(query, async (error, results) => {
        if (!error) {
            await setAsync('hot_news', JSON.stringify(results));
        } else {
            console.error('获取热门新闻出错:', error);
        }
    });
}

setInterval(preloadHotNews, 30 * 60 * 1000); // 每 30 分钟执行一次
- **个性化推荐新闻**:对于个性化推荐新闻,由于每个用户的推荐内容不同,实时性要求较高。当用户行为发生变化,如阅读、点赞新闻时,立即更新个性化推荐缓存。以 Python Flask 应用结合 Redis 为例:
from flask import Flask, request
import redis

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)

@app.route('/user_action', methods=['POST'])
def user_action():
    user_id = request.json.get('user_id')
    action_type = request.json.get('action_type')
    news_id = request.json.get('news_id')
    # 根据用户行为更新个性化推荐缓存
    if action_type =='read':
        # 假设这里有更新推荐逻辑
        r.sadd(f"recommended_news_{user_id}", news_id);
    return '用户行为处理成功'

if __name__ == '__main__':
    app.run(debug=True)

通过以上对缓存预热策略在高并发场景中的应用的详细阐述,包括策略分类、实现方式、热点数据识别、与其他优化策略结合、监控与调优等方面,以及不同业务场景下的应用案例,希望能帮助后端开发人员更好地设计和实施缓存预热策略,提升高并发场景下系统的性能和稳定性。