MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis WATCH命令的灵活使用场景

2024-07-281.9k 阅读

Redis WATCH命令基础介绍

Redis是一个开源的内存数据存储系统,以其高性能、丰富的数据结构和简单的操作命令而被广泛应用。其中,WATCH命令在处理并发操作时扮演着重要的角色。

WATCH命令用于在执行MULTI/EXEC事务块之前,监视一个或多个键。一旦这些被监视的键中的任意一个被其他客户端修改,后续的EXEC命令将失败,事务不会被执行。这确保了在事务执行期间,所依赖的数据没有被其他客户端修改,从而保证数据的一致性。

语法为:WATCH key [key ...],例如WATCH user:1:balance,这表示开始监视user:1:balance这个键。

并发控制场景

防止并发修改导致数据不一致

在一个多客户端同时操作数据的场景中,数据的一致性是一个关键问题。以银行转账为例,假设有两个账户A和B,A向B转账一定金额。如果没有合适的并发控制,可能会出现数据不一致的情况。

假设账户A有1000元,账户B有500元,A向B转账100元。在传统的非事务性操作下,如果两个客户端同时执行转账操作,可能会出现以下情况:

客户端1读取A的余额为1000元,客户端2也读取A的余额为1000元。然后客户端1从A账户减去100元,将A余额更新为900元,并向B账户增加100元,B余额变为600元。此时客户端2继续执行,它从A账户减去100元(认为A还是1000元),将A余额更新为900元,再向B账户增加100元,B余额变为700元。最终结果是A账户少了200元,而不是100元,这就导致了数据不一致。

使用WATCH命令可以有效避免这种情况。以下是Python代码示例,使用redis - py库:

import redis

r = redis.Redis(host='localhost', port=6379, db = 0)

# 初始化账户余额
r.set('account:A', 1000)
r.set('account:B', 500)


def transfer(from_account, to_account, amount):
    pipe = r.pipeline()
    while True:
        try:
            # 监视源账户
            pipe.watch(from_account)
            balance = int(pipe.get(from_account))
            if balance < amount:
                pipe.unwatch()
                return False
            pipe.multi()
            pipe.decrby(from_account, amount)
            pipe.incrby(to_account, amount)
            pipe.execute()
            return True
        except redis.WatchError:
            continue


transfer('account:A', 'account:B', 100)

在上述代码中,pipe.watch(from_account)监视了源账户account:A。每次循环开始时,先监视账户余额,然后读取余额检查是否足够转账。如果余额足够,开启事务并执行转账操作。如果在执行EXEC之前,account:A被其他客户端修改,就会抛出WatchError,然后重新尝试整个过程,从而保证了数据的一致性。

乐观锁机制的实现

WATCH命令本质上实现了一种乐观锁机制。乐观锁假设在大多数情况下,数据不会发生冲突,只有在提交修改时才检查数据是否被其他事务修改。

在分布式系统中,多个节点可能同时尝试修改同一个数据。例如,在一个分布式库存管理系统中,多个订单处理服务可能同时尝试减少库存。使用WATCH命令可以实现乐观锁,确保库存数据的一致性。

以下是Java代码示例,使用Jedis库:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class InventoryManager {
    private Jedis jedis;

    public InventoryManager() {
        jedis = new Jedis("localhost", 6379);
        // 初始化库存
        jedis.set("product:1:stock", "100");
    }

    public boolean decreaseStock(String productId, int quantity) {
        while (true) {
            try {
                jedis.watch(productId + ":stock");
                int stock = Integer.parseInt(jedis.get(productId + ":stock"));
                if (stock < quantity) {
                    jedis.unwatch();
                    return false;
                }
                Transaction transaction = jedis.multi();
                transaction.decrBy(productId + ":stock", quantity);
                transaction.exec();
                return true;
            } catch (Exception e) {
                if (e instanceof redis.clients.jedis.exceptions.WatchError) {
                    continue;
                }
                e.printStackTrace();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        InventoryManager manager = new InventoryManager();
        boolean result = manager.decreaseStock("product:1", 10);
        System.out.println("库存减少操作结果: " + result);
    }
}

在这段代码中,jedis.watch(productId + ":stock")监视库存键。每次尝试减少库存时,先读取库存数量检查是否足够。如果足够,开启事务并减少库存。如果在事务执行前库存被其他服务修改,就会捕获WatchError并重新尝试,实现了乐观锁机制。

数据完整性场景

保证关联数据的一致性

在一些应用中,数据之间存在关联关系。例如,在一个电商系统中,商品的库存和商品的销售记录是关联的。当库存减少时,相应的销售记录应该增加。

假设我们有两个键,product:1:stock表示商品1的库存,product:1:sales表示商品1的销售记录。当进行销售操作时,我们需要保证库存减少和销售记录增加这两个操作的一致性。

以下是Node.js代码示例,使用ioredis库:

const Redis = require('ioredis');
const redis = new Redis();

// 初始化数据
redis.set('product:1:stock', 100);
redis.set('product:1:sales', 0);

async function sellProduct(productId, quantity) {
    while (true) {
        const multi = redis.multi();
        try {
            await multi.watch(`product:${productId}:stock`);
            const stock = await multi.get(`product:${productId}:stock`);
            if (parseInt(stock) < quantity) {
                multi.unwatch();
                return false;
            }
            multi.multi();
            multi.decrBy(`product:${productId}:stock`, quantity);
            multi.incrBy(`product:${productId}:sales`, quantity);
            await multi.exec();
            return true;
        } catch (error) {
            if (error.name === 'WatchError') {
                continue;
            }
            console.error(error);
            return false;
        }
    }
}

sellProduct(1, 10).then(result => {
    console.log('销售操作结果: ', result);
});

在上述代码中,multi.watch(product:${productId}:stock)监视商品库存键。在每次销售操作时,先检查库存是否足够。如果足够,通过事务同时减少库存和增加销售记录。如果库存被其他操作修改,EXEC会失败并捕获WatchError,然后重新尝试,保证了关联数据的一致性。

防止部分更新导致数据无效

在某些情况下,对数据的更新需要是原子性的,否则可能导致数据无效。例如,在一个用户信息管理系统中,用户的基本信息和用户的权限信息是关联的。当修改用户权限时,需要同时更新用户的基本信息中的一些标志位,以确保数据的有效性。

假设我们有两个键,user:1:info存储用户基本信息,user:1:permissions存储用户权限。当提升用户权限时,我们需要同时修改user:1:info中的权限等级标志。

以下是C#代码示例,使用StackExchange.Redis库:

using StackExchange.Redis;
using System;

class UserManager
{
    private ConnectionMultiplexer redis;
    private IDatabase db;

    public UserManager()
    {
        redis = ConnectionMultiplexer.Connect("localhost:6379");
        db = redis.GetDatabase();
        // 初始化数据
        db.StringSet("user:1:info", "basic_info;low_permission");
        db.StringSet("user:1:permissions", "low");
    }

    public bool UpgradeUserPermissions(int userId)
    {
        while (true)
        {
            var tran = db.CreateTransaction();
            var watchKey = $"user:{userId}:permissions";
            try
            {
                tran.AddCondition(Condition.HashEqual(watchKey, "", ""));
                var info = db.StringGet($"user:{userId}:info");
                var permissions = db.StringGet($"user:{userId}:permissions");
                if (permissions.ToString() != "low")
                {
                    return false;
                }
                tran.StringSet($"user:{userId}:info", info.ToString().Replace("low_permission", "high_permission"));
                tran.StringSet($"user:{userId}:permissions", "high");
                if (tran.Execute())
                {
                    return true;
                }
            }
            catch (RedisServerException ex) when (ex.Message.Contains("WATCHED"))
            {
                continue;
            }
        }
    }

    public static void Main()
    {
        UserManager manager = new UserManager();
        bool result = manager.UpgradeUserPermissions(1);
        Console.WriteLine("用户权限提升结果: " + result);
    }
}

在这段代码中,tran.AddCondition(Condition.HashEqual(watchKey, "", ""))相当于监视了user:1:permissions键。在每次尝试提升用户权限时,先检查当前权限是否为低权限。如果是,通过事务同时修改用户基本信息和权限信息。如果在事务执行前权限被其他操作修改,事务执行会失败并捕获RedisServerException(包含WATCHED关键字),然后重新尝试,防止了部分更新导致数据无效的情况。

缓存更新场景

缓存与数据库一致性维护

在使用Redis作为缓存的系统中,保证缓存数据和数据库数据的一致性是一个常见的挑战。当数据库中的数据发生变化时,需要及时更新缓存。然而,在高并发环境下,可能会出现缓存更新不及时或不一致的问题。

假设我们有一个博客系统,文章内容存储在数据库中,同时在Redis中缓存。当文章内容被修改时,需要更新数据库并同时更新缓存。

以下是PHP代码示例,使用Predis库:

<?php
require_once 'Predis/Autoloader.php';
Predis\Autoloader::register();

$redis = new Predis\Client();

// 初始化数据
$redis->set('article:1:content', '初始文章内容');
// 模拟从数据库读取文章内容
function getArticleFromDB($articleId) {
    // 实际应用中从数据库查询
    return '数据库中的文章内容';
}

function updateArticle($articleId, $newContent) {
    while (true) {
        try {
            $redis->watch('article:'. $articleId. ':content');
            // 先更新数据库
            // 实际应用中执行数据库更新操作
            // 这里简单模拟
            echo "更新数据库文章内容为: ". $newContent. "\n";
            $redis->multi();
            $redis->set('article:'. $articleId. ':content', $newContent);
            $redis->execute();
            return true;
        } catch (Predis\Command\CommandException $e) {
            if ($e->getMessage() === 'WATCHED variable changed.' ) {
                continue;
            }
            throw $e;
        }
    }
}

updateArticle(1, '新的文章内容');
?>

在上述代码中,$redis->watch('article:'. $articleId. ':content')监视缓存中的文章内容键。在更新文章时,先执行数据库更新操作(这里简单模拟),然后通过事务更新缓存。如果在事务执行前缓存被其他操作修改,就会捕获CommandException(消息为WATCHED variable changed.),然后重新尝试,保证了缓存和数据库数据的一致性。

分布式缓存更新同步

在分布式系统中,多个节点可能都有自己的缓存副本。当数据发生变化时,需要确保所有节点的缓存都能及时更新,以避免数据不一致。

假设我们有一个分布式电商系统,多个服务器节点都缓存了商品价格信息。当商品价格在数据库中被修改时,需要同步更新所有节点的缓存。

以下是Go代码示例,使用go - redis库:

package main

import (
    "fmt"
    "github.com/go - redis/redis/v8"
    "context"
)

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })

    // 初始化数据
    rdb.Set(ctx, "product:1:price", 100, 0)

    // 模拟从数据库读取商品价格
    func getProductPriceFromDB(productId int) float64 {
        // 实际应用中从数据库查询
        return 120.0
    }

    func updateProductPrice(productId int, newPrice float64) bool {
        for {
            pipe := rdb.Pipeline()
            pipe.Watch(ctx, fmt.Sprintf("product:%d:price", productId))
            price, err := pipe.Get(ctx, fmt.Sprintf("product:%d:price", productId)).Float64()
            if err!= nil && err!= redis.Nil {
                return false
            }
            pipe.Multi()
            pipe.Set(ctx, fmt.Sprintf("product:%d:price", productId), newPrice, 0)
            _, err = pipe.Exec(ctx)
            if err == nil {
                return true
            }
            if _, ok := err.(*redis.WatchError); ok {
                continue
            }
            return false
        }
    }

    updateProductPrice(1, 120.0)
}

在这段代码中,pipe.Watch(ctx, fmt.Sprintf("product:%d:price", productId))监视商品价格的缓存键。在更新商品价格时,先读取当前缓存价格,然后通过事务更新缓存。如果在事务执行前缓存被其他节点修改,就会捕获WatchError,然后重新尝试,实现了分布式缓存更新的同步,保证了各个节点缓存数据的一致性。

计数器与限流场景

防止计数器超调

在一些应用中,计数器被广泛使用,例如记录网站的访问量、API的调用次数等。在高并发环境下,如果没有合适的控制,计数器可能会出现超调的情况。

假设我们有一个API调用次数计数器,限制每个用户每小时最多调用100次。在高并发请求下,如果多个请求同时检查计数器并更新,可能会导致某个用户在一小时内调用次数超过100次。

以下是Python代码示例,使用redis - py库:

import redis
import time

r = redis.Redis(host='localhost', port=6379, db = 0)

def is_api_call_allowed(user_id):
    key = f'user:{user_id}:api_calls'
    while True:
        try:
            r.watch(key)
            calls = int(r.get(key) or 0)
            if calls >= 100:
                r.unwatch()
                return False
            pipe = r.pipeline()
            pipe.multi()
            pipe.incr(key)
            pipe.execute()
            return True
        except redis.WatchError:
            continue


# 模拟API调用
user_id = 1
for _ in range(105):
    if is_api_call_allowed(user_id):
        print(f'用户{user_id} API调用成功')
    else:
        print(f'用户{user_id} API调用次数已达限制')
    time.sleep(0.1)

在上述代码中,r.watch(key)监视计数器键。每次尝试调用API时,先读取当前调用次数并检查是否超过限制。如果未超过,通过事务增加调用次数。如果在事务执行前计数器被其他请求修改,就会捕获WatchError并重新尝试,防止了计数器超调。

分布式限流实现

在分布式系统中,限流是保证系统稳定性的重要手段。使用Redis的WATCH命令可以实现分布式限流,确保在整个系统范围内,某个资源的访问频率不超过设定的阈值。

假设我们要对某个接口进行限流,限制每秒最多处理100个请求。我们可以在每个请求到达时,通过Redis检查当前请求数是否超过阈值。

以下是Java代码示例,使用Jedis库:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class RateLimiter {
    private Jedis jedis;
    private String limitKey;
    private int limit;

    public RateLimiter(String limitKey, int limit) {
        this.jedis = new Jedis("localhost", 6379);
        this.limitKey = limitKey;
        this.limit = limit;
    }

    public boolean isRequestAllowed() {
        while (true) {
            try {
                jedis.watch(limitKey);
                long currentCount = Long.parseLong(jedis.get(limitKey) == null? "0" : jedis.get(limitKey));
                if (currentCount >= limit) {
                    jedis.unwatch();
                    return false;
                }
                Transaction transaction = jedis.multi();
                transaction.incr(limitKey);
                transaction.expire(limitKey, 1);
                transaction.exec();
                return true;
            } catch (Exception e) {
                if (e instanceof redis.clients.jedis.exceptions.WatchError) {
                    continue;
                }
                e.printStackTrace();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        RateLimiter limiter = new RateLimiter("api:request_count", 100);
        for (int i = 0; i < 110; i++) {
            boolean result = limiter.isRequestAllowed();
            System.out.println("请求 " + i + " 是否允许: " + result);
        }
    }
}

在这段代码中,jedis.watch(limitKey)监视请求计数器键。每次请求到达时,先读取当前请求数并检查是否超过限制。如果未超过,通过事务增加请求数并设置过期时间为1秒(确保每秒重新计数)。如果在事务执行前计数器被其他请求修改,就会捕获WatchError并重新尝试,实现了分布式限流。

总结

Redis的WATCH命令在并发控制、数据完整性维护、缓存更新以及计数器与限流等多个场景中都有着重要的应用。通过合理使用WATCH命令,结合MULTI/EXEC事务机制,我们能够有效地解决分布式系统和高并发环境下的数据一致性问题。无论是在金融领域的交易处理,还是在电商系统的库存管理,亦或是API的限流控制,WATCH命令都能为我们提供可靠的解决方案。在实际应用中,需要根据具体的业务需求和系统架构,灵活运用WATCH命令,以确保系统的稳定性和数据的准确性。同时,不同编程语言和Redis客户端库对WATCH命令的使用方式略有差异,开发者需要熟悉相应的库文档,正确地实现相关功能。