应用系统定制开发Redis实现消息队列

在Redis应用系统定制开发中提供了三种实现的方式:

  1. List结构:基于List应用系统定制开发结构来模拟消息队列
  2. PubSub:应用系统定制开发基本的点对点消息模型
  3. Stream:应用系统定制开发较完善的消息队列模型

1. List应用系统定制开发实现消息队列

Redis的List应用系统定制开发类型是一个双向链表,应用系统定制开发而队列要求进,应用系统定制开发出口不能在同一个位置,所以可以利用List的添加取出命令来实现模拟消息队列。

  1. LPUSH,RPOP
  2. RPUSH,LPOP

但是java在消费消息的时候,如果没有消息了,消费者应该是阻塞等待,等到有消息投递了,再继续消费信息,而上述命令不是阻塞式的,如果没有消息了还在获取的话会获取到Null。所以应该实现阻塞的效果用下列命令

  1. BRPOP
  2. BLPOP

上述两个命令的取出效果是阻塞式的。

List实现消息队列的缺点:

  1. 无法避免消息丢失:例如消费者拿到消息还没有消费就宕机了
  2. 只能支持单个消费

2. 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  1. SUBSCRIBE channel [channel] :订阅一个或多个频道
  2. PUBLISH channel msg :向一个频道发送消息
  3. PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

这里的PSUBSCRIBE与RabbitMQ的匹配相似。

基于PubSub的消息队列的缺点:

  1. List支持数据持久化,但是PubSub不支持数据持久化

3. 基于的消息队列

Stream是Redis5.0引入的新的数据类型,可以实现一个功能较为完善的消息队列

添加命令

例如

XADD users * name jack age 21
  • 1

users是队列,*表示消息id ,后面的部分表示消息体

消费命令


当ID为$时代表读取最新的消息。

例如

XREAD COUNT 1 STREAMS users 0
  • 1

COUNT 1 代表每次只读取一条,STREAMS users 表示从users这个队列里读取

注意:Stream的消息队列消费消息后是不会剔除该消息的

缺点:当指定ID为$,代表读取最新的消息,如果在处理一条新消息的时候,突然来了5条消息,当再次读取最新消息时,只能读取到5条消息的最后一条,造成消息漏读的现象

Stream消息队列的优点:

  1. 消息可回溯(消费后不会被剔除)
  2. 消息可以被多个消费者读取
  3. 可以阻塞读取

3.1 消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,同一个消费者组里的消费者之间处于一种竞争的关系,消息是不会出现消费重复的,同时一定程度上也可以避免消息漏读的现象
  2. 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
  3. 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移

如何创建消费者组?

XGROUP CREATE key groupName ID [MKSTREAM]
  • 1
  1. key:队列名称
  2. groupName:消费者组名称
  3. ID:起始ID标识,$代表队列中最后一个消息,0代表队列中第一个消息
  4. MKSTREAM:队列不存在时自动创建

如何从消费者组读取消息?

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key..] ID [ID..]
  • 1
  1. group:消费者组名称
  2. consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  3. count:本次查询最大数量
  4. BLOCK milliseconds:是否阻塞?阻塞的时间
  5. NOACK:消费消息后不响应
  6. STREAMS key:指定队列名称
  7. ID:获取消息的起始ID >表示从下一个未消费的消息开始 。其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

那么消费者消费完消息后如何确认消息呢?

XACK key group ID [ID..]
  • 1
  1. key:队列名称
  2. group:消费者组名称
  3. ID:消息的ID

java手动模拟消费者监听消息的代码

while(true){            Object message = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");            if (message == null){                continue;            }            try{                // 处理消息的逻辑 处理完毕后要ACK                handleMessage(message);            }catch (Exception e){                while (true){                    // 从等待响应的队列里拿消息                    Object unAckMessage = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");                    if (unAckMessage == null){                        continue;                    }                    try {                        handleMessage(unAckMessage);                    }catch (Exception e1){                        continue;                    }                }            }        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发