您现在的位置是:首页 >学无止境 >秒杀抢购案例,基于 Redis 实现网站首页学无止境

秒杀抢购案例,基于 Redis 实现

迷迷的k 2024-06-17 11:25:06
简介秒杀抢购案例,基于 Redis 实现

目录

1、关于全局唯一 ID 生成器

1.1 需要满足的特性

1.2 代码实现

1.3 其他的唯一 ID 生成策略

2、实现秒杀下单

2.1 超卖问题的产生

2.2 超卖问题的分析与解决

2.21 悲观锁与乐观锁 

2.22 乐观锁中的两种常用方案       

▶️version 版本控制方案

▶️CAS方案

2.3 实现一人一单

2.4 使用 @Transactional 注解 声明事务时的四种情况

2.5 集群下的线程并发安全问题

3、使用分布式锁解决多服务访问的问题

3.1 分布式锁误删情况分析以及解决

3.2 分布式锁原子性问题分析以及解决


1、关于全局唯一 ID 生成器

由来:

当用户抢购时,所生成的订单信息会保存到对应订单的数据库表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显

  • 受单表数据量的限制

特点:

以 "天" 为单位作为 key 值,方便统计订单量

场景分析一:

        如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。

场景分析二:

        随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。

1.1 需要满足的特性

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

组成部分:

符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

1.2 代码实现

  • 先设置开始的时间戳(可以自定义)
/**
     * 自定义开始时间戳
     */
    public static final Long BEGIN_TIME_START = 1672531200L;

    public static void main(String[] args) {
        //1. 这里是表示 2023.1.1 / 0 时 0 分 0秒
        LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 0, 0, 0);

        //2.这里将上面的时间变为以秒为单位,并设置时区
        long second = localDateTime.toEpochSecond(ZoneOffset.UTC);
        System.out.println("res:"+second);

    }
  • 这里是对时间戳与序列号的拼接,从而生成的唯一 ID
@Component
public class RedisSoleById {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 自定义开始时间戳
     */
    private static final Long BEGIN_TIME_START = 1672531200L;

    public static final int COUNT_BITS = 32;
    /**
     * 生成唯一 ID 的主方法
     */
    public long soleId(String prefixKey){

        //1.生成时间戳,使用现在的时间与设置的开始时间戳相减
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond-BEGIN_TIME_START;

        //2.生成序列号
        //2.1 获取当前日期,精确到天
        String nowDay = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        //2.2 进行 id 自增长,并且存放到 redis 中
        Long count = stringRedisTemplate.opsForValue().increment("icr:" + prefixKey + ":" + nowDay);

        /**
         * 3.进行拼接并返回
         *
         * count<< COUNT_BITS 由于一共有 64 位,序列号需要占 32 位,所以将时间戳往前移 32 位
         *
         * 然后将时间戳与序列号进行 "或" 运算
         */
        return timestamp<< COUNT_BITS | count;

    }
}

1.3 其他的唯一 ID 生成策略

  • UUID
  • Redis 自增
  • snowflack 雪花算法
  • 数据库自增


2、实现秒杀下单

这里先实现最原始的秒杀下单功能,然后进行逐层分析,进行完善代码o(* ̄▽ ̄*)ブ

2.1 超卖问题的产生

秒杀下单,分析图解:

代码实现:

@Transactional  //添加事务
    public Result seckillVoucher(Long voucherId) {

        //1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

        //2.判断秒杀时间是否开始
        if(voucher.getBeginTime().isAfter(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动还未开始!");
        }

        //3.判断秒杀时间是否结束
        if(voucher.getEndTime().isBefore(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动已经结束!");
        }

        //4.判断库存是否充足
        if(voucher.getStock()<1){

            return Result.fail("库存已经清空,尽情等待...");
        }

        //5.扣减库存
        //TODO 这里使用 lambda 方式来进行模拟超卖问题可能模拟失败
//        LambdaUpdateWrapper<SeckillVoucher> updateWrapper = new LambdaUpdateWrapper<>();
//        updateWrapper.eq(SeckillVoucher::getVoucherId,voucherId);
//        updateWrapper.set(SeckillVoucher::getStock,voucher.getStock()-1);
//        boolean update = seckillVoucherService.update(updateWrapper);
        boolean update = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId).update();

        if(!update){
            return Result.fail("库存已经清空,尽情等待...");
        }

        //6.创建订单
        //6.1 根据自己的业务,分别传入订单ID、用户ID、优惠券 ID
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisSoleById.soleId("order");   //使用全局唯一 ID 生成器生成 id
        voucherOrder.setId(orderId);     //订单

        UserDTO user = UserHolder.getUser();
        voucherOrder.setUserId(user.getId());   //用户

        voucherOrder.setVoucherId(voucherId);   //优惠券

        voucherOrderService.save(voucherOrder);

        //7.返回订单 ID
        return Result.ok(orderId);
    }

这里进行上面代码的压测后的分析:

数据库中的优惠券秒杀总数量一共有100张,这里模拟一秒以内有200人进行抢购

         这里可以看见,当有200人进行瞬间抢购时,JM测试中的异常比例只达到了45.5%,而数据库中的优惠券剩余数量为-9;按理说,200人抢100张,异常比例应该达到50%,而数据库中的数据应该为0,而非负数。 这,就是经典的超卖问题

 

 2.2 超卖问题的分析与解决

超卖图解分析:

        “线程一” 首先查询优惠券的库存,而在 “线程一” 扣减之前 “线程二” 也进行了查询,这样,“线程一”“线程二” 所查询的库存数量都是一样的,因为在此之前,没有进行库存的扣减工作;然后再进行对应的库存扣减,这样下来,经过线程三、四、....N,超卖问题就随之产生了

2.21 悲观锁与乐观锁 


解决方案: 

 进行加锁,这里有悲观锁和乐观锁两种加锁方案

2.22 乐观锁中的两种常用方案       

乐观锁中的关键,是判断之前查询的数据是否被修改过

  • ▶️version 版本控制方案

        第一个线程在操作后,数据库中的version变成了2,但是他自己满足version=1 ,所以没有问题,此时线程2执行,线程2 最后也需要加上条件version =1 ,但是现在由于线程1已经操作过了,vsersion 的值已经变为了 2;所以线程2,操作时就不满足version=1 的条件了,所以线程2无法执行成功

 

线程一进行操作过后,进行了 version 版本号的添加 

 这时,线程二再去进行操作的时候,由于查询的条件还是之前的版本号,所以最终由于 version 版本号的不一致(version 等于线程一修改后的 2,而不是等于之前的 1 ),而导致操作失败

  • ▶️CAS方案

         这里只是对之前的 version 版本控制方案做了优化,将 version 字段取出,使用 stock 库存来代替版本号的查询判断

 

在 线程一 查询库存,且进行了对应的减库存操作后,此时,库存值发生了改变

 当 线程二 还是拿着之前的数据作为查询条件时,则会导致操作失败

由于 CAS 方案比 version 版本控制方案更简便,这里使用 CAS 方案来进行实现

代码实现(只展示更改了的部分):

boolean update = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId)
                .eq("stock",voucher.getStock())
                .update();

  • 存在问题:

这里进行压测后发现,虽然说数据库中没有存在超卖问题,但是JM 测试的异常比例过高,就业务而言,这样是显然不可行的(JM 测试数据:线程数200,时间0s,循环1)

  •  方案改进: 

分析问题:

        由于乐观锁中的数据判断太过严谨,可能会导致大片的线程由于前一个线程的操作更改,而大量的失败,从而导致了异常比例过高的现象

方案提议:

        这里将条件中  “与之前库存数量作等值判断”  的语句删除,改为  “当前库存大于0” 因为数据库中 update 自带了锁机制,使之串行化执行,由于线程并发就不会发生,所以就不存在前一个线程更改了库存数据,而导致后面一大片线程操作失败的现象

代码改进:

        boolean update = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId)
//                .eq("stock",voucher.getStock())
                .gt("stock",0)
                .update();

 

压测结果:

通过压测结果可以发现,JM 异常比例刚好达到了 50 %(JM 测试:200人抢100张优惠券),而且数据库中,库存刚好为0,不多不少( ̄︶ ̄)↗ 

2.3 实现一人一单

经过上面的过程,我们已经初步解决了超卖问题

但是,值得注意的是,虽然上面保证了不会超卖,但是不能保证一个用户只能抢一次优惠券,即同一个线程只能成功的执行一次,执行完后,这一个线程后面的操作将无效,机会留给未抢到的线程

如图所示:

方案分析:

        通过上面这张表可知,用户 ID 与 优惠券ID 是相关联的;所以,我们这里增加一个条件,即根据当前 用户ID 与 优惠券ID,做一个条件查询依据,若查询的结果不为空,则表示当前用户已经成功的抢到了优惠券,之后便不能再进行争抢操作,将机会留给其他线程 

代码实现:

@Transactional  //添加事务
    public Result seckillVoucher(Long voucherId) {

        //1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

        //2.判断秒杀时间是否开始
        if(voucher.getBeginTime().isAfter(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动还未开始!");
        }

        //3.判断秒杀时间是否结束
        if(voucher.getEndTime().isBefore(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动已经结束!");
        }

        //4.判断库存是否充足
        if(voucher.getStock()<1){

            return Result.fail("库存已经清空,尽情等待...");
        }


        //5. 实现一人一单
        //5.1 根据优惠券 ID 和用户 ID 来查询订单
        UserDTO user = UserHolder.getUser();
        //5.2 判断订单是否存在
        LambdaQueryWrapper<VoucherOrder> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(VoucherOrder::getUserId,user.getId());
        queryWrapper.eq(VoucherOrder::getVoucherId,voucherId);
        int count = voucherOrderService.count(queryWrapper);
        //5.3 若存在,则给出提示信息
        if(count>0){
            return Result.fail("您已经成功参与此次活动,将机会留给其他人吧!");
        }


        //6.扣减库存
        //TODO 这里使用 lambda 方式来进行模拟超卖问题可能模拟失败
//        LambdaUpdateWrapper<SeckillVoucher> updateWrapper = new LambdaUpdateWrapper<>();
//        updateWrapper.eq(SeckillVoucher::getVoucherId,voucherId);
//        updateWrapper.set(SeckillVoucher::getStock,voucher.getStock()-1);
//        boolean update = seckillVoucherService.update(updateWrapper);
        boolean update = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId)
//                .eq("stock",voucher.getStock())
                .gt("stock",0)
                .update();

        if(!update){
            return Result.fail("库存已经清空,尽情等待...");
        }


        //7.创建订单
        //7.1 根据自己的业务,分别传入订单ID、用户ID、优惠券 ID
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisSoleById.soleId("order");   //使用全局唯一 ID 生成器生成 id
        voucherOrder.setId(orderId);     //订单

        voucherOrder.setUserId(user.getId());   //用户

        voucherOrder.setVoucherId(voucherId);   //优惠券

        voucherOrderService.save(voucherOrder);

        //8.返回订单 ID
        return Result.ok(orderId);
    }

压测结果: 

通过压测结果可知,虽然单个线程的抢票数减少了,但还是同一个线程抢了多次

原因:线程并发问题

        当同一时刻多个线程执行到 用户ID 与 优惠券ID 关联查询的语句时,同时检测到其用户所对应的优惠券数量为0,所以同时进行争抢

改进方法:

使用悲观锁 syn 来进行加锁操作,实现串行化创建订单

/**
     * 这里将一人一单、扣减库存、创建订单做一个封装
     */
    @Transactional  //添加事务
    public synchronized Result createVoucherOrder(Long voucherId) {
        //5. 实现一人一单
        //5.1 根据优惠券 ID 和用户 ID 来查询订单
        UserDTO user = UserHolder.getUser();
        //5.2 判断订单是否存在
        LambdaQueryWrapper<VoucherOrder> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(VoucherOrder::getUserId,user.getId());
        queryWrapper.eq(VoucherOrder::getVoucherId, voucherId);
        int count = voucherOrderService.count(queryWrapper);
        //5.3 若存在,则给出提示信息
        if(count>0){
            return Result.fail("您已经成功参与此次活动,将机会留给其他人吧!");
        }


        //6.扣减库存
        boolean update = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId)
                .gt("stock",0)
                .update();

        if(!update){
            return Result.fail("库存已经清空,尽情等待...");
        }


        //7.创建订单
        //7.1 根据自己的业务,分别传入订单ID、用户ID、优惠券 ID
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisSoleById.soleId("order");   //使用全局唯一 ID 生成器生成 id
        voucherOrder.setId(orderId);     //订单

        voucherOrder.setUserId(user.getId());   //用户

        voucherOrder.setVoucherId(voucherId);   //优惠券

        voucherOrderService.save(voucherOrder);

        //8.返回订单 ID
        return Result.ok(orderId);
    }

存在问题:

        这样添加锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度 是一个非常重要的事情;因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度。

解决方法:

        根据当前用户 ID 来进行加锁判断,若是同一个用户的用户ID,则需要进行加锁判断,因为要判断该用户之前是否已经成功抢到优惠券;若不是同一个用户,则不需要进加锁; 

        如果我们直接使用 userId.toString() ,虽然拿到的可能是同一个数据,但是拿到的对象实际上是不同的对象,new出来的对象(地址不一样);我们使用锁必须保证锁必须是同一把,所以我们需要使用 intern() 方法,intern() 这个方法是从字符串常量池中,根据当前地址从而获取到对应数据 ,以保证最后拿到的是同一个对象

代码实现:

@Transactional  //添加事务
    public synchronized Result createVoucherOrder(Long voucherId) {
        //5. 实现一人一单
        //5.1 根据优惠券 ID 和用户 ID 来查询订单
        UserDTO user = UserHolder.getUser();

        synchronized (user.getId().toString().intern()) {
            //5.2 判断订单是否存在
            LambdaQueryWrapper<VoucherOrder> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(VoucherOrder::getUserId, user.getId());
            queryWrapper.eq(VoucherOrder::getVoucherId, voucherId);
            int count = voucherOrderService.count(queryWrapper);
            //5.3 若存在,则给出提示信息
            if (count > 0) {
                return Result.fail("您已经成功参与此次活动,将机会留给其他人吧!");
            }


            //6.扣减库存
            boolean update = seckillVoucherService.update()
                    .setSql("stock= stock -1")
                    .eq("voucher_id", voucherId)
                    .gt("stock", 0)
                    .update();

            if (!update) {
                return Result.fail("库存已经清空,尽情等待...");
            }


            //7.创建订单
            //7.1 根据自己的业务,分别传入订单ID、用户ID、优惠券 ID
            VoucherOrder voucherOrder = new VoucherOrder();
            long orderId = redisSoleById.soleId("order");   //使用全局唯一 ID 生成器生成 id
            voucherOrder.setId(orderId);     //订单

            voucherOrder.setUserId(user.getId());   //用户

            voucherOrder.setVoucherId(voucherId);   //优惠券

            voucherOrderService.save(voucherOrder);

            //8.返回订单 ID
            return Result.ok(orderId);
        }

    }

存在问题:

        由于方法上添加了事务,而 syn 锁是添加在其内部的。同一个用户,当线程一来获取锁并创建了订单后并释放锁;这时,事务可能还未提交,而此时另一个线程二又来获取锁并创建了订单,注意,线程二所查询到的数据库信息与线程一查询的数据一致,所以进行了同样的创建订单操作,此时又产生了并发问题≧ ﹏ ≦

解决方法:将 syn 锁添加到方法外部,将事务包括在内,达到事务提交后再进行释放锁

代码实现:

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisSoleById;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {


    @Resource
    private SeckillVoucherServiceImpl seckillVoucherService;

    @Resource
    private RedisSoleById redisSoleById;    //这里是全局唯一 ID 生成方法

    @Resource
    private VoucherOrderServiceImpl voucherOrderService;

    /**
     * 这里是优惠券的秒杀抢购功能
     */
    @Override
    public Result seckillVoucher(Long voucherId) {

        //1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

        //2.判断秒杀时间是否开始
        if(voucher.getBeginTime().isAfter(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动还未开始!");
        }

        //3.判断秒杀时间是否结束
        if(voucher.getEndTime().isBefore(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动已经结束!");
        }

        //4.判断库存是否充足
        if(voucher.getStock()<1){

            return Result.fail("库存已经清空,尽情等待...");
        }

        //TODO 代码改进,将锁添加到事务外面
        UserDTO user = UserHolder.getUser();
        synchronized (user.getId().toString().intern()) {
            return createVoucherOrder(voucherId);
        }

    }

    /**
     * 这里将一人一单、扣减库存、创建订单做一个封装
     */
    @Transactional  //添加事务
    public Result createVoucherOrder(Long voucherId) {
        //5. 实现一人一单
        //5.1 根据优惠券 ID 和用户 ID 来查询订单
        UserDTO user = UserHolder.getUser();

            //5.2 判断订单是否存在
            LambdaQueryWrapper<VoucherOrder> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(VoucherOrder::getUserId, user.getId());
            queryWrapper.eq(VoucherOrder::getVoucherId, voucherId);
            int count = voucherOrderService.count(queryWrapper);
            //5.3 若存在,则给出提示信息
            if (count > 0) {
                return Result.fail("您已经成功参与此次活动,将机会留给其他人吧!");
            }


            //6.扣减库存
            boolean update = seckillVoucherService.update()
                    .setSql("stock= stock -1")
                    .eq("voucher_id", voucherId)
                    .gt("stock", 0)
                    .update();

            if (!update) {
                return Result.fail("库存已经清空,尽情等待...");
            }


            //7.创建订单
            //7.1 根据自己的业务,分别传入订单ID、用户ID、优惠券 ID
            VoucherOrder voucherOrder = new VoucherOrder();
            long orderId = redisSoleById.soleId("order");   //使用全局唯一 ID 生成器生成 id
            voucherOrder.setId(orderId);     //订单

            voucherOrder.setUserId(user.getId());   //用户

            voucherOrder.setVoucherId(voucherId);   //优惠券

            voucherOrderService.save(voucherOrder);

            //8.返回订单 ID
            return Result.ok(orderId);

    }

}

存在问题:

        但是以上做法依然有问题,因为你调用的方法,其实是 this. 的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象,来操作事务

部分代码改进:

注入当前类对象,给 Spring 容器管理

    @Resource
    private VoucherOrderServiceImpl voucherOrderService;
        UserDTO user = UserHolder.getUser();
        synchronized (user.getId().toString().intern()) {
            
            //TODO 方式一:这里使用代理对象来进行方法的调用,以保证事务的有效性
//            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//            return proxy.createVoucherOrder(voucherId);
            
            //TODO 方式二:也可以使用注入的方式进行引用
            return voucherOrderService.createVoucherOrder(voucherId);
        }

若使用 Proxy 代理类来实现,需要添加以下配置:

依赖引入:

<!--AOP 中代理对象管理的依赖-->
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
</dependency>

主启动类中加上注解,进行暴露代理对象:

@EnableAspectJAutoProxy(exposeProxy = true) //这里进行暴露代理对象

压测结果:

同一个用户瞬间抢购200次,只成功了一次ヾ(≧▽≦*)o,从而达到了预期的效果

2.4 使用 @Transactional 注解 声明事务时的四种情况

  • 在不同类中,事务方法A调用非事务方法B,事务具有传播性,事务生效;
  • 在不同类中,非事务方法A调用事务方法B,事务生效;
  • 在同一个类中,事务方法A调用非事务方法B,事务生效;
  • 在同一个类中,非事务方法A调用事务方法B,事务失效,这是由于使用Spring AOP代理造成的,只有当事务方法被当前类以外的代码调用时,才会由Spring生成的代理对象来管理。

解决办法:

  1. 采用  AopContext.currentProxy().方法B名()  来进行调用
  2. ApplicationContext.getBean()
  3. 在当前类中注入自己
  4. 使用手动事务

2.5 集群下的线程并发安全问题

需要注意的是,以上只适合单机版,若多个服务同时执行的时候,由于多个 Tomcat 中对应着不同的 JVM ,所以所控制的锁也不一样,这样,就又会出现线程同步问题

有关锁失效原因分析:

        由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

这里是负载均衡的配置:

 可见,在数据库中,库存在两个服务访问下(8081 / 8082),同一个用户中,进行抢票成功了两次,产生了线程并发问题


3、使用分布式锁解决多服务访问的问题

这里,使用 setnx 命令进行获取锁与释放锁:

  • 获取锁

NX 互斥:确保只有一个线程能够获取到锁?

  • 释放锁

3.1 分布式锁误删情况分析以及解决

关于锁误删说明:

当前持有锁的线程在锁的内部出现了阻塞,导致该线程的锁自动释放;(线程1)

这时线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是锁的误删

解决方法:在 Redis 中给当前线程设置一个线程名,或者UUID,以判断标识是否为当前线程;在获取锁的时候,将设置的标识存入,而在释放锁的时候,判断是否为对应的标识来进行释放锁,以防止误删操作

图解分析:

代码实现:

   这里是自定义工具类 SimpleRedisLock,用来对获取锁和释放锁,以及对锁标识的封装,实现分布式锁的功能

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import com.hmdp.service.ILock;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock {

    private StringRedisTemplate stringRedisTemplate;

    public static final String PREFIX_LOCK_NAME = "lock:";

    public static final String PRIFIX_ID = UUID.randomUUID().toString(true)+"-";     //这里使用 UUID 设置标识

    private String name;    //根据对应业务设置的名字

    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }


    /**
     * 获取锁
     * @param timeOut 锁的超时时间
     */
    @Override
    public boolean tryLock(Long timeOut) {

        //1.获取当前线程的 ID ,作为标识
        String threadId = PRIFIX_ID+Thread.currentThread().getId();

        //2.封装信息到 setnx 命令中进行执行
        Boolean tryLock = stringRedisTemplate.opsForValue()
                .setIfAbsent(PREFIX_LOCK_NAME + name, threadId, timeOut, TimeUnit.SECONDS);    //若该对应 key 信息缺席才进行操作

        //3. 这里使用工具类,防止 java 自动拆箱,从而报空指针
        return BooleanUtil.isTrue(tryLock);
    }


    /**
     * 释放锁
     */
    @Override
    public void unLock() {

        //1.获取当前线程标识
        String threadId = PRIFIX_ID+Thread.currentThread().getId();

        //2.进行判断是否为之前获取锁的线程标识
        String beforeId = stringRedisTemplate.opsForValue().get(PREFIX_LOCK_NAME + name);
        if(threadId.equals(beforeId)){

            stringRedisTemplate.delete(PREFIX_LOCK_NAME + name);
        }

    }

}

这里是对优惠券秒杀功能的完善,添加了上面的 SimpleReidsLock 分布式锁的功能

/**
     * 这里是优惠券的秒杀抢购功能
     */
    @Override
    public Result seckillVoucher(Long voucherId) {

        //1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

        //2.判断秒杀时间是否开始
        if(voucher.getBeginTime().isAfter(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动还未开始!");
        }

        //3.判断秒杀时间是否结束
        if(voucher.getEndTime().isBefore(LocalDateTime.now())){

            return Result.fail("亲,秒杀活动已经结束!");
        }

        //4.判断库存是否充足
        if(voucher.getStock()<1){

            return Result.fail("库存已经清空,尽情等待...");
        }


        UserDTO user = UserHolder.getUser();
//        synchronized (user.getId().toString().intern()) {
//
//            //TODO 方式一:这里使用代理对象来进行方法的调用,以保证事务的有效性
//            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//            return proxy.createVoucherOrder(voucherId);
//
//            //TODO 方式二:也可以使用注入的方式进行引用
            return voucherOrderService.createVoucherOrder(voucherId);
//        }


        //TODO 这里 模拟 分布式锁进行实现
        //1.创建锁对象
        SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate,"order:"+user.getId());

        //2.获取锁
        boolean tryLock = simpleRedisLock.tryLock(1500L);

        //3.判断是否获取到锁
        if(!tryLock){

            return Result.fail("同一个用户不能重复下单!");
        }

        try {
            return voucherOrderService.createVoucherOrder(voucherId);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            //4.释放锁
            simpleRedisLock.unLock();
        }

    }

3.2 分布式锁原子性问题分析以及解决

问题产生:

        当 unlock 释放锁的时候,线程1 执行完 equals 后,在执行 delete 删除语句时,被阻塞了,这时锁被超时释放了,但是,线程1 还在执行 delete 删除锁的操作,它并不知道自己的锁被删除了

        这时,线程2 来进行获取锁,并且执行相关业务,就在这时,线程1 恢复行动能力,不再阻塞,继续执行释放锁的操作(注意,它现在不知道自己的锁已经被超时释放了,因为之前的判断语句已经通过,是执行delete 语句时被阻塞的),此时,就发生了很严重的问题,线程1 线程2 的锁删除了,以此类推.......

问题分析:

        这种问题再次引发了线程并发的问题,因为 equals 判断锁 和 delete 释放锁 这两个动作是独立的,互不干扰的,才会出现以上情况;

解决方法:

        现在,就需要将这两个动作相关联,也就是保证其原子性,要么同时成功,要么同时失败;这时,就可以用到 Lua 脚本语言来进行实现(我学的 lua 终于能用上了T_T)

Lua相关语法请参考:https://www.runoob.com/lua/lua-tutorial.html

Lua 脚本的代码实现:

        这里,KEYS【1】是 redis 中上次释放锁之前所存入的线程标识,ARGV【1】是当前的线程标识,将其进行对比

 创建一个 unlock.lua 文件:

-- 这里比较当前线程的标识,与锁中的标识是否一致
if(redis.call('get',KEYS[1]) == ARGV[1])    -- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标识
    then

    --若一致,则进行释放锁
    return redis.call('del',KEYS[1])
end
return 0

 这里是将之前的工具类进行完善,主要是在 unlock 释放锁的时候,利用 lua 的原子性进行操作,即将 equals判断 与 delete 删除 合并成一条语句,要么一起成功,要么一起失败

Lua 脚本语句与 Java 语法的关系图:

其中,sctipt 为具体的 Lua 脚本内容,key 对应键,args 对应值

之前 SimpleRedisLock 工具类的代码改进:

注意,这里将 script 对应的 Lua 脚本内容进行预加载,尽可能的提高性能

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import com.hmdp.service.ILock;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock {

    private StringRedisTemplate stringRedisTemplate;

    public static final String PREFIX_LOCK_NAME = "lock:";

    public static final String PRIFIX_ID = UUID.randomUUID().toString(true)+"-";     //这里使用 UUID 设置标识

    private String name;    //根据对应业务设置的名字

    public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;     //这里为 eval语法 中 script参数,表示对应的脚本内容

    /**
     * 在类加载的时候进行读取 lua 脚本文件
     */
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));  //扫描 lua 文件,进行导入
        UNLOCK_SCRIPT.setResultType(Long.class);    //设置返回值类型
    }

    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }


    /**
     * 获取锁
     * @param timeOut 锁的超时时间
     */
    @Override
    public boolean tryLock(Long timeOut) {

        //1.获取当前线程的 ID ,作为标识
        String threadId = PRIFIX_ID+Thread.currentThread().getId();

        //2.封装信息到 setnx 命令中进行执行
        Boolean tryLock = stringRedisTemplate.opsForValue()
                .setIfAbsent(PREFIX_LOCK_NAME + name, threadId, timeOut, TimeUnit.SECONDS);    //若该对应 key 信息缺席才进行操作

        //3. 这里使用工具类,防止 java 自动拆箱,从而报空指针
        return BooleanUtil.isTrue(tryLock);
    }


    /***
     * 释放锁,基于 Lua 脚本实现
     */
    @Override
    public void unLock(){

        //1. 调用 lua 脚本
        stringRedisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(PREFIX_LOCK_NAME + name), //singletonList 返回一个长度只有1的不可变的集合,类似于视图,无法改变内容
                PRIFIX_ID+Thread.currentThread().getId());  //获取当前线程的标识信息
    }


    /**
     * 释放锁
     */
//    @Override
//    public void unLock() {
//
//        //1.获取当前线程标识
//        String threadId = PRIFIX_ID+Thread.currentThread().getId();
//
//        //2.进行判断是否为之前获取锁的线程标识
//        String beforeId = stringRedisTemplate.opsForValue().get(PREFIX_LOCK_NAME + name);
//        if(threadId.equals(beforeId)){
//
//            stringRedisTemplate.delete(PREFIX_LOCK_NAME + name);
//        }
//    }

}

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。