定制软件开发微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

文章目录

一、什么是 Redis ?

定制软件开发字面意思就是定制软件开发存放消息的队列。定制软件开发最简单定制软件开发的消息队列模型包括3个角色:

  • 消息队列:定制软件开发存储和管理消息,定制软件开发也被称为消息代理(Message Broker)
  • 生产者:定制软件开发发送消息到消息队列
  • 消费者:定制软件开发从消息队列获取消息并处理消息

定制软件开发使用队列的好处在于 解耦 定制软件开发解除数据之间的耦合性

定制软件开发这里最好的是使用MQ、RabbitMQ、RocketMQ、Kafka定制软件开发等消息队列,定制软件开发我们本节主要介绍 Redis 的消息队列。

二、Redis 消息队列 – 基于 Redis List 定制软件开发实现消息队列

基于List定制软件开发结构模拟消息队列

消息队列(Message Queue):字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,我们可以通过 LPush、RPOP、RPush、LPOP 这些来实现。

注意 : 如果获取 LPOP、RPOP获取消息如果没有的话,会直接返回null,所以我们使用阻塞:BLPOP、BRPOP来实现阻塞效果

基于List 结构的消息队列的优缺点?

优点:

  • 利用Redis存储、不受限于JVM 内存上限
  • 基于Redis 的持久化机制、数据安全性有保障
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

三、Redis 消息队列 – 基于 Pubsub 的消息队列

PubSub(发布订阅)Redis2.0版本引入的消息传递模型

顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

Pubsub 常用命令

SUBSCRIBE channel [channel] :订阅一个或多个频道PUBLISH channel msg :向一个频道发送消息PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
  • 1
  • 2
  • 3

基于PubSub的消息队列有哪些优缺点?
优点

  • 采用发布订阅模型,支持多生产、多消费

缺点

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

四、基于Redis 的 的消费队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

⛅Stream 简单语法

Stream 常用语法:

例如

创建为 users 的消息队列,并向其中发送一条消息 使用Redis 自动生成id

读取消息的方式之一:XRead

利用 XRead 读取一个消息

XRead 阻塞方式,读取最新的消息

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果

注意: 当我们指定起始ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取的也是只有最新的一条,会出现消息漏读的问题

STREAM类型消息队列的XREAD命令特点

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

⚡Stream 的消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]
  • 1
  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建队列

其它常用命令

删除指定的消费者组

XGROUP DESTORY key groupName
  • 1

给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername
  • 1

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername
  • 1

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • 1
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

>”:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路:

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

三种消息队列对比

五、基于Redis Stream消息队列实现异步秒杀

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

修改 seckill.lua 脚本

-- 1.3.订单idlocal orderId = ARGV[3]-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
  • 1
  • 2
  • 3
  • 4
  • 5

修改VoucherOrderService

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {    SECKILL_SCRIPT = new DefaultRedisScript<>();    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));    SECKILL_SCRIPT.setResultType(Long.class);}private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的@PostConstructprivate void init() {    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}/**     * 使用 Redis消息队列建立 读队列、编写下订单任务     */private class VoucherOrderHandler implements Runnable {    @Override    public void run() {        while (true) {            try {                // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(                    Consumer.from("g1", "c1"),                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),                    StreamOffset.create("stream.orders", ReadOffset.lastConsumed())                );                // 2.判断订单信息是否为空                if (list == null || list.isEmpty()) {                    // 如果为null,说明没有消息,继续下一次循环                    continue;                }                // 解析数据                MapRecord<String, Object, Object> record = list.get(0);                Map<Object, Object> value = record.getValue();                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);                // 3.创建订单                createVoucherOrder(voucherOrder);                // 4.确认消息 XACK                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());            } catch (Exception e) {                log.error("处理订单异常", e);                //处理异常消息 去 Pading-List读取消息                handlePendingList();            }        }    }}/**     *  Redis消息队列出现异常,调用此方法去 Pading—List中重新读取     */private void handlePendingList() {    while (true) {        try {            // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0            List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(                Consumer.from("g1", "c1"),                StreamReadOptions.empty().count(1),                StreamOffset.create("stream.orders", ReadOffset.from("0"))            );            // 2.判断订单信息是否为空            if (list == null || list.isEmpty()) {                // 如果为null,说明没有异常消息,结束循环                break;            }            // 解析数据            MapRecord<String, Object, Object> record = list.get(0);            Map<Object, Object> value = record.getValue();            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);            // 3.创建订单            createVoucherOrder(voucherOrder);            // 4.确认消息 XACK            stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());        } catch (Exception e) {            log.error("处理pendding订单异常", e);            try{                Thread.sleep(20);            }catch(Exception ee){                ee.printStackTrace();            }        }    }}private void handleVoucherOrder(VoucherOrder voucherOrder) {    //1.获取用户    Long userId = voucherOrder.getUserId();    // 2.创建锁对象    RLock lock = redissonClient.getLock("lock:order:" + userId);    // 3.尝试获取锁    boolean isLock = lock.tryLock();    // 4.判断是否获得锁成功    if (!isLock) {        // 获取锁失败,直接返回失败或者重试        log.error("不允许重复下单!");        return;    }    try {        //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效        proxy.createVoucherOrder(voucherOrder);    } finally {        // 释放锁        lock.unlock();    }}// 代理对象private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {    //获取用户    Long userId = UserHolder.getUser().getId();    //生成订单ID    long orderId = redisIdWorker.nextId("order");    // 1.执行lua脚本    Long result = stringRedisTemplate.execute(        SECKILL_SCRIPT,        Collections.emptyList(),        voucherId.toString(), userId.toString(), String.valueOf(orderId)    );    int r = result.intValue(); // 转成int    // 2.判断结果是否为0    if (r != 0) {        // 2.1.不为0 ,代表没有购买资格        return Result.fail(r == 1 ? "库存不足" : "不能重复下单");    }    //3.获取代理对象    proxy = (IVoucherOrderService) AopContext.currentProxy();    //4.返回订单id    return Result.ok(orderId);}@Transactionalpublic void createVoucherOrder (VoucherOrder voucherOrder){    // 5.一人一单逻辑    // 5.1.用户id    Long userId = voucherOrder.getUserId();    // 判断是否存在    int count = query().eq("user_id", userId)        .eq("voucher_id", voucherOrder.getId()).count();    // 5.2.判断是否存在    if (count > 0) {        // 用户已经购买过了        log.error("用户已经购买过了");    }    //6,扣减库存    boolean success = seckillVoucherService.update()        .setSql("stock= stock -1") //set stock = stock -1        .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update(); //where id = ? and stock > 0    // .eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?    if (!success) {        //扣减库存        log.error("库存不足!");    }    save(voucherOrder);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185

六、程序测试

ApiFox 简单测试

请求成功,完成基本测试,下面恢复数据库,进行压力测试

Jmeter 压力测试

Jmeter测试

查看Redis

查看MySQL

⛵小结

以上就是【Bug 终结者】对 微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单 的简单介绍,在分布式系统下,高并发的场景下,使用消息队列来实现秒杀下单,可见性能提升了很大! 在开发中,我们还是使用MQ比较多一点的,Redis 消息队列作为拓展,本次秒杀下单系列到此就更新完毕啦! 如有需要源码的,可去公众号获取!

如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发