基于SpringBoot实现过一个分布式锁:SpringBoot实现Redis分布式锁,最近面试候选人,这个问题也问的比较多,在这里自己也复习一下。

什么是分布式锁

需要一种支持分布式集群环境下的锁:查询 DB 时,只有一个线程能访问,其他线程都需要等待第一个线程释放锁资源后,才能继续执行。

客户端

根据上图,可以简单描述下分布式锁的工作流程:

  1. 前端将 10W 的高并发请求转发给四个微服务。
  2. 每个微服务处理 2.5 W 个请求。
  3. 每个处理请求的线程在执行业务之前,需要先抢占锁。
  4. 获取到锁的线程在执行完业务后,释放锁。
  5. 未获取到的线程需要等待锁释放。
  6. 释放锁后,其他线程抢占锁。
  7. 重复执行步骤 4、5、6。

所有请求的线程都去同一个地方获取,如果有,就执行业务逻辑,没有,就需要等其他线程释放。这个是所有线程可见的,可以把这个放到 Redis 缓存或者数据库。

Redis的SETNX

Redis作为一个可以公共访问的地方,非常适合用来做分布式锁。

用 Redis 实现分布式锁的几种方案,都是用 SETNX 命令(设置 key 等于某 value)。只是高阶方案传的参数个数不一样,以及考虑了异常情况。

SETNXset If not exist的简写。意思就是当 key 不存在时,设置 key 的值,存在时,什么都不做。

Redis命令如下:

set <key> <value> NX

返回 OK,表示设置成功。重复执行该命令,会返回 nil表示设置失败。

简单方案

用 Redis 的 SETNX 命令来实现最简单的分布式锁

原理图

image-20210608113847222

  • 多个并发线程都去 Redis 中申请锁,也就是执行 setnx 命令,假设线程 A 执行成功,说明当前线程 A 获得了。
  • 其他线程执行 setnx 命令都会是失败的,所以需要等待线程 A 释放锁。
  • 线程 A 执行完自己的业务后,删除锁。
  • 其他线程继续抢占锁,也就是执行 setnx 命令。因为线程 A 已经删除了锁,所以又有其他线程可以抢占到锁了。
示例代码
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if (lock) {
// 2.抢占成功,执行业务
Object object = doSomeThing();
// 3.解锁
redisTemplate.delete("lock");
return object;
} else {
// 4.休眠一段时间
sleep(100);
// 5.抢占失败,等待锁释放
return doSomethingsByRedisDistributedLock();
}

递归调用,可能会导致栈空间溢出。因此休眠一段时间。

缺陷

这个方案会有个问题,当setnx占锁成功之后,业务代码或服务器宕机,没有执行删除锁的逻辑,则会造成死锁

那如何规避这个风险呢?

设置锁的自动过期时间,过一段时间后,自动删除锁,这样其他线程就能获取到锁了。

过期自动解锁方案

原理图

image-20210608140209079

示例代码

清理redis key代码如下

// 在 10s 以后,自动清理 lock
redisTemplate.expire("lock", 10, TimeUnit.SECONDS);

完整代码

示例代码
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if(lock) {
// 2.在 10s 以后,自动清理 lock
redisTemplate.expire("lock", 10, TimeUnit.SECONDS);
// 3.抢占成功,执行业务
Object object = doSomeThing();
// 4.解锁
redisTemplate.delete("lock");
return object;
}

缺陷

这个方案解决了线程异常或服务器宕机造成的锁未释放的问题,但还是存在其他问题:

因为占锁和设置过期时间是分两步执行的,所以如果在这两步之间发生了异常,则锁的过期时间根本就没有设置成功。

所以和简单方案方案有一样的问题:锁永远不能过期

事务原子性方案

原子性:多条命令要么都成功执行,要么都不执行。

将获取锁和设置锁过期时间放到一步执行。

Redis中可以这样执行:

# 设置某个 key 的值并设置多少毫秒或秒 过期。
set <key> <value> PX <多少毫秒> NX
# 或
set <key> <value> EX <多少秒> NX

原理图

与前面两种方案相比。获取锁的时候,也需要设置锁的过期时间,这是一个原子操作,要么都成功执行,要么都不执行。如下图所示:

image-20210608141319974

示例代码

设置 lock 的值等于 123,过期时间为 10 秒。如果 10 秒 以后,lock 还存在,则清理 lock。

setIfAbsent("lock", "123", 10, TimeUnit.SECONDS);

缺陷

这个方案也有缺陷,比如:

  • A抢占到了锁,并设置了10秒过期时间,锁编号123

  • 10秒以后,A还在执行,此时锁被自动打开

  • 此时B抢占到锁,设置编号123,设置10秒过期

  • 因为该资源只能有一个线程执行,所以A和B执行任务就产生冲突

  • 在15秒的时候A终于完成了任务,这个时候B还没完成任务

  • A主动释放了编号123的锁

  • B还在执行任务,但是锁已经被打开了

    — B任务还没执行完,锁就被打开了

  • B的锁被A打开(编号都是123)后,B还在执行任务

  • C抢占到锁,C开始执行任务

  • 这时B和C又产生了冲突

  • ··· ···

A 处理任务所需要的时间大于锁自动清理(开锁)的时间,所以在自动开锁后,又有其他用户抢占到了锁。当用户 A 完成任务后,会把其他用户抢占到的锁给主动打开。

不同编号锁方案

上面方案的缺陷,过程看似复杂,但其实也很好解决,给每个锁设置不同编号就行了。

原理图

image-20210608142711123

示例代码

示例代码
// 1.生成唯一 id
String uuid = UUID.randomUUID().toString();
// 2. 抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
if(lock) {
System.out.println("抢占成功:" + uuid);
// 3.抢占成功,执行业务
Object object = doSomeThing();
// 4.获取当前锁的值
String lockValue = redisTemplate.opsForValue().get("lock");
// 5.如果锁的值和设置的值相等,则清理自己的锁
if(uuid.equals(lockValue)) {
System.out.println("清理锁:" + lockValue);
redisTemplate.delete("lock");
}
return object;
} else {
System.out.println("抢占失败,等待锁释放");
// 4.休眠一段时间
sleep(100);
// 5.抢占失败,等待锁释放
return doSomethingsByRedisDistributedLock();
}
  1. 生成随机唯一 id,给锁加上唯一值。
  2. 抢占锁,并设置过期时间为 10 s,且锁具有随机唯一 id。
  3. 抢占成功,执行业务。
  4. 执行完业务后,获取当前锁的值。
  5. 如果锁的值和设置的值相等,则清理自己的锁。

缺陷

此方案还是会有点问题:

第 4 步和第 5 步并不是原子性的。

image-20210608144433881

  • 时刻:0s。线程 A 抢占到了锁。
  • 时刻:9.5s。线程 A 向 Redis 查询当前 key 的值。
  • 时刻:10s。锁自动过期。
  • 时刻:11s。线程 B 抢占到锁。
  • 时刻:12s。线程 A 在查询途中耗时长,终于拿多锁的值。
  • 时刻:13s。线程 A 还是拿自己设置的锁的值和返回的值进行比较,值是相等的,清理锁,但是这个锁其实是线程 B 抢占的锁。

最终方案

上面的线程 A 查询锁和删除锁的逻辑不是原子性的,所以将查询锁和删除锁这两步作为原子指令操作就可以了。

原理图

用脚本进行删锁,达到原子性操作。

image-20210608144959531

示例代码

redis中的脚本:

if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end

这段脚本和上一个方案的获取key,删除key的方式很像。先获取 KEYS[1] 的 value,判断 KEYS[1] 的 value 是否和 ARGV[1] 的值相等,如果相等,则删除 KEYS[1]。

分两步执行这段脚本:先定义脚本;用 redisTemplate.execute 方法执行脚本。

脚本解锁
// 脚本解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);

上面的代码中,KEYS[1] 对应lock,ARGV[1] 对应 uuid,含义就是如果 lock 的 value 等于 uuid 则删除 lock。

这段 Redis 脚本是由 Redis 内嵌的 Lua 环境执行的,所以又称作 Lua 脚本。

使用Redisson的方案

Redisson

Redisson 提供了使用 Redis的最简单和最便捷的方法。

Redisson的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。

点击查看详细信息
  • Netty 框架:Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能
  • 基础数据结构:将原生的Redis HashListSetStringGeoHyperLogLog等数据结构封装为Java里大家最熟悉的映射(Map)列表(List)集(Set)通用对象桶(Object Bucket)地理空间对象桶(Geospatial Bucket)基数估计算法(HyperLogLog)等结构,
  • 分布式数据结构:这基础上还提供了分布式的多值映射(Multimap),本地缓存映射(LocalCachedMap),有序集(SortedSet),计分排序集(ScoredSortedSet),字典排序集(LexSortedSet),列队(Queue),阻塞队列(Blocking Queue),有界阻塞列队(Bounded Blocking Queue),双端队列(Deque),阻塞双端列队(Blocking Deque),阻塞公平列队(Blocking Fair Queue),延迟列队(Delayed Queue),布隆过滤器(Bloom Filter),原子整长形(AtomicLong),原子双精度浮点数(AtomicDouble),BitSet等Redis原本没有的分布式数据结构。
  • 分布式锁:Redisson还实现了Redis文档中提到像分布式锁Lock这样的更高阶应用场景。事实上Redisson并没有不止步于此,在分布式锁的基础上还提供了联锁(MultiLock)读写锁(ReadWriteLock)公平锁(Fair Lock)红锁(RedLock)信号量(Semaphore)可过期性信号量(PermitExpirableSemaphore)闭锁(CountDownLatch)这些实际当中对多线程高并发应用至关重要的基本部件。正是通过实现基于Redis的高阶应用方案,使Redisson成为构建分布式系统的重要工具。
  • 节点:Redisson作为独立节点可以用于独立执行其他节点发布到分布式执行服务分布式调度服务里的远程任务。

整合Redisson

SpringBoot整合Redisson的示例代码可点击这里查看:cayzlh-starter

原理图

因为 Redisson 非常强大,实现分布式锁的方案非常简洁。

image-20210608150643995

示例代码

// 1.设置分布式锁
RLock lock = redisson.getLock("lock");
// 2.占用锁
lock.lock();
// 3.执行业务
...
// 4.释放锁
lock.unlock();

和之前 Redis 的方案相比,简洁很多。

分布式读写锁

基于 Redis 的 Redisson 分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了 RLock接口。

写锁是一个排他锁(互斥锁),读锁是一个共享锁。

  • 读锁 + 读锁:相当于没加锁,可以并发读。
  • 读锁 + 写锁:写锁需要等待读锁释放锁。
  • 写锁 + 写锁:互斥,需要等待对方的锁释放。
  • 写锁 + 读锁:读锁需要等待写锁释放。

示例代码:

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

小结

上面几种方案的不断演进的过程中,知道了系统中哪些地方可能存在异常情况,以及该如何更好地进行处理。