您现在的位置是:首页 >技术交流 >【异步秒杀实现过程】网站首页技术交流
【异步秒杀实现过程】
秒杀优化方案(消息队列实现异步秒杀)
优化方案:我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功.
再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,当然这里边有两个难点
实现流程
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
这里我们将库存信息存入redis,可以在redis进行逻辑下单,在mysql进行真实下单,实现了异步下单.
难点一:
第一个难点是我们怎么在redis中去快速校验一人一单,还有库存判断?
难点一:解决方案
使用lua脚本保证并发安全,保证原子性,实际上就是隐式的加锁
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId redis的逻辑下单。 但是数据库并没有下单
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
-- redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
//这里通过库存key查看库存,用于检查库存超卖.
//这里使用到了set集合,用于判断当前商品key中的集合值是否有当前用户,解决一人一单的判断.
//这里使用了set的add来进行下单,操作是将userid添加到集合中.
同时将下单需要的信息存入队列.(这一步放lua最好,放入代码块不能保证被原子性.)
难点二
第二个难点是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
难点二:解决方案
我们使用消息队列,这里我门为了简单,则直接使用java包中的阻塞队列
去实现下单到MYSQL
以下三个步骤为
1.添加秒杀券
2.redis实现逻辑下单
3.mysql实现真实下单.
添加秒杀券
1.在mysql添加秒杀券信息。
2.在redis添加库存信息。
库存 添加到redis其实是为了方便实现异步扣库存。
代码如下
@Override
@Transactional
//这个事务应该-没有效果吧,我觉得应该使用分布式事务。
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息 --数据库添加秒杀券。
SeckillVoucher seckillVoucher = new SeckillVoucher();`在这里插入代码片`
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
//SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
// private static final String SECKILL_STOCK_KEY ="seckill:stock:";
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
实现redis的逻辑下单(Redis层面)
Redis实现逻辑秒杀(数据库并未更新,只不过是把需要更新的数据放入了消息队列,等待后续的异步更新数据库)
方法一:实现redis的逻辑下单(Redis层面)
1.写lua脚本
1.1 -通过比较redis的库存是否大于0,来判断是否超卖。
1.2-通过set 来判断,set集合中是否有当前用户,来判断一人一单(是否重复下单)
1.3 -可以下单
2.执行lua脚本
3.判断是否逻辑下单
4.若可以下单,则将下单所需要相关信息存入消息队列。
@Override
public Result seckillVoucher(Long voucherId) {
// Long userId = UserHolder.getUser().getId();
// Long orderId = redisIdWorker.nextId("order");
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//3 TODO 保存阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setId(orderId);
orderTasks.add(voucherOrder);
//4.在主线程获取代理对象,方便子线程使用,
iVoucherOrderService = (IVoucherOrderService) AopContext.currentProxy();
//返回订单id
return Result.ok(orderId);
}
面试问题
1.为什么要使用消息队列,异步下单?
答:因为串行化执行耗时长,异步下单我们可以将耗时长的过程(操作数据库)存入消息队列
直接使用redis进行下单资格判断.逻辑上认为是否可以下单.
2.使用lua脚本的目的是干什么?
答:lua脚本保证多个操作的原子性,是解决并发安全的一种方式,
这里我们通过原子性
解决了超卖 --1
-一人一单-的问题,
同时还保证了下单到阻塞队列的安全.
实现数据库的真实下单(MYSQL)
1)这里我们主要是通过线程池开启一个线程去通过消息队列执行下单,将信息持久化到MYSQL.
1.创建线程池–使用Executors
2.使用 @PostConstruct注解 在类初始化之后立刻执行方法. --保证了程序一启动,任务可以被立刻执行.
3.创建任务类,通过循环,实现了从消息队列拿订单信息.实现下单.
代码如下
```java
//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//java代码实现阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
```java
//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 用于线程池处理的任务
// 当初始化完毕后,就会去从队列中去拿信息
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
面试问题:
1.线程池创建方式?
)1.通过 Executors 创建的线程池。
)2.通过 ThreadPoolExecutor 创建的线程池;
细分为:
1.
Executors.newFixedThreadPool:创建⼀个固定⼤⼩的线程池,可控制并发的线程数,超出的线程会在队列中等待;
2. Executors.newCachedThreadPool:创建⼀个可缓存的线程池,若线程数超过处理所需,缓存⼀段时间后会回收,若线程数不够,则新建线程;
3. Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执⾏顺序;
4. Executors.newScheduledThreadPool:创建⼀个可以执⾏延迟任务的线程池;
5. Executors.newSingleThreadScheduledExecutor:创建⼀个单线程的可以执⾏延迟任务的线程池;
6. Executors.newWorkStealingPool:创建⼀个抢占式执⾏的线程池(任务执⾏顺序不确定)【JDK1.8 添加】。
7. ThreadPoolExecutor:最原始的创建线程池的⽅式,它包含了 7 个参数可供设置,后⾯会详细讲。
2.如何使用线程池执行任务的?
这里就要区分是哪一种线程池了.
ThreadPoolExecutor是用的excute 执行任务
我们这个单例线程池是使用的submit执行任务.
3.如何保证项目启动时,阻塞队列的任务可以立刻被执行?
答:我们使用@PostConstruct这个注解,类初始化,构造方法执行后就立刻执行这个方法
我们在 @PostConstruct 标记的方法提交(submit)任务.
任务通过while循环从消息队列不断获取任务并执行…