循序渐进 Redis 分布式锁(以及何时不用它)

虚幻大学 xuhss 161℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

场景

假设我们有个批处理服务,实现逻辑大致是这样的:

  1. 用户在管理后台向批处理服务投递任务;
  2. 批处理服务将该任务写入数据库,立即返回;
  3. 批处理服务有启动单独线程定时从数据库获取一批未处理(或处理失败)的任务,投递到消息队列中;
  4. 批处理服务启动多个消费线程监听队列,从队列中拿到任务并处理;
  5. 消费线程处理完成(成功或者失败)后修改数据库中相应任务的状态;

流程如图:

fce1a21378f16e45203394ed4743a3fd - 循序渐进 Redis 分布式锁(以及何时不用它)

现在我们单独看看上图中虚线框中的内容(3~6):批处理服务从数据库拉取任务列表投递到消息队列。

生产环境中,为了高可用,都会部署至少两台批处理服务器,也就是说至少有两个进程在执行虚线框中的流程。

有什么问题呢?

假设这两个进程同时去查任务表(这是很有可能的),它俩很可能会得到同一批任务列表,于是这批任务都会入列两次。

当然,这不是说一个任务入列两次就一定会导致任务被重复执行——我们可以通过多引入一个状态值来解决此问题。

消费者线程从队列中获取到任务后,再次用如下 SQL 更新任务状态:

-- status:1-待处理;2-已入列;3-处理中;4-失败待重试;5-彻底失败(不可重试);

update tasks set status=3 where status=2 and id=$id;

由于 where 条件有 status=2,即只有原先状态是“已入列”的才能变成“处理中”,如果多个线程同时拿到同一个任务,一定只有一个线程能执行成功上面的语句,进而继续后续流程(其实这就是通过数据库实现的简单的分布式锁——乐观锁)。

不过,当定时进程多了后,大量的重复数据仍然会带来性能等其他问题,所以有必要解决重复入列的问题。

有个细节:请注意上图中步骤 5、6,是先改数据库状态为“已入列”,再将消息投递到消息队列中——这和常规逻辑是反过来的。

能否颠倒 5 和 6 的顺序,先入列,再改数据库状态呢?

不能。从逻辑上来说确实应该如此,但它会带来问题。消费线程从队列中拿到任务后,会执行如下 SQL 语句:

update tasks set status=3 where status=2 and id=$id;

这条 SQL 依赖于前面(第 5 步)产生的状态值,所以它要求在执行该语句的时候,第 5 步的 SQL 语句(将状态改为“已入列”)一定已经执行完了。如果将 5 和 6 颠倒(先入列,再改状态值),就有可能出现下图的执行顺序,导致消费者线程修改状态失败,进而执行不下去:

cf7fd96e9dc477ccf4ec83e34e75ad02 - 循序渐进 Redis 分布式锁(以及何时不用它)

上图中,任务入列后立即被消费线程获取到并去修改数据库,而此时定时线程的 SQL 可能还没执行(可能网络延迟),这就出问题了。

定时线程先将状态改为“已入列”带来的问题是,如果改状态后(入列前)进程挂了,会导致任务一直处于已入列状态(但实际上未入列),所以还需要搭配其它的超时重试机制。

上图虚线框中那段逻辑在并发原语中有个专门名称叫“临界区”——我们要做的就是让多个操作者(进程、线程、协程)必须一个一个地(而不能一窝蜂地)去执行临界区内部的逻辑,手段就是加锁:

var lock = newLock()

// 加锁
lock.lock()

// 执行临界区的逻辑

// 释放锁
lock.unlock()

所谓锁,就是多个参与者(进程、线程)争抢同一个共享资源(术语叫“信号量”),谁抢到了就有资格往下走,没抢到的只能乖乖地等(或者放弃)。锁的本质是两点:

  1. 它是一种共享资源,对于多方参与者来说,只有一个,就好比篮球场上只有一个篮球,所有人都抢这一个球;
  2. 对该资源的操作(加锁、解锁)是原子性的。虽然大家一窝蜂都去抢一个球,但最终这个球只会属于某一个人,不可能一半在张三手上,另一半在李四手上。只有抢到球的一方才可以执行后续流程(投篮),另一方只能继续抢;

在单个进程中,以上两点很容易实现:同一个进程中的线程之间天然是共享进程内存空间的;原子性也直接由 CPU 指令保证。所以单个进程中,我们直接用编程语言提供的锁即可。

进程之间呢?

进程之间的内存空间是独立的。两个进程(可能在两台不同的物理机上)创建的锁资源自然也是独立的——这就好比两个篮球场上的两个篮球之间毫不相干。

那怎样让两个篮球场上的两队人比赛呢?只能让他们去同一个地方抢同一个球——这在编程中叫“分布式锁”。

有很多实现分布式锁的方案(关系数据库、zookeeper、etcd、Redis 等),本篇单讲用 Redis 来实现分布式锁。

小试牛刀

之所以能用 Redis 实现分布式锁,依赖于其三个特性:

  1. Redis 作为独立的存储,其数据天然可以被多进程共享;
  2. Redis 的指令是单线程执行的,所以不会出现多个指令并发地读写同一块数据;
  3. Redis 指令是纯内存操作,速度是微妙级的(不考虑网络时延),性能足够高;

有些人一想到“单线程-高性能”就条件反射地回答 IO 多路复用,其实 Redis 高性能最主要就是纯内存操作。

Redis 分布式锁的大体调用框架是这样的:

b92c36bc1c8f3a598d13f0c1abe2a498 - 循序渐进 Redis 分布式锁(以及何时不用它)

多个进程的多个线程争抢同一把 Redis 锁。

说到 Redis 分布式锁,大部分人都会想到 setnx 指令:

// setnx 使用方式
SETNX key value

意思是:如果 key 不存在(Not eXists),则将 key 设置为 value 并返回 1,否则啥也不做并返回 0——也就是说, key 只会被设置一次,利用这个特性就可以实现锁(如果返回 1 表示加锁成功,0 则说明别人已经加锁了,本次加锁失败)。

我们写下伪代码:

// 获取 redis client 单例
var redis = NewRedisClient(redisConf);

// 通过 SETNX 指令加锁
func lock(string lockKey) bool {
    result = redis.setnx(lockKey, 1);
    return bool(result);
}

// 通过 DEL 指令解锁
func unlock(string lockKey) {
    redis.del(lockKey);
}

上面的定时任务进程中这样使用:

var lockKey = "batch:task:list"

// 上锁
if (!lock(lockKey)) {
    // 获取锁失败,直接返回
    return false;
}

try {
    // 查询数据库获取待处理任务列表
    // 更新任务状态
    // 入列
} finally {
    // 解锁
    unlock(lockKey);    
}

很简单!半小时搞定,上线!

第一次懵逼

上线没跑几天就出问题了:任务无缘无故地不执行了,消息队列中很长时间没接收到消息了。

分析了半天,我们发现 Redis 中一直存在 batch:task:list 这条记录,没人去删除它!

盯着代码我们突然发现问题所在:这个 key 压根没有过期时间!也就是说,如果程序不 DEL 它就永远存在。

估计某进程在执行 unlock 之前崩溃了(或者哪个愣头青执行了 kill -9 ?),或者 unlock 时发生了网络问题,或者 Redis 宕机了?总之 DEL 没执行,于是这个锁永远得不到释放!

好办,加上过期时间呗:

...

// 通过 SETNX 指令加锁
// 加上过期时间,单位毫秒
func lock(string lockKey, int ttl = 3000) bool {
    // 加锁
    result = redis.setnx(lockKey, 1);
    // 设置过期时间(毫秒)
    redis.pexpire(lockKey, ttl);

    return bool(result);
}

...

这段代码有什么问题呢?

这里通过两次网络请求执行了两条 Redis 指令:setnx 设置 KV,expire 设置超时时间——我们前面说锁操作必须具备原子性,但这两条操作谁也不能保证要么都成功要么都失败啊。假如第一条指令(setnx)执行成功了,但 expire 由于网络原因或者进程崩溃导致执行失败了呢?此时同样会出现上面那个懵逼的问题啊。

我们可以用 Lua 脚本实现 setnx 和 expire 操作的原子性,不过 Redis 2.6.12 版本后可以用 SET 指令搞定:

// 2.6.12 后的 SET 指令格式
// 现在的 SET 指令相当强大也相当复杂,可以替代 SETNX, SETEX, PSETEX, GETSET, 此处只写出跟分布式锁有关的
// 其中两个可选参数:
// -- NX 表示 Not eXists,就是 SETNX 的意思;
// -- PX 是 PEXPIRE 的意思,表示设置 key 的过期时间(毫秒);

SET key value [NX] [PX milliseconds]

改下 Lock 代码:

// 加锁
func lock(string lockKey, int ttl = 3000) bool {
    // Set 函数参数对应上面的命令格式
    result = redis.set(lockKey, 1, "NX", "PX", ttl);

    return bool(result);
}

如此,加了过期时间防止锁无法释放,还保证了加锁操作的原子性,妥了,上线!

第二次懵逼

第二次上线没多久又出现了灵异事件:偶尔会出现一批任务重复入列——敢情这锁加了个寂寞?

各种打日志,终于发现了端倪:有个进程加锁 3.5 秒后才解锁,而且解锁成功了——但我们设置的锁超时时间是 3 秒啊!

也就是说,这个线程解的是别的线程的锁!

// 通过 DEL 指令解锁
// 这里直接调 Redis 的 DEL 指令删除 lockKey,并没有判断该 lockKey 的值是不是本进程设置的
// 所以在有 TTL 的情况下,删的可能是别的线程加的锁
func unlock(string lockKey) {
    redis.del(lockKey);
}

和进程内的本地锁不同的是,Redis 分布式锁加入超时机制后,锁的释放就存在两种情况:

  1. 加锁者主动释放;
  2. 超时被动释放;

所以解锁(DEL)之前需要判断锁是不是自己加的,方法是在加锁的时候生成一个唯一标识。之前我们 SET key value 时 value 给的是固定值 1,现在我们换成一个随机值:

// Redis 分布式锁
// 封装成类
// 该类实例不具备线程安全性,不应跨线程使用
class Lock {
    private redis;
    private name;
    private token;
    private ttl;
    private status;

    const ST\_UNLOCK = 1;
    const ST\_LOCKED = 2;
    const ST\_RELEASED = 3;

    public function Lock(Redis redis, string name, int ttl = 3000) {
        this.redis = redis;
    this.name = name;
    this.token = randStr(16);// 生成 16 字节随机字符串
    this.ttl = ttl;
    this.status = self::ST\_UNLOCK;
    }

    // 加锁
    public function lock() bool {
    if (this.status != self::ST\_UNLOCK) {
        return false;
    }

    // 使用 SET 命令加锁
    // value 不再传 1,而是设置成构造函数中生成的随机串
    try {
        result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
        if (bool(result)) {
            this.status = self::ST\_LOCKED;
            return true;
        }
    } catch (Exception e) {
        return false;
    }

    return false;
    }

    // 解锁
    public function unlock() {
    if (this.status != self::ST\_LOCKED) {
        return;
    }

    // 执行 DEL 之前需要用 GET 命令判断 KEY 的值是不是当前的 token
    // 由于需要执行 GET 和 DEL 两条指令,而锁操作必须保证原子性,需要用 Lua 脚本
    // 脚本中通过 redis.call() 执行 Redis 命令
    // 注意 Lua 脚本数组下标从 1 开始
    // 这段脚本的意思是:
    // 如果 key 的值是 token,则 DEL key,否则啥也不做
    var lua = "
 if (redis.call('get', KEYS[1]) == ARGV[1]) then
 redis.call('del', KEYS[1]);
 end
 return 1;
 ";

    // 调 Redis 的 EVEL 指令执行 Lua 脚本
    // EVAL 指令格式:
    // EVEL script numkeys key1,key2,arg1,arg2... 
    // -- script: Lua 脚本
    // -- numkeys: 说明后面的参数中,有几个是 key,这些 key 后面的都是参数
    // 比如:EVAL "redis.call('set', KEYS[1], ARGV[1])" 1 mykey hello
    // 等价于命令 SET mykey hello
    // 参见:https://redis.io/commands/eval/
    redis.eval(lua, 1, this.name, this.token);
    this.status = self::ST\_RELEASED;
    }
}

业务调用:

lock = new Lock(redis, "batch:task:list");

try {
    if (!lock.lock()) {
    return false;
    }

    // 加锁成功,执行业务
} finally {
    lock.unlock();
}

上面这段代码实现了:

  1. 加锁的时候设置了过期时间,防止进程崩溃而导致锁无法释放;
  2. 解锁的时候判断了当前的锁是不是自己加的,防止释放别人的锁;
  3. 加锁和解锁操作都具备原子性;

这段代码已经是生产可用了,第三次上线。

不过,还是有些优化需要做的。

优化一:锁等待

上面的 lock() 方法中,如果获取锁失败则直接返回 false,结束执行流,这可能不能满足某些业务场景。

在本地锁场景中,如果获取锁失败,线程会进入阻塞等待状态——我们希望分布式锁也能提供该功能。

我们在加锁失败时增加重试功能:

class Lock {
    // 重试间隔:1 秒
    const RETRY_INTERVAL = 1000;
    // ...
    // 重试次数(包括首次)
    private retryNum;

    // retryNum: 默认只执行一次(不重试)
    public function Lock(Redis redis, string name, int ttl = 3000, int retryNum = 1) {
        ...
    // 做下防御
    if (retryNum < 0 || retryNum > 20) {
        retryNum = 1;
    }
    this.retryNum = retryNum;
    }

    // 加锁
    public function lock() bool {
        if (this.status != self::ST_UNLOCK) {
        return false;
    }

        // 使用 SET 命令加锁
    // 加入重试机制
    for (i = 0; i < this.retryNum; i++) {
        try {
                result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
        if (bool(result)) {
            // 加锁成功,返回 
            this.status = self::ST_LOCKED;
            return true;
        }
        } catch (Exception e) {
            }

        // 加锁失败了,等待一定的时间后重试
        // 当前线程/协程进入休眠
        sleep(self::RETRY_INTERVAL);
    }

    return false;
    }
}

优化二:锁超时

我们再回头看看上面的加锁逻辑,其核心代码如下:

public function lock() bool {
   // ...
    result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
    if (bool(result)) {
        // 加锁成功,返回 
        this.status = self::ST_LOCKED;
    return true;
    }
    //...
}

这段代码有没有什么问题呢?

想象如下的加锁场景:

// 锁超时时间是 2 秒
var lock = new Lock(redis, name, 2000);

if (lock.lock()) {
    // 加锁成功,加锁用时 2.5 秒
    try {
    // 执行业务逻辑
    } finally {
    // 解锁
    lock.unlock();
    }
}

如上,我们创建一个有效期 2 秒的锁,然后调 Redis 命令加锁,该过程花了 2.5 秒(可能网络抖动)。

对于本线程来说,得到加锁成功的返回值,继续往下执行。

但此时该 lockKey 在 Redis 那边可能已经过期了,如果此时另一个线程去拿锁,也会成功拿到锁——如此锁的作用便失效了。

9c512b64a4ab0363fda47b153c3fdcfe - 循序渐进 Redis 分布式锁(以及何时不用它)

所以,在 lock() 方法中,调 Redis 上锁成功后,需要判断上锁用时,如果时间超过了锁的有效期,则应视为上锁无效,如果有重试机制,则重试:

class Lock {
    // 加锁
    public function lock() bool {
        if (this.status != self::ST_UNLOCK) {
        return false;
    }

    for (i = 0; i < this.retryNum; i++) {
        try {
            // 上锁之前,保存当前毫秒数
        var startTime = getMillisecond();
        // 上锁
        result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
        // 上锁后,计算使用的时间
        var useTime = getMillisecond() - startTime;

        // 加锁成功条件:Redis 上锁成功,且所用的时间小于锁有效期
        if (bool(result) && useTime < this.ttl) {
                // 加锁成功,返回 
            this.status = self::ST_LOCKED;
            return true;
        }
        } catch (Exception e) {}

        // 加锁失败了,等待一定的时间后重试
        // 当前线程/协程进入休眠
        sleep(self::RETRY_INTERVAL);
        }

    return false;
    }
}

如上,在判断条件中增加了加锁用时的判断。

这段代码还有问题吗?

有的。

我们用 Redis 的 SET NX 命令加锁,该命令如果发现 key 已经存在,则直接返回 0,加锁失败。

在上面的失败重试逻辑中,如果是因为加锁用时超限导致的失败(锁有效期是 2 秒,结果加锁操作用了 2.5 秒),此时我们并不能切确知道在 Redis 那边该 key 是否真的已经失效了,如果没有失效(比如来去网络用时各 1.24 秒,此时该 key 并没有失效),那么下一次的重试会因 SET NX 的机制而失败。

所以我们不能用 SET NX 加锁,只能用普通的 SET + Lua 脚本来实现:

class Lock {
 // 加锁
 public function lock() bool {
 if (this.status != self::ST\_UNLOCK) {
 return false;
 }

 // 加锁的 Lua 脚本
 // 注意 Lua 中的注释不是用 // 或者 /**/,而是用 --
 // 参数说明:
 // KEYS[1]: lockKey
 // ARGV[1]: token
 // ARGV[2]: ttl 毫秒
 var lua = "
 local val = redis.call('get', KEYS[1]);
 if (not val) then
 -- 没有设置,则直接设置
 return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]);
 else
 -- 存在,则比较 val 是否等于 token
 if (val == ARGV[1] ) then
 -- 该 key 就是当前线程设置的
 -- 延长其 TTL
 return redis.call('pexpire', KEYS[1], ARGV[2]);
 else
 -- 其他线程上的锁
 return 0;
 end
 end
 ";

 for (i = 0; i < this.retryNum; i++) {
 // 加锁逻辑同上
 }

 return false;
 }
}

如此,便解决了加锁超时导致的竞态问题——但只解决了一半。

设想这样的场景:

进程 A 加了一个有效期 5 秒的锁,加锁成功后执行业务逻辑,业务逻辑执行耗时 10 秒——就是说,在业务逻辑执行到差不多一半的时候锁就失效了,此时别的进程就可以抢到锁了,这就会导致竞态问题。

有两种解决方案:

  1. 设置个较长的过期时间。这是最简单的(而且也很有效)。比如我们预估 99% 的处理时间不超过 2 秒,则将锁有效期设置为 10 秒。该方案最大的缺点是一旦进程崩溃导致无法主动释放锁,就会导致其他进程在很长一段时间内(如 10 秒)无法获得锁,这在某些场景下可能是非常严重的。
  2. 搞个定时任务线程,定时延长锁的有效期。

方案二伪代码如下:

// 带 Refresh 版本的分布式锁
class Lock {
    private redis;
    private name;
    private ttl;
    private token;
    private retryNum;
    private status;
    // 定时器
    private timer;

    // 锁状态:1 未加锁;2 已加锁;3 已释放
    const ST\_UNLOCK = 1;
    const ST\_LOCKED = 2;
    const ST\_RELEASED = 3;

    // 刷新状态:
    // 4 刷新成功;
    // 5 非法(key 不存在或者不是本线程加的锁)
    // 6 刷新失败(Redis 不可用)
    const RF\_SUC = 4;
    const RF\_INVALID = 5;
    const RF\_FAIL = 6;

    // 构造函数
    public function Lock(Redis redis, string name, int ttl = 2000, int retryNum = 1) {
        ...
    }

    // 加锁
    // 加锁成功后启动定时器
    public function lock() bool {
        if (this.status != self::ST\_UNLOCK) {
            return false;
    }

    // 加锁的 Lua 脚本,同前面的
    lua = "...";

    for (i = 0; i < this.retryNum; i++) {
        var startTime = getMillisecond();
        try {
            // 执行 Lua 脚本上锁
        result = this.redis.eval(lua, 1, this.name, this.token, this.ttl);
        var useTime = getMillisecond() - startTime;

        if (bool(result) && useTime < this.ttl) {
            // 加锁成功
            this.status = self::ST\_LOCKED;
            // 启动定时器
            this.tick();

            return true;
        }
        } catch (Exception e) {
        // Redis 不可用
        }

        // 失败重试
        sleep(RETRY_INTERVAL);
        }

    return false;
    }

    // 启动定时器,定时刷新过期时间
    private function tick() {
        this.timer = startTimerInterval(
        this.ttl / 3,
        function () {
            result = this.refresh();
        if (result == self::RF\_INVALID) {
            // key 不存在,或者该锁被其他线程占用
            // 停掉定时器
            this.timer.stop();
        }
        }
    );
    }

    // 释放锁
    // 需要停掉定时器
    public function unlock() {
        if (this.status != self::ST\_LOCKED) {
        return;
    }

        // 释放锁的 Lua 脚本,同前
    var lua = "...";

    try {
        this.redis.eval(lua, 1, this.name, this.token);
        } catch (Exception e) {} finally {
        this.status = self::ST\_RELEASED;
        // 停掉定时器
        this.timer.stop();
    }
    }

    // 刷新锁过期时间
    private function refresh() int {
        if (this.status != self::ST\_LOCKED) {
        return self::RF\_INVALID;
    }

    var lua = "
 -- key 存在而且其值等于 token 才刷新过期时间
 if (redis.call("get", KEYS[1]) == ARGV[1]) then
 return redis.call("pexpire", KEYS[1], ARGV[2])
 else
 return 0
 end
 ";

        try {
        result = this.redis.eval(lua, 1, this.name, this.token, this.ttl);
        if (result == 0) {
            // key 不存在或者是别人加的锁
        return self::RF\_INVALID;
        } else {
            // 刷新成功
        return self::RF\_SUC;
        }
    } catch (Exception e) {
        // Redis 不可用
        return self::RF\_FAIL;
    }
    }
}

如上,加锁成功后创建一个单独的定时器(独立的线程/协程)刷新锁的 TTL,只要锁没被主动释放(而且进程没有崩溃),就会不停地续命,保证不会过期。此时,我们就能在加锁时选择一个比较小的过期时间(比如 2 秒),一旦进程崩溃,其他进程也能较快获得锁。

上面定时器时间为何选择 ttl/3 呢?

假设锁过期时间(ttl)为 6 秒,由上面 lock() 函数逻辑可知,加锁耗时不可能超过 6 秒(超过就会判定为加锁失败)。我们假设某次加锁耗时比 6 秒小那么一丢丢(也就是近似 6 秒),接下来什么时候发起第一次刷新才能保证 Redis 那边的 key 不过期呢?极端情况下必须立即刷新(如果考虑刷新时的网络时延,就算立即刷新也不一定能保证)。

不过我们考虑的是一般情况。我们可以认为 6 秒耗时都花在网络上(Redis 本身执行时间可以忽略不计),然后再近似认为这 6 秒被来去均摊,各花 3 秒,因而当我们接收到 Redis 的响应时,该 key 在 Redis 那边的 TTL 已经用掉了一半,所以定时间隔必须小于 ttl/2,再将刷新时的网络时延考虑进去,取 ttl/3 或者 ttl/4 比较合适。

就算有了 refresh 机制,也不能说是万无一失了。

考虑 Redis 宕机或者网络不通的情况。

假设线程 A 加锁(ttl=2s)后不久 Redis 就宕机了(或者该业务服所在网络发生分区导致网络不通),宕机期间 refresh 会失败。2s 后 Redis 重启恢复正常,此时线程 A 设置的那个 key 已经过期了,其他线程就能够获取锁,如果线程 A 的执行时间超过 2s,就和其他线程产生竞态。

refresh 机制解决不了该问题,要用其他手段来保证 Redis 和锁的高可用性,如 Redis 集群、官方提供的 Redlock 方案等。

可重入性

一些语言(如 java)内置可重入锁,一些语言(如 go)则不支持。

我们通过代码说下可重入锁是什么:

var lock = newLock();

// 在同一个线程中, foo() 调 bar()
// 函数 foo() 和 bar() 都在竞争同一把锁

function foo() {
    lock.lock();
    ...
    bar();
    ...
    lock.unlock();
}

function bar() {
    lock.lock();
    // do something
    lock.unlock();
}

如上,同一个线程中 foo() 调 bar(),由于 foo() 调 bar() 之前加了锁,因而 bar() 中再竞争该锁时就会一直等待,导致 bar() 函数执行不下去,进而导致 foo() 函数无法解锁,于是造成死锁。

如果上面的 lock 是一把可重入锁,bar() 就会加锁成功。

实现原理是:加锁的 lock() 方法中会判断当前这把锁被哪个线程持有,如果持有锁的线程和现在抢锁的线程是同一个线程,则视为抢锁成功(这锁本来就是被它持有的嘛,抢啥呢)。

由于 foo() 和 bar() 是在同一个线程中调用的,所以他俩都会加锁成功。

锁是加成功了,解锁呢?bar() 中的 unlock() 要怎么处理呢?直接把锁释放掉?不行啊,foo() 中的 unlock() 还没执行呢,bar() 虽然用完锁了,但 foo() 还没用完啊,你 bar() 三下五除二把锁给释放了,其他线程拿到锁,不就和 foo() 中代码构成竞态了吗?

所以可重入锁采用信号量的思想,在内部维持了两个属性:threadid 表示哪个线程持有锁;lockNum 表示持有线程加了几次锁。同一个线程,每 lock() 一次 lockNum 加 1,每 unlock() 一次 lockNum 减 1,只有 lockNum 变成 0 了才表示这把锁真正释放了,其他线程才能用。

原理讲完了,但你不觉得上面的代码很怪吗?

既然 foo() 已经加锁了,bar() 为何还要加同一把锁呢?

在某些情况下这样做可能是有原因的,但大多数情况下,这个问题可以从设计上解决,而不是非要引入可重入锁。

比如我们可以将 bar() 声明为非线程安全的,将加锁工作交给调用者,同时限制 bar() 的可见域,防止其被滥用。

go 语言不支持可重入锁的理由就是:当你的代码需要用可重入锁了,你首先要做的是审视你的设计是否有问题。

可重入锁的便捷性可能会带来代码设计上的问题。

所以本篇并不打算去实现可重入能力——虽然实现起来并不难,无非是将上面讲的原理在 Redis 上用 Lua 脚本实现一遍而已。

不是银弹

有了锤子,全世界都是钉子。

分布式锁看似是颗银弹,但有些问题用其他方案会比分布式锁要好。

我们看看秒杀扣库存的例子。

网上很多讲分布式锁的文章都拿秒杀扣库存来举例。

秒杀场景为了应对高并发,一般会将秒杀商品库存提前写入到 Redis 中,我们假设就用字符串类型存商品库存:

// Redis 命令,设置商品 id=1234 的库存 100 件
set seckill.stock.1234 100

另外一个用户只能参加一次秒杀,所以扣库存前需要判断该用户是否已经参加了(防止羊毛党薅羊毛)。

扣库存逻辑是这样的:

var stockKey = "seckill.stock.1234";
var userKey = "seckill.ordered.users";
var lock = new Lock(redis, "seckill");

// 此处省略活动时间的判断

try {
    // 加分布式锁
    lock.lock();

    // 判断库存
    var stockNum = redis.get(stockKey);
    if (stockNum <= 0) {
        // 库存不足
        return false;
    }

    // 判断用户是否已经参加过
    if (redis.sismember(userKey, userId)) {
        return false;
    }

    // 扣库存
    if (redis.decr(stockKey) >= 0) {
        // 下单
    ...
    } else {
        return false;
    }

    // 将用户加入到已参加集合中
    redis.sadd(userKey, userId);
    return true;
} catch (Exception e) {
    // 异常
} finally {
    // 解锁
    lock.unlock();
}

以上逻辑为何要用分布式锁呢?

假设不用分布式锁,羊毛党同时发了十个请求(同一个用户),由于 redis.sismember(userKey, userId) 判断都会返回 0,于是都能扣库存下单,羊毛薅了一地。

但该场景有没有更优的解决方案呢?

我们使用分布式锁是为了保证临界区代码(lock 保护的区域)执行的原子性——不过 Redis 的原子性还可以通过 Lua 脚本来实现吧。

上面代码一共进行了 6 次 Redis 交互,假设每次用时 50ms,光 Redis 交互这块就用了 0.3s 的时间。

如果我们将这些逻辑封装成 Lua 脚本,只需要一次 Redis 交互就能保证原子性:

var lua = "
 -- 参数说明:
 -- KEYS[1]: actKey
 -- KEYS[2]: userKey
 -- KEYS[3]: stockKey
 -- ARGV[1]: userId

 -- 判断活动时间
 -- (事先将活动的关键信息保存到 Redis hash 中)
 -- 取活动的开始和结束时间
 local act = redis.call('hmget', KEYS[1], 'start', 'end');
 local now = redis.call('time')[1];
 if (not act[1] or now < act[1] or now >= act[2])
 then
 return 0;
 end

 -- 判断库存
 local stock = redis.call('get', KEYS[3]);
 if (not stock or tonumber(stock) <= 0)
 then
 return 0;
 end

 -- 判断用户是否已经参与过
 if (redis.call('sismember', KEYS[2], ARGV[1]) == 1)
 then
 return 0;
 end

 -- 扣库存
 if (redis.call('decr', KEYS[3]) >= 0)
 then
 -- 加入用户
 return redis.call('sadd', KEYS[2], ARGV[1]);
 else
 return 0;
 end
";

var actKey = "seckill.act."+actId;
var userKey = actKey + ".users";
var stockKey = actKey + ".stock." + goodsId;

if (redis.eval(lua, 3, actKey, userKey, stockKey, userId)) {
    // 扣库存成功,下单
    ...
}

上面的脚本还可以先缓存到 Redis 服务器中,然后用 evalsha 命令执行,这样客户端就不用每次都传这么一大坨代码,进一步提升传输性能。

总结

本篇我们从 setnx 命令开始实现了一个最简单的分布式锁,而后通过实际使用发现其存在各种缺陷并逐步增强其实现,主要涉及到以下几个方面:

  1. 被动释放。进程崩溃后,进程本地锁自然会销毁,但 Redis 锁不会。所以要加 TTL 机制,防止因加锁者崩溃而导致锁无法释放;
  2. 属主。线程不能释放别的线程的锁;
  3. 锁等待。加锁失败时可以等待一段时间并重试,而不是立即返回;
  4. 保活。通过定时刷新锁的 TTL 防止被动释放;

不难发现,分布式锁比进程内本地锁要复杂得多,也重得多(本地锁操作是纳秒级别,分布式锁操作是毫秒级别),现实中,在使用分布式锁之前我们要思考下有没有其它更优方案,比如乐观锁、Lua 脚本等。

另外需要注意的是,分布式锁只能解决多进程之间的并发问题,并不能实现数据操作的幂等性。一个例子是增减积分的操作。

增加积分的例子:

// 给用户增加积分
// sourceType、sourceId:积分来源标识,如消费赠送积分场景的 orderCode
// 幂等性:同样的 userId-sourceType-sourceId 不能重复加积分
function addBonus(userId, sourceType, sourceId, bonus) {
    // 加分布式锁
    var lock = new Lock(...);

    try {
        if (!lock.lock()) {
            return false;
        }

        // 检查是否重复
        if (isRepeat(userId, sourceType, sourceId)) {
            return false;
        }

        // 加积分
        add(userId, sourceType, sourceId, bonus);
    } finally {
        lock.unlock();
    }
}

上面分布式锁的作用是防止并发请求(调用端 bug?薅羊毛?),而该操作的幂等性是由 isRepeat() 保证的(查数据库)。

保障幂等性一般有悲观锁和乐观锁两种模式。

上面这种属于悲观锁模式(把整个操作锁起来),另一种乐观锁实现方式是给 userId-sourceType-sourceId 加上组合唯一键约束,此时就不需要加分布式锁,也不需要 isRepeat() 检测,直接 add(userId, sourceType, sourceId, bonus) 就能搞定。

最后说下文中为啥使用伪代码(而不是用具体某一门编程语言实现)。

用伪代码的最主要目的是省去语言特定的实现细节,将关注点放在逻辑本身。

比如 redis 客户端,不同语言有不同的使用方式,就算同一门语言的不同类库用法也不同,有些语言的类库用起来又臭又长,影响心情。

伪代码不受特定语言约束,用起来自由自在,本文中 redis 客户端的使用方式和 Redis 官方的原始命令格式完全一致,没有额外的心智负担。

再比如生成 token 的随机字符串函数 randStr(),go 语言要这样写:

func randStr(size int) (string, error) {
    sl := make([]byte, size)
    if _, err := io.ReadFull(rand.Reader, sl); err != nil {
        return "", err
    }
    return base64.RawURLEncoding.EncodeToString(sl), nil
}

代码虽然不多,但没玩过 go 的小伙伴看到这儿心里是不是要起伏那么两三下?但这玩意怎么实现跟本文的主题没半毛钱关系。

相反,本文的 lua 脚本都是货真价实的,测试通过的——因为这是本文的核心啊。

伪代码的缺点是它不能“拎包入住”,但本文的重点并不是要写个源码库——我们没必要真的自己写一个,直接用 redission 或者其他什么库不香吗?

本文的重点在于分析 Redis 分布式锁的原理,分布式锁面临哪些问题?解决思路是什么?使用时要注意什么?知其然知其所以然。

当你不知其所以然时,很多东西显得特高大上,什么“看门狗”,搞得神乎其神,当搞明白其原理和目的时,也就那么回事。

8cceb3f60a407b1a6f0334a8926bb5c5 - 循序渐进 Redis 分布式锁(以及何时不用它)

转载请注明:xuhss » 循序渐进 Redis 分布式锁(以及何时不用它)

喜欢 (0)

您必须 登录 才能发表评论!