您现在的位置是:首页 >其他 >Redis实现分布式锁网站首页其他
Redis实现分布式锁
为什么需要分布式锁
在聊分布式锁之前,有必要先解释一下,为什么需要分布式锁。
与分布式锁相对就的是单机锁,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来互斥以保证共享变量的正确性,其使用范围是在同一个进程中。如果换做是多个进程,需要同时操作一个共享资源,如何互斥呢?现在的业务应用通常是微服务架构,这也意味着一个应用会部署多个进程,多个进程如果需要修改MySQL中的同一行记录,为了避免操作乱序导致脏数据,此时就需要引入分布式锁了。
想要实现分布式锁,必须借助一个外部系统,所有进程都去这个系统上申请加锁。而这个外部系统,必须要实现互斥能力,即两个请求同时进来,只会给一个进程加锁成功,另一个失败。这个外部系统可以是数据库,也可以是Redis或Zookeeper,但为了追求性能,我们通常会选择使用Redis或Zookeeper来做。
Redis本身可以被多个客户端共享访问,正好就是一个共享存储系统,可以用来保存分布式锁。而且 Redis 的读写性能高,可以应对高并发的锁操作场景。本文主要探讨如何基于Redis实现分布式锁以及实现过程中可能面临的问题。
分布式锁如何实现
作为分布式锁实现过程中的共享存储系统,Redis可以使用键值对来保存锁变量,在接收和处理不同客户端发送的加锁和释放锁的操作请求。那么,键值对的键和值具体是怎么定的呢?我们要赋予锁变量一个变量名,把这个变量名作为键值对的键,而锁变量的值,则是键值对的值,这样一来,Redis就能保存锁变量了,客户端也就可以通过Redis的命令操作来实现锁操作。
想要实现分布式锁,必须要求Redis有互斥的能力。可以使用SETNX命令,其含义是SET IF NOT EXIST,即如果key不存在,才会设置它的值,否则什么也不做。两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。
以下展示了Redis使用key/value对保存锁变量,以及两个客户端同时请求加锁的操作过程。
加锁操作完成后,加锁成功的客户端,就可以去操作共享资源,例如,修改MySQL的某一行数据。操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?直接使用DEL命令删除这个key即可。这个逻辑非常简单,整体的流程写成伪代码就是下面这样。
// 加锁
SETNX lock_key 1
// 业务逻辑
DO THINGS
// 释放锁
DEL lock_key
但是,以上实现存在一个很大的问题,当客户端1拿到锁后,如果发生下面的场景,就会造成死锁。
程序处理业务逻辑异常,没及时释放锁
进程挂了,没机会释放锁
以上情况会导致已经获得锁的客户端一直占用锁,其他客户端永远无法获取到锁。
如何避免死锁
为了解决以上死锁问题,最容易想到的方案是在申请锁时,在Redis中实现时,给锁设置一个过期时间,假设操作共享资源的时间不会超过10s,那么加锁时,给这个key设置10s过期即可。
但以上操作还是有问题,加锁、设置过期时间是2条命令,有可能只执行了第一条,第二条却执行失败,例如:
SETNX执行成功,执行EXPIRE时由于网络问题,执行失败
SETNX执行成功,Redis异常宕机,EXPIRE没有机会执行
SETNX执行成功,客户端异常崩溃,EXPIRE没有机会执行
总之这两条命令如果不能保证是原子操作,就有潜在的风险导致过期时间设置失败,依旧有可能发生死锁问题。幸好在Redis 2.6.12之后,Redis扩展了SET命令的参数,可以在SET的同时指定EXPIRE时间,这条操作是原子的,例如以下命令是设置锁的过期时间为10秒。
SET lock_key 1 EX 10 NX
至此,解决了死锁问题,但还是有其他问题。想像下面这个这样一种场景:
客户端1加锁成功,开始操作共享资源
客户端1操作共享资源耗时太久,超过了锁的过期时间,锁失效(锁被自动释放)
客户端2加锁成功,开始操作共享资源
客户端1操作共享资源完成,在finally块中手动释放锁,但此时它释放的是客户端2的锁。
这里存在两个严重的问题:
锁过期
释放了别人的锁
第1个问题是评估操作共享资源的时间不准确导致的,如果只是一味增大过期时间,只能缓解问题降低出现问题的概率,依旧无法彻底解决问题。原因在于客户端在拿到锁之后,在操作共享资源时,遇到的场景是很复杂的,既然是预估的时间,也只能是大致的计算,不可能覆盖所有导致耗时变长的场景。
第2个问题是释放了别人的锁,原因在于释放锁的操作是无脑操作,并没有检查这把锁的归属,这样解锁不严谨。如何解决呢?
锁被别人给释放了
解决办法是,客户端在加锁时,设置一个只有自己知道的唯一标识进去,例如可以是自己的线程ID,如果是redis实现,就是SET key unique_value EX 10 NX。之后在释放锁时,要先判断这把锁是否归自己持有,只有是自己的才能释放它。
//释放锁 比较unique_value是否相等,避免误释放
if redis.get("key") == unique_value then
return redis.del("key")
这里释放锁使用的是GET + DEL两条命令,这时又会遇到原子性问题了。
客户端1执行GET,判断锁是自己的
客户端2执行了SET命令,强制获取到锁(虽然发生概念很低,但要严谨考虑锁的安全性)
客户端1执行DEL,却释放了客户端2的锁
由此可见,以上GET + DEL两个命令还是必须原子的执行才行。怎样原子执行两条命令呢?
答案是Lua脚本,可以把以上逻辑写成Lua脚本,让Redis执行。因为Redis处理每个请求是单线程执行的,在执行一个Lua脚本时其它请求必须等待,直到这个Lua脚本处理完成,这样一来GET+DEL之间就不会有其他命令执行了。
以下是使用Lua脚本(unlock.script)实现的释放锁操作的伪代码,其中,KEYS[1]表示lock_key,ARGV[1]是当前客户端的唯一标识,这两个值都是我们在执行 Lua脚本时作为参数传入的。
//Lua脚本语言,释放锁 比较unique_value是否相等,避免误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
最后我们执行以下命令,即可
redis-cli --eval unlock.script lock_key , unique_value
这样一路优先下来,整个加锁、解锁流程就更严谨了,先小结一下,基于Redis实现的分布式锁,一个严谨的流程如下:
加锁时要设置过期时间SET lock_key unique_value EX expire_time NX
操作共享资源
释放锁:Lua脚本,先GET判断锁是否归属自己,再DEL释放锁
有了这个严谨的锁模型,我们还需要重新思考之前的那个问题,锁的过期时间不好评估怎么办。
有的面试问除了Lua脚本你还有什么方式?可以用redis事务来处理原子性问题
如何确定锁的过期时间
前面提到过,过期时间如果评估得不好,这个锁就会有提前过期的风险,一种妥协的解决方案是,尽量冗余过期时间,降低锁提前过期的概率,但这个方案并不能完美解决问题。是否可以设置这样的方案,加锁时,先设置一个预估的过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间。
这是一种比较好的方案,已经有一个库把这些工作都封装好了,它就是Redisson。
Redisson
Redisson是一个Java语言实现的Redis SDK客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般叫它看门狗线程。这个SDK提供的API非常友好,它可以像操作本地锁一样操作分布式锁。客户端一旦加锁成功,就会启动一个watch dog看门狗线程,它是一个后台线程,会每隔一段时间(这段时间的长度与设置的锁的过期时间有关)检查一下,如果检查时客户端还持有锁key(也就是说还在操作共享资源),那么就会延长锁key的生存时间。
那如果客户端在加锁成功后就宕机了呢?宕机了那么看门狗任务就不存在了,也就无法为锁续期了,锁到期自动失效。
Redis的部署方式对锁的影响
上面讨论的情况,都是锁在单个Redis 实例中可能产生的问题,并没有涉及到Redis的部署架构细节。
Redis发展到现在,几种常见的部署架构有:
单机模式;
主从模式;
哨兵(sentinel)模式;
集群模式;
我们使用Redis时,一般会采用主从集群+哨兵的模式部署,哨兵的作用就是监测redis节点的运行状态。普通的主从模式,当master崩溃时,需要手动切换让slave成为master,使用主从+哨兵结合的好处在于,当master异常宕机时,哨兵可以实现故障自动切换,把slave提升为新的master,继续提供服务,以此保证可用性。
那么当主从发生切换时,分布式锁依旧安全吗?
想像这样的场景:
客户端1在master上执行SET命令,加锁成功
此时,master异常宕机,SET命令还未同步到slave上(主从复制是异步的)
哨兵将slave提升为新的master,但这个锁在新的master上丢失了,导致客户端2来加锁成功了,两个客户端共同操作共享资源
可见,当引入Redis副本后,分布式锁还是可能受到影响。即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况。
Redis服务间锁的问题,主从架构锁失效问题
我们还可以用zookeeper,zookeeper先同步其他服务节点。这是和redis的区别。
CAP原则
一致,可用,分区容错,只能满足两种
而redis满足的是AP,可用
zookeeper满足的是CP,一致。其他节点同步后才告诉枷锁成功,牺牲效率保证一致性
如何选择,如果对并发要求高,就用redis,因为redis性能高
如何解决redis的服务间锁问题,RedLock,但是这会有影响,所以是否用RedLock要根据自己业务取舍。
集群模式+Redlock实现高可靠的分布式锁
为了避免Redis实例故障而导致的锁无法工作的问题,Redis的开发者 Antirez提出了分布式锁算法Redlock。
Redlock算法的基本思路,是让客户端和多个独立的Redis实例依次请求加锁,如果客户端能够和半数以上的实例成功地完成加锁操作,那么我们就认为,客户端成功地获得分布式锁了,否则加锁失败。这样一来,即使有单个Redis实例发生故障,因为锁变量在其它实例上也有保存,所以,客户端仍然可以正常地进行锁操作,锁变量并不会丢失。
来具体看下Redlock算法的执行步骤。Redlock算法的实现要求Redis采用集群部署模式,无哨兵节点,需要有N个独立的Redis实例(官方推荐至少5个实例)。接下来,我们可以分成3步来完成加锁操作。
第一步是,客户端获取当前时间。
第二步是,客户端按顺序依次向N个Redis实例执行加锁操作。
这里的加锁操作和在单实例上执行的加锁操作一样,使用SET命令,带上NX、EX/PX选项,以及带上客户端的唯一标识。当然,如果某个Redis实例发生故障了,为了保证在这种情况下,Redlock算法能够继续运行,我们需要给加锁操作设置一个超时时间。如果客户端在和一个Redis实例请求加锁时,一直到超时都没有成功,那么此时,客户端会和下一个Redis实例继续请求加锁。加锁操作的超时时间需要远远地小于锁的有效时间,一般也就是设置为几十毫秒。
第三步是,一旦客户端完成了和所有Redis实例的加锁操作,客户端就要计算整个加锁过程的总耗时。
客户端只有在满足两个条件时,才能认为是加锁成功,条件一是客户端从超过半数(大于等于 N/2+1)的Redis实例上成功获取到了锁;条件二是客户端获取锁的总耗时没有超过锁的有效时间。
为什么大多数实例加锁成功才能算成功呢?
多个Redis实例一起来用,其实就组成了一个分布式系统。在分布式系统中总会出现异常节点,所以在谈论分布式系统时,需要考虑异常节点达到多少个,也依旧不影响整个系统的正确运行。这是一个分布式系统的容错问题,这个问题的结论是:如果只存在故障节点,只要大多数节点正常,那么整个系统依旧可以提供正确服务。
在满足了这两个条件后,我们需要重新计算这把锁的有效时间,计算的结果是锁的最初有效时间减去客户端为获取锁的总耗时。如果锁的有效时间已经来不及完成共享数据的操作了,我们可以释放锁,以免出现还没完成共享资源操作,锁就过期了的情况。
当然,如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端就要向所有Redis节点发起释放锁的操作。为什么释放锁,要操作所有的节点呢,不能只操作那些加锁成功的节点吗?因为在某一个Redis节点加锁时,可能因为网络原因导致加锁失败,例如一个客户端在一个Redis实例上加锁成功,但在读取响应结果时由于网络问题导致读取失败,那这把锁其实已经在Redis上加锁成功了。所以释放锁时,不管之前有没有加锁成功,需要释放所有节点上的锁以保证清理节点上的残留的锁。
在Redlock算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua脚本就可以了。这样一来,只要N个Redis实例中的半数以上实例能正常工作,就能保证分布式锁的正常工作了。所以,在实际的业务应用中,如果你想要提升分布式锁的可靠性,就可以通过Redlock算法来实现。
Redisson和Jedis、Lettuce有什么区别?
Redisson和它俩的区别就像一个用鼠标操作图形化界面,一个用命令行操作文件。Redisson是更高层的抽象,Jedis和Lettuce是Redis命令的封装。
Jedis是Redis官方推出的用于通过Java连接Redis客户端的一个工具包,提供了Redis的各种命令支持
Lettuce是一种可扩展的线程安全的 Redis 客户端,通讯框架基于Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。
Redisson是架设在Redis基础上,通讯基于Netty的综合的、新型的中间件,企业级开发中使用Redis的最佳范本
Jedis把Redis命令封装好,Lettuce则进一步有了更丰富的Api,也支持集群等模式。但是两者也都点到为止,只给了你操作Redis数据库的脚手架,而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案,甚至redis官方都推荐的一种工具集。
Lua脚本是什么?
Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval /evalsha 命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。
于是2.0版本通过Lua脚本删除
lockDel.lua如下
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 执行删除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
delete操作时执行Lua命令
// 解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));
// 执行lua脚本解锁
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);
Redisson分布式锁
使用
- 依赖
<!-- 原生,本章使用-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
<!-- 另一种Spring集成starter,本章未使用 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
- 配置
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.password}")
private String password;
private int port = 6379;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
//此为单机模式singleserver,其他模式redisson也支持
config.useSingleServer().
setAddress("redis://" + redisHost + ":" + port).
setPassword(password);
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
- 启用分布式锁
@Resource
private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName);
try {
boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
if (isLocked) {
// TODO
}
} catch (Exception e) {
//这里加个判断,否则可能在高并发下报错
if(rLock.isLocked() && rLock.isHeldByCurrentThread()){
rLock.unlock();
}
}
//这里加个判断,否则可能在高并发下报错
if(rLock.isLocked() && rLock.isHeldByCurrentThread()){
rLock.unlock();
}
简洁明了,只需要一个RLock,既然推荐Redisson,就往里面看看他是怎么实现的。
setnx是key存在不做任何操作
Redisson分布式锁实现原理
Redisson源码分析
RLock
RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。
RLock如何加锁?
从RLock进入,找到RedissonLock类,找到tryLock 方法再递进到干事的tryAcquireOnceAsync 方法,这是加锁的主要代码(版本不一此处实现有差别,和最新3.15.x有一定出入,但是核心逻辑依然未变。此处以3.13.6为例)
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync 部分,
evalWriteAsync是eval命令执行lua的入口
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
这里揭开真面目,eval命令执行Lua脚本的地方,此处的Lua脚本展开
-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then
-- 新增该锁并且hash中该线程id对应的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 线程重入次数++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
和前面我们写自定义的分布式锁的脚本几乎一致,看来redisson也是一样的实现,具体参数分析:
// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
ARGV[2] = this.getLockName(threadId)
总共3个参数完成了一段逻辑:
“
判断该锁是否已经有对应hash表存在,
• 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
• 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
• 最后返回这把锁的ttl剩余时间
”
也和上述自定义锁没有区别
既然如此,那解锁的步骤也肯定有对应的-1操作,再看unlock方法,同样查找方法名,一路到
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
掏出Lua部分
-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 过期时间重设
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 删除并发布解锁消息
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
该Lua KEYS有2个Arrays.asList(getName(), getChannelName())
name 锁名称
channelName,用于pubSub发布消息的channel名称
ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
internalLockLeaseTime,watchDog配置的超时时间,默认为30s
lockName 这里的lockName指的是uuid和threadId组合的唯一值
步骤如下:
“
1.如果该锁不存在则返回nil;
2.如果该锁存在则将其线程的hash key计数器-1,
3.计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;
”
其中unLock的时候使用到了Redis发布订阅PubSub完成消息通知。
而订阅的步骤就在RedissonLock的加锁入口的lock方法里
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
// 订阅
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
// 省略
当锁被其他线程占用时,通过监听锁的释放通知(在其他线程通过RedissonLock释放锁时,会通过发布订阅pub/sub功能发起通知),等待锁被其他线程释放,也是为了避免自旋的一种常用效率手段。
- 解锁消息
为了一探究竟通知了什么,通知后又做了什么,进入LockPubSub。
这里只有一个明显的监听方法onMessage,其订阅和信号量的释放都在父类PublishSubscribe,我们只关注监听事件的实际操作
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute;
if (message.equals(unlockMessage)) {
// 从监听器队列取监听线程执行监听回调
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// getLatch()返回的是Semaphore,信号量,此处是释放信号量
// 释放信号量后会唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁
value.getLatch().release();
} else if (message.equals(readUnlockMessage)) {
while(true) {
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute == null) {
value.getLatch().release(value.getLatch().getQueueLength());
break;
}
runnableToExecute.run();
}
}
}
发现一个是默认解锁消息 ,一个是读锁解锁消息 ,因为redisson是有提供读写锁的,而读写锁读读情况和读写、写写情况互斥情况不同,我们只看上面的默认解锁消息unlockMessage分支
LockPubSub监听最终执行了2件事
runnableToExecute.run() 执行监听回调
value.getLatch().release(); 释放信号量
Redisson通过LockPubSub 监听解锁消息,执行监听回调和释放信号量通知等待线程可以重新抢锁。
这时再回来看tryAcquireOnceAsync另一分支
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
可以看到,无超时时间时,在执行加锁操作后,还执行了一段费解的逻辑
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
})
此处涉及到Netty的Future/Promise-Listener模型,Redisson中几乎全部以这种方式通信(所以说Redisson是基于Netty通信机制实现的),理解这段逻辑可以试着先理解
“
在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行完毕意味着业务逻辑的完结,在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。
”
这块代码的表面意义就是,在执行异步加锁的操作后,加锁成功则根据加锁完成返回的ttl是否过期来确认是否执行一段定时任务。
这段定时任务的就是watchDog的核心。
锁续约
查看RedissonLock.this.scheduleExpirationRenewal(threadId)
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
拆分来看,这段连续嵌套且冗长的代码实际上做了几步
“
• 添加一个netty的Timeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次,执行的方法是renewExpirationAsync
• renewExpirationAsync重置了锁超时时间,又注册一个监听器,监听回调又执行了renewExpiration
”
renewExpirationAsync 的Lua如下
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
重新设置了超时时间。
Redisson加这段逻辑的目的是什么?
目的是为了某种场景下保证业务不影响,如任务执行超时但未结束,锁已经释放的问题。
当一个线程持有了一把锁,由于并未设置超时时间leaseTime,Redisson默认配置了30S,开启watchDog,每10S对该锁进行一次续约,维持30S的超时时间,直到任务完成再删除锁。
这就是Redisson的锁续约 ,也就是WatchDog (看门狗)实现的基本思路。
流程概括
通过整体的介绍,流程简单概括:
“
A、B线程争抢一把锁,A获取到后,B阻塞
B线程阻塞时并非主动CAS,而是PubSub方式订阅该锁的广播消息
A操作完成释放了锁,B线程收到订阅消息通知
B被唤醒开始继续抢锁,拿到锁
”
除了Lua脚本也可以用redis事务实现,但官方推荐Lua脚本,这里面试万一问可以答通过redis事务实现原子性。
参考文档
redisson