您现在的位置是:首页 >技术交流 >Redis分布式锁网站首页技术交流
Redis分布式锁
为什么需要分布式锁
我们在多线程情况下面对对资源的竞争我们通过mutex互斥锁来保护临界资源来保证数据的正确性,那么也就意味着mutex这把锁多个线程都能看到,这是在同一个进程里面的情况。
那么如果换做是多进程,需要同时操作同一个共享资源那么改如何来操作了?
我们又如何来实现互斥了?现在的业务应用通常都是微服务的业务,那么也就意味着一个应用内部可能有多个进程访问数据库,此时为了操作乱序导致预期结果不一致,此时我们需要引入分布式锁来进行保护。要想实现分布式锁,必须借助一个外部系统,因为进程之间是具有独立性的。所有的进程都要通过这个系统进行加锁,而这个外部的系统必须实现互斥功能只要一个进程加锁成功之后,其他进程在进行加锁则加锁失败。这个外部的系统可以是数据库也可以是redis但是为了追求性能我们一般使用redis.
如何实现分布式锁
我们只要一把锁必须要有的功能就是互斥功能,这也就意味着要想使用redis实现分布式锁那么redis就必须要有这个互斥功能,在redis当中我们可以使用setnx命令来达到这个目的。setnx命令博主在这里说一下他的意思是set if not exist 如果key不存在设置其值,如果key已经存在了那么就什么也不干。这样多个进程来执行条命令就可以达到互斥的功能。
加锁成功的进程就可以修改共享资源,而加锁失败的线程就等待锁资源。当操作完成之后需要释放锁给后来的操作者操作共享资源的机会。那么在redis当中我们如何释放了?很简单我们只需要del lock就可以了。下面给出这个流程的伪代码
//加锁
setnx lock 1
//业务逻辑
do things
//释放锁
del lock
那么实现一个锁就真的怎么简单吗?其实不是,一把合格的分布式锁需要满足以下内容:
我们上面的实现存在着很大的一个问题,那就是当一个客户端拿到锁后,如果发生了下面以下场景机会造成死锁:
- 程序处理业务请求逻辑异常并没有计数释放锁
- 进程挂了,没有机会释放锁
上面这两种情况都会导致已经获得锁的客户端一直占用着锁,导致其他客户端无法获取到锁。那么我们如何来解决了?最容易想到的就是在申请的时候给这个锁设置一个过期时间。注意我们在给锁设置过期时间的时候不能这样设置:
setnx lock 1
expire lock 10
这样是绝对不可以的,这样是存在线程安全问题的。因为这个步骤是分两部的,如果出现一下情况仍然会导致死锁:
- setnx lock 1执行成功,但是expire lock 10由于网络问题执行失败。
- setnx lock 1执行成功,但是expire lock 10由于redis宕机导致执行失败
- setnx lock 1执行成功,但是expire lock 10由于客户端崩溃导致执行失败
幸好redis扩展了set 命令的参数可以在set 的时候同时指定expire过期时间,这条操作是原子的。下面我们给出一个示例:
set lock 1 ex 10 nx
下面我们来使用golang语言来实现一把redis分布式锁
package main
import (
"github.com/go-redis/redis"
"sync"
"time"
)
//初始化redis连接
func initRedis() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "101.35.98.26:6379", //redis服务器的ip地址
Password: "1234", //redis服务器的密码
DB: 0, //选择的是那个库
PoolSize: 100, //连接池大小
})
_, err := client.Ping().Result() //真的开始连接redis
if err != nil {
panic(err)
}
return client
}
type RedisLock struct {
rdb *redis.Client
Key string
expireTime int64
}
func NewRedisLock(key string, exp int64) *RedisLock {
return &RedisLock{
rdb: initRedis(),
Key: key,
expireTime: exp,
}
}
func (lock *RedisLock) Lock() {
for {
isLock, _ := lock.rdb.SetNX(lock.Key, 1, time.Duration(lock.expireTime)*time.Second).Result()
if !isLock { //加锁失败
time.Sleep(time.Millisecond * 30) //加锁失败睡眠30毫秒再次尝试进行加锁
} else { //加锁成功
break
}
}
}
func (lock *RedisLock) UnLock() {
_, err := lock.rdb.Del(lock.Key).Result()
//释放锁
if err != nil {
panic(err)
}
}
func main() {
lock := NewRedisLock("lock", 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
lock.Lock()
//业务逻辑
lock.UnLock()
}()
go func() {
defer wg.Done()
lock.Lock()
//业务逻辑
lock.UnLock()
}()
wg.Wait()
}
上面的代码有没有什么问题了?我们设置这个过期时间一般比这个业务正常情况下处理所需要的时间要多一点,那处理业务过程当中出现了异常了?下面我们举个例子来说名一下这个情况
- 客户端1加锁成功,开始操作共享资源。但是客户端操作共享资源太久超过了锁的过期时间,锁已经失效了。
- 此时客户端2尝试进行加锁那么此时,客户端2肯定是加锁成功的但是等客户端1操作共享资源完成之后,进行释放锁也就是del操作但是此时这把锁并不是客户端1的锁把客户端2的锁给释放了。
也就是存在严重的两个问题:
- 锁过期了
- 释放了别人的锁
这个解决办法很容易就是客户端加锁时设置一个只有自己知道的唯一标识,在释放锁时判断当前这把锁是不是自己的,如果是才进行删除。不是就啥都不干。下面给出修改后的代码
package main
import (
"fmt"
"github.com/go-redis/redis"
"math/rand"
"strconv"
"sync"
"time"
)
//初始化redis连接
func initRedis() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "101.35.98.26:6379", //redis服务器的ip地址
Password: "1234", //redis服务器的密码
DB: 0, //选择的是那个库
PoolSize: 100, //连接池大小
})
_, err := client.Ping().Result() //真的开始连接redis
if err != nil {
panic(err)
}
return client
}
type RedisLock struct {
rdb *redis.Client
Key string //key
expireTime int64 //过期时间
id string //用来标识
}
func NewRedisLock(key string, exp int64) *RedisLock {
return &RedisLock{
rdb: initRedis(),
Key: key,
expireTime: exp,
id: "",
}
}
func (lock *RedisLock) Lock() {
id := rand.Int()
for {
isLock, _ := lock.rdb.SetNX(lock.Key, 1, time.Duration(lock.expireTime)*time.Second).Result()
if !isLock { //加锁失败
time.Sleep(time.Millisecond * 30) //加锁失败睡眠30毫秒再次尝试进行加锁
fmt.Println("加锁失败")
} else { //加锁成功
lock.id = strconv.Itoa(id) //记录一下这个id
break
}
}
}
func (lock *RedisLock) UnLock() {
//判断一个当前这把锁是不是自己的
if id, _ := lock.rdb.Get(lock.Key).Result(); id == lock.id { //注意这两部不是原子的,后面我们详细说
_, err := lock.rdb.Del(lock.Key).Result()
//释放锁
if err != nil {
panic(err)
}
} else {
fmt.Println("当前锁不是自己的")
}
}
func main() {
lock := NewRedisLock("lock", 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
lock.Lock()
time.Sleep(time.Second * 3)
lock.UnLock()
}()
go func() {
defer wg.Done()
lock.Lock()
time.Sleep(time.Second * 2)
}()
wg.Wait()
}
但是这也带来了这个原子性的问题。释放锁的逻辑是通过GET+DEL两条命令来完成,并不是原子的,这时候可能会带来原子性的问题:
- 客户端1执行GET,判断锁是自己的,执行成功了
- 刚好此时锁的过期时间到了,客户端2开始执行setnx进行加锁,客户端1进行del释放送可能由于网络原因客户端2先执行成功,此时客户端1进行del释放锁释放的是客户端2的锁,张冠李戴。
那么我们如何让这两条命令原子性的执行了?答案是Lua脚本我们可以将上述逻辑写出Lua脚本。让redis执行,因为Redis的工作线程是单线程的,在执行Lua脚本时其他请求必须等待,这样GET和DEl之间就不会被其他命令干扰。
Lua脚本
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接使用return 来返回脚本执行后的结果。
下面我们来编写一个入门的案例hello lua通过Lua脚本来返回这个结果,先给出这个Lua脚本的格式。eval luascript numkeys [key [key …]] [arg [arg …]]
eval "return 'hello lua'" 0
执行结果如下:
在这里特别需要注意的时这个""里面的东西我们只能使用单引号,至于这个0是什么意思我们后面慢慢解释。
下面我们希望通过Lua脚本给我们执行一下命令:
set k1 v1
expire k1 60
get k1
通过Lua脚本让这三条命令一起执行完毕。如果我们需要执行redis的命令在Lua脚本当中我们使用redis.call()即可
eval "redis.call('set','k1','v1') redis.call('expire','k1','30') return redis.call('get','k1')" 0
注意这三条命令都有返回值,如果我们在第一个redis.call就return 是会报错的。所以我们需要在最后执行这个return命令。
还有这个k1,v1 我们这里是写死的,那我们能不能传参数了?当然可以我们下面看一下这个怎么写
eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 k1 k2 lua1 lua2
其实最后面这个数字表示的是这个传递的参数个数,类似于我们之前所说的环境变量里面的argv和env那样。
注意在这里数组的下标是从1开始的特别需要注意的是。
下面我们来学习这个带这个if 和else 命令的lua脚本
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
下面我们使用sublime这个工具将其整合成为一行,然后我们来执行一下这个命令
eval "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 k1 v2
执行结果如下:
通过Lua脚本改造分布式锁
下面我们通过Lua脚本来执行一下改造一下上面那个redis分布式锁,保证其原子性
package main
import (
"fmt"
"github.com/go-redis/redis"
"math/rand"
"strconv"
"sync"
"time"
)
const (
script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" //lua脚本
)
//初始化redis连接
func initRedis() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "101.35.98.26:6379", //redis服务器的ip地址
Password: "1234", //redis服务器的密码
DB: 0, //选择的是那个库
PoolSize: 100, //连接池大小
})
_, err := client.Ping().Result() //真的开始连接redis
if err != nil {
panic(err)
}
return client
}
type RedisLock struct {
rdb *redis.Client
Key string //key
expireTime int64 //过期时间
id string //用来标识
}
func NewRedisLock(key string, exp int64) *RedisLock {
return &RedisLock{
rdb: initRedis(),
Key: key,
expireTime: exp,
id: "",
}
}
func (lock *RedisLock) Lock() {
id := rand.Int()
for {
isLock, _ := lock.rdb.SetNX(lock.Key, 1, time.Duration(lock.expireTime)*time.Second).Result()
if !isLock { //加锁失败
time.Sleep(time.Millisecond * 30) //加锁失败睡眠30毫秒再次尝试进行加锁
fmt.Println("加锁失败")
} else { //加锁成功
fmt.Println("加锁成功")
lock.id = strconv.Itoa(id) //记录一下这个id
break
}
}
}
func (lock *RedisLock) UnLock() {
lock.rdb.Eval(script, []string{lock.Key}, lock.id) //执行Lua脚本
}
func main() {
lock := NewRedisLock("lock", 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
lock.Lock()
time.Sleep(time.Second * 3)
lock.UnLock()
}()
go func() {
defer wg.Done()
lock.Lock()
time.Sleep(time.Second * 2)
}()
wg.Wait()
}
到这里我们的redis分布式锁就差不多了,但是我们没有解决则个锁的可重入性问题。在同一个线程当中我已经获取到锁了,我们在处理业务逻辑的时候我还需要获取到这把锁,应该成功才对。所以使用setnx命令是远远不够的。
可重入的Redis分布式锁
上面那个分布式锁并没有解决这个可重入问题和锁的续期。如何兼顾锁的可重入问题?
用一句话来说就是一个线程当中的多个流程可以获取到同一把锁,持有这把锁同步锁可以再次进入,自己可以获取自己的内部锁。
很明显我们使用setnx命令已经无法保证这个锁的可重入性了。此时我们可以使用hash这种数据类型,通过引用计数的方式来保证锁的可重入性。
下面我们通过这个redis命令来说明这个如何实现redis分布式锁。
同样的这里命令的条数有很多我们需要他们一起执行成功,依然需要通过Lua脚本来保证其原子性。下面我们开始来编写可重入redis分布式锁的加锁的Lua脚本
//加锁的lua脚本
if redis.call('exists','key')==0 then//锁是否存在
redis.call('hset','key','uuid:threadid','1')
return 1
elseif redis.call('hexists','key','uuid:threadid')==1 then//判断锁是不是自己的
redis.call('hincrby','key','uuid:threadid',1)
redis.call('expire','key',50)
return 1
else
return 0
end
上面这个是这个这个伪代码,其实这个if和elseif可以进行合并因为hincrby如果不存在会新建。那么这个lua脚本就可以合并成为这样
if redis.call('exists','key')==0 or redis.call('hexists','key','uuid:threadid')==1 then
redis.call('hincrby','key','uuid:threadid','1')
redis.call('expire','key','40')
return 1
else
return 0
end
下面我们可以将这个写死的进行参数替换
if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 then
redis.call('hincrby',KEYS[1],ARGV[1],'1')
redis.call('expire',KEYS[1],ARGV[2])
return 1
else
return 0
end
我们在放到Redis当进行执行看看
eval "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end" 1 lock 123456 50
执行结果如上图所示。
下面我们来编写一下解锁的逻辑,其实同样的很简单,下面给出解锁的伪代码
//解锁unlock 方法
if redis.call('hexists',key,uuid:threadid)==0 then
return nil
elseif redis.call('hincrby',key,uuid:threadid,-1)==0 then
return redis.call('del',key)
else
return 0
end
下面我们记录将这个写死的变量,写出参数传递的样子。下面我直接给出这个代码
if redis.call('hexists',KEYS[1],ARGV[1])==0 then
return nil
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)==0 then
return redis.call('del',KEYS[1])
else
return 0
end
我们先加锁三次然后在解锁三次我们在次查看这个锁就把删除了。这个非常的舒服哈哈哈下面我们进入编码阶段编写一个可插入的redis分布式锁.
package main
import (
"context"
"fmt"
"github.com/go-redis/redis"
"strconv"
"sync"
"time"
)
const (
lockLua = "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"
)
const (
unlockLua = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return -1 elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)==0 then return redis.call('del',KEYS[1]) else return 0 end"
)
const (
keepAlive = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end"
)
//初始化redis连接
func initRedis() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "101.35.98.26:6379", //redis服务器的ip地址
Password: "1234", //redis服务器的密码
DB: 0, //选择的是那个库
PoolSize: 100, //连接池大小
})
_, err := client.Ping().Result() //真的开始连接redis
if err != nil {
panic(err)
}
return client
}
type RedisLock struct {
rdb *redis.Client
Key string //key
expireTime int64 //过期时间
id string //用来标识
ctx context.Context //用来控制任务
cancel context.CancelFunc //取消函数
MaxWait time.Duration //等待锁的最长超时时间
}
func NewRedisLock(key string, exp int64) *RedisLock {
return &RedisLock{
rdb: initRedis(),
Key: key,
expireTime: exp,
id: "",
ctx: context.Background(),
MaxWait: time.Second * 10,
cancel: nil,
}
}
func (lock *RedisLock) TryLock(id int) bool {
res := lock.rdb.Eval(lockLua, []string{lock.Key}, id, lock.expireTime)
fmt.Printf("the id is%d
", id)
if res.Err() != nil {
fmt.Println(res.Err())
return false
}
val, ok := res.Val().(int64) //类型断言
fmt.Println(val)
if ok && val == 1 {
if lock.cancel == nil { //说明已经设置过了
ctx, cancel := context.WithCancel(lock.ctx)
lock.cancel = cancel
go keepAliveWorker(ctx, lock)
}
lock.id = strconv.Itoa(id)
return true
}
return false
}
func (lock *RedisLock) Lock(id int) (err error) {
res := lock.TryLock(id)
if res == true {
fmt.Printf("加锁成功%d", id)
return nil
}
ctx, _ := context.WithTimeout(lock.ctx, lock.MaxWait)
for {
waiter := time.After(time.Duration(30) * time.Millisecond) //每隔30秒获取一次
select {
case <-waiter:
res = lock.TryLock(id)
if res {
fmt.Printf("获取锁成功%d", id)
return nil
}
case <-ctx.Done():
fmt.Println("获取锁超时")
return fmt.Errorf("获取锁超时")
}
}
return
}
func keepAliveWorker(ctx context.Context, lock *RedisLock) {
waiter := time.Tick(time.Duration(lock.expireTime/3) * time.Second)
for {
select {
case <-waiter: //执行续租任务
res := lock.rdb.Eval(keepAlive, []string{lock.Key}, lock.id, lock.expireTime)
fmt.Printf("the 续期id为%s", lock.id)
if res.Err() != nil {
panic(res.Err())
}
if res.Val().(int64) == 0 {
fmt.Println("锁已经过期了不需要在续了")
return
} else {
fmt.Println("续期成功")
}
case <-ctx.Done():
return //不需要续租了
}
}
}
func (lock *RedisLock) UnLock() error {
res := lock.rdb.Eval(unlockLua, []string{lock.Key}, lock.id)
if res.Err() != nil {
fmt.Println(res.Err())
return res.Err()
}
val := res.Val().(int64)
fmt.Printf("the unlockval is%d %s
", val, lock.id)
if val == 1 && lock.cancel != nil {
lock.cancel()
fmt.Printf("释放锁成功%s
", lock.id)
lock.cancel = nil
return nil
} else {
fmt.Printf("还没解锁完%s", lock.id)
return fmt.Errorf("还没有解锁完")
}
}
func main() {
lock := NewRedisLock("lock", 10)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
lock.Lock(1)
time.Sleep(time.Second * 20)
lock.UnLock()
}()
time.Sleep(time.Second * 1)
go func() {
defer wg.Done()
lock.Lock(2)
fmt.Println("locked?")
//lock.UnLock()
}()
wg.Wait()
lock.Lock(5)
}
本次编写的redis分布式锁考虑了这个续期的问题,也就是万一处理业务时间太长,我们需要加个钟,所以我们就可以加锁成功之后马上创建一个协程定时的去查看锁的是否过期。这个也叫做看门狗。
如果此时客户端加锁成功后就宕机了?那么看门狗线程就不存在也就无法为这个锁续期了,锁到期就自动释放了。
RedLock
我们之前说的之前的锁都是一台机器实列当中,如果这台机器挂了那么完蛋了无法提供加锁和解锁服务。
Redis发展到现在,几种常见的部署架构有:
- 单机模式;
- 主从模式;
- 哨兵(sentinel)模式;
- 集群模式;
我们使用Redis时,一般会采用主从集群+哨兵的模式部署,哨兵的作用就是监测redis节点的运行状态。普通的主从模式,当master崩溃时,需要手动切换让slave成为master,使用主从+哨兵结合的好处在于,当master异常宕机时,哨兵可以实现故障自动切换,把slave提升为新的master,继续提供服务,以此保证可用性。那么当主从发生切换时,分布式锁依旧安全吗?
想像这样的场景:
1.客户端1在master上执行SET命令,加锁成功
2.此时,master异常宕机,SET命令还未同步到slave上(主从复制是异步的)
3.哨兵将slave提升为新的master,但这个锁在新的master上丢失了,导致客户端2来加锁成功了,两个客户端共同操作共享资源。
可见,当引入Redis副本后,分布式锁还是可能受到影响。即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况。
为了避免Redis实例故障而导致的锁无法工作的问题,Redis的开发者 Antirez提出了分布式锁算法Redlock。Redlock算法的基本思路,是让客户端和多个独立的Redis实例依次请求加锁,如果客户端能够和半数以上的实例成功地完成加锁操作,那么我们就认为,客户端成功地获得分布式锁了,否则加锁失败。这样一来,即使有单个Redis实例发生故障,因为锁变量在其它实例上也有保存,所以,客户端仍然可以正常地进行锁操作,锁变量并不会丢失。
来具体看下Redlock算法的执行步骤。Redlock算法的实现要求Redis采用集群部署模式,无哨兵节点,需要有N个独立的Redis实例(官方推荐至少5个实例)。接下来,我们可以分成3步来完成加锁操作。
- 第一步是,客户端获取当前时间。
- 第二步是,客户端按顺序依次向N个Redis实例执行加锁操作。
- 这里的加锁操作和在单实例上执行的加锁操作一样,使用SET命令,带上NX、EX/PX选项,以及带上客户端的唯一标识。当然,如果某个Redis实例发生故障了,为了保证在这种情况下,Redlock算法能够继续运行,我们需要给加锁操作设置一个超时时间。如果客户端在和一个Redis实例请求加锁时,一直到超时都没有成功,那么此时,客户端会和下一个Redis实例继续请求加锁。加锁操作的超时时间需要远远地小于锁的有效时间,一般也就是设置为几十毫秒。
- 第三步是,一旦客户端完成了和所有Redis实例的加锁操作,客户端就要计算整个加锁过程的总耗时。客户端只有在满足两个条件时,才能认为是加锁成功,条件一是客户端从超过半数(大于等于 N/2+1)的Redis实例上成功获取到了锁;条件二是客户端获取锁的总耗时没有超过锁的有效时间。
- 为什么大多数实例加锁成功才能算成功呢?多个Redis实例一起来用,其实就组成了一个分布式系统。在分布式系统中总会出现异常节点,所以在谈论分布式系统时,需要考虑异常节点达到多少个,也依旧不影响整个系统的正确运行。这是一个分布式系统的容错问题,这个问题的结论是:如果只存在故障节点,只要大多数节点正常,那么整个系统依旧可以提供正确服务。
- 在满足了这两个条件后,我们需要重新计算这把锁的有效时间,计算的结果是锁的最初有效时间减去客户端为获取锁的总耗时。如果锁的有效时间已经来不及完成共享数据的操作了,我们可以释放锁,以免出现还没完成共享资源操作,锁就过期了的情况。