定制化开发RabbitMQ延迟队列

目录


💌 介绍

顾名思义:定制化开发首先它要具有的特性,定制化开发再给它附加一个延迟消定制化开发费队列消息的功能,定制化开发也就是说可以指定队列定制化开发中的消息在哪个时间点被消费。

💒 使用场景

  • 定制化开发预支付订单创建成功后,30定制化开发分钟后还没有支付,定制化开发自动取消订单,定制化开发修改订单状态
  • 定制化开发用户注册成功后,如果3定制化开发天没有登录则进行短信提醒
  • 定制化开发优惠券过期前发送短信进行提醒
  • ....

定制化开发以上场景都可以用延时定制化开发队列来完成


🏳‍🌈 模拟案例

需求:定制化开发生产者发布消息,10秒、60秒后消费者拿到消息进行消费

📕 准备工作

导入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

 配置RabbitMQ连接相关信息

  1. #MySQL
  2. spring:
  3. rabbitmq:
  4. host: 127.0.0.1
  5. port: 5672
  6. username: xxxx
  7. password: xxx
  8. server:
  9. port: 8087

🏴 写法一(死信队列TTL)

生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

 RabbitMQ配置文件

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. /**
  7. * @author 小影
  8. * @create: 2022/8/18 10:26
  9. * @describe:mq配置 如示例图配置:2交换机、4队列、4路由key
  10. */
  11. @Configuration
  12. public class RabbitMQConfiguration {
  13. // 延迟交换机
  14. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
  15. // 延迟队列
  16. public static final String DELAY_QUEUE_NAME_A = "delay.queue.a";
  17. public static final String DELAY_QUEUE_NAME_B = "delay.queue.b";
  18. // 延迟队列路由key
  19. public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a";
  20. public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b";
  21. // 死信交换机
  22. public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
  23. // 死信队列
  24. public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a";
  25. public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b";
  26. // 私信队列路由key
  27. public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a";
  28. public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b";
  29. // 声明延迟交换机
  30. @Bean("delayExchange")
  31. public DirectExchange delayExchange() {
  32. return new DirectExchange(DELAY_EXCHANGE_NAME);
  33. }
  34. // 声明死信交换机
  35. @Bean("deadLetterExchange")
  36. public DirectExchange deadLetterExchange() {
  37. return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  38. }
  39. // 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机
  40. @Bean("delayQueueA")
  41. public Queue delayQueueA() {
  42. HashMap<String, Object> args = new HashMap<>();
  43. // 声明队列绑定的死信交换机
  44. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  45. // 声明队列的属性路由key
  46. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A);
  47. // 声明队列的消息TTL存活时间
  48. args.put("x-message-ttl", 10000);
  49. return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build();
  50. }
  51. // 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机
  52. @Bean("delayQueueB")
  53. public Queue delayQueueB() {
  54. HashMap<String, Object> args = new HashMap<>();
  55. // 声明队列绑定的死信交换机
  56. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  57. // 声明队列的属性路由key
  58. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B);
  59. // 声明队列的消息TTL存活时间
  60. args.put("x-message-ttl", 60000);
  61. return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build();
  62. }
  63. // 声明死信队列A,用于接收延迟10S的消息
  64. @Bean("deadLetterQueueA")
  65. public Queue deadLetterQueueA() {
  66. return new Queue(DEAD_LETTER_QUEUE_NAME_A);
  67. }
  68. // 声明死信队列B,用于接收延迟60S的消息
  69. @Bean("deadLetterQueueB")
  70. public Queue deadLetterQueueB() {
  71. return new Queue(DEAD_LETTER_QUEUE_NAME_B);
  72. }
  73. // 设置延迟队列A的绑定关系
  74. @Bean
  75. public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
  76. @Qualifier("delayExchange") DirectExchange exchange) {
  77. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A);
  78. }
  79. // 设置延迟队列B的绑定关系
  80. @Bean
  81. public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
  82. @Qualifier("delayExchange") DirectExchange exchange) {
  83. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B);
  84. }
  85. // 设置死信队列A的绑定关系
  86. @Bean
  87. public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
  88. @Qualifier("deadLetterExchange") DirectExchange exchange) {
  89. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A);
  90. }
  91. // 设置死信队列B的绑定关系
  92. @Bean
  93. public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
  94. @Qualifier("deadLetterExchange") DirectExchange exchange) {
  95. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B);
  96. }
  97. }

此配置文件的代码关系图如下

 生产者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
  2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A;
  3. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B;
  4. /**
  5. * @author 小影
  6. * @create: 2022/8/18 11:13
  7. * @describe:延迟消息生产者
  8. */
  9. @Component
  10. public class DelayMessageProducer {
  11. @Resource
  12. private RabbitTemplate rabbitTemplate;
  13. public void send(String message,int type) {
  14. switch (type){
  15. case 1: // 10s的消息
  16. // param:队列名称、路由key、消息
  17. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message);
  18. break;
  19. case 2:// 60s的消息
  20. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message);
  21. break;
  22. }
  23. }
  24. }

消费者

  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.time.LocalDateTime;
  7. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A;
  8. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B;
  9. /**
  10. * @author 小影
  11. * @create: 2022/8/18 11:19
  12. * @describe:死信消费者
  13. */
  14. @Slf4j
  15. @Component
  16. public class DeadLetterQueueConsumer {
  17. /**
  18. * 监听私信队列A
  19. * @param message
  20. * @param channel 作手动回执、确认
  21. */
  22. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A)
  23. public void receiveA(Message message, Channel channel) {
  24. String msg = new String(message.getBody());
  25. log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg);
  26. }
  27. /**
  28. * 监听私信队列B
  29. * @param message
  30. * @param channel 作手动回执、确认
  31. */
  32. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B)
  33. public void receiveB(Message message, Channel channel) {
  34. String msg = new String(message.getBody());
  35. log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg);
  36. }
  37. }

测试

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("rabbitmq")
  4. public class RabbitMqController {
  5. @Resource
  6. private DelayMessageProducer producer;
  7. @GetMapping("send")
  8. public void send(String message, Integer type) {
  9. log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type));
  10. producer.send(message, type);
  11. }
  12. }

分别请求

http://localhost:8089/rabbitmq/send?message=我是10秒&type=1

http://localhost:8089/rabbitmq/send?message=我是60秒&type=2

如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay.exchange' in vhost '/': received ''x-delayed-message'' but current is 'direct', class-id=40, method-id=10

可能是mq已经存在交换机了先去删掉

弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦


🏴 写法二 (死信队列TTL)

生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

 RabbitMQ配置文件

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. /**
  7. * @author 小影
  8. * @create: 2022/8/18 10:26
  9. * @describe:mq配置 如示例图配置:2交换机、2队列、2路由key
  10. */
  11. @Configuration
  12. public class RabbitMQConfiguration {
  13. // 延迟交换机
  14. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
  15. // 延迟队列
  16. public static final String DELAY_QUEUE_NAME = "delay.queue";
  17. // 延迟队列路由key
  18. public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
  19. // 死信交换机
  20. public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
  21. // 死信队列
  22. public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
  23. // 私信队列路由key
  24. public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey";
  25. // 声明延迟交换机
  26. @Bean("delayExchange")
  27. public DirectExchange delayExchange() {
  28. return new DirectExchange(DELAY_EXCHANGE_NAME);
  29. }
  30. // 声明死信交换机
  31. @Bean("deadLetterExchange")
  32. public DirectExchange deadLetterExchange() {
  33. return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  34. }
  35. // 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机
  36. @Bean("delayQueue")
  37. public Queue delayQueue() {
  38. HashMap<String, Object> args = new HashMap<>();
  39. // 声明队列绑定的死信交换机
  40. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  41. // 声明队列的属性路由key
  42. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
  43. return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
  44. }
  45. // 声明死信队列
  46. @Bean("deadLetterQueue")
  47. public Queue deadLetterQueue() {
  48. return new Queue(DEAD_LETTER_QUEUE_NAME);
  49. }
  50. // 设置延迟队列的绑定关系
  51. @Bean
  52. public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
  53. @Qualifier("delayExchange") DirectExchange exchange) {
  54. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
  55. }
  56. // 设置死信队列的绑定关系
  57. @Bean
  58. public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
  59. @Qualifier("deadLetterExchange") DirectExchange exchange) {
  60. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
  61. }
  62. }

生产者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
  2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
  3. /**
  4. * @author 小影
  5. * @create: 2022/8/18 11:13
  6. * @describe:延迟消息生产者
  7. */
  8. @Component
  9. public class DelayMessageProducer {
  10. @Resource
  11. private RabbitTemplate rabbitTemplate;
  12. /**
  13. *
  14. * @param message 消息
  15. * @param delayTime 存活时间
  16. */
  17. public void send(String message,String delayTime) {
  18. // param:延迟交换机,路由KEY,存活时间
  19. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
  20. msg.getMessageProperties().setExpiration(delayTime);
  21. return msg;
  22. });
  23. }
  24. }

消费者

  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.time.LocalDateTime;
  7. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME;
  8. /**
  9. * @author 小影
  10. * @create: 2022/8/18 11:19
  11. * @describe:死信消费者
  12. */
  13. @Slf4j
  14. @Component
  15. public class DeadLetterQueueConsumer {
  16. /**
  17. * 监听私信队列A
  18. * @param message
  19. * @param channel 作手动回执、确认
  20. */
  21. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME)
  22. public void receiveA(Message message, Channel channel) {
  23. String msg = new String(message.getBody());
  24. log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg);
  25. }
  26. }

测试

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("rabbitmq")
  4. public class RabbitMqController {
  5. @Resource
  6. private DelayMessageProducer producer;
  7. @GetMapping("send")
  8. public void send(String message, String delayTime) {
  9. log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
  10. producer.send(message, delayTime);
  11. }
  12. }

分别请求

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

弊端:由于是先进先出的,如果60秒进去了,10秒在进去,10秒结束了,他要等60秒结束,60秒出来10秒才能出来


🚩 写法三 (插件版本-推荐)

安装插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收消息后并未立即将消息投递至目标队列,而是存储在mnesia(一个分布式数据库)中,随后检测消息延迟时间,如达到投递时间讲其通过 x-delayed-type 类型标记的交换机投至目标队列。 

插件安装

1.进入mq官网社区插件:

2.找到rabbitmq_delayed_message_exchange

 选择对应版本的ez文件下载

 

 

 注:我的MQ是通过yum安装的

 1.在系统中查看安装的rabbitmq

rpm -qa |grep rabbitmq

 2.查询mq的安装的相关文件目录

rpm -ql rabbitmq-server-3.10.7-1.el8.noarch

 翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好

 然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面

3.重启RabbitMQ服务

systemctl restart rabbitmq-server.service

4.重启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 


RabbitMQ配置文件

  1. /**
  2. * @author 小影
  3. * @create: 2022/8/18 10:26
  4. * @describe:mq配置 如示例图配置:1交换机、1队列、1路由key
  5. */
  6. @Configuration
  7. public class RabbitMQConfiguration {
  8. // 延迟交换机
  9. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
  10. // 延迟队列
  11. public static final String DELAY_QUEUE_NAME = "delay.queue";
  12. // 延迟队列路由key
  13. public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
  14. // 声明延迟交换机
  15. @Bean("delayExchange")
  16. public CustomExchange delayExchange() {
  17. HashMap<String, Object> args = new HashMap<>();
  18. args.put("x-delayed-type", "direct");
  19. return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
  20. }
  21. // 声明延迟队列
  22. @Bean("delayQueue")
  23. public Queue delayQueue() {
  24. return new Queue(DELAY_QUEUE_NAME);
  25. }
  26. // 设置延迟队列的绑定关系
  27. @Bean
  28. public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
  29. @Qualifier("delayExchange") CustomExchange exchange) {
  30. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();
  31. }
  32. }

生产者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
  2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
  3. /**
  4. * @author 小影
  5. * @create: 2022/8/18 11:13
  6. * @describe:延迟消息生产者
  7. */
  8. @Component
  9. public class DelayMessageProducer {
  10. @Resource
  11. private RabbitTemplate rabbitTemplate;
  12. /**
  13. *
  14. * @param message 消息
  15. * @param delayTime 存活时间
  16. */
  17. public void send(String message,Integer delayTime) {
  18. // param:延迟交换机,路由KEY,存活时间
  19. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
  20. msg.getMessageProperties().setDelay(delayTime);
  21. return msg;
  22. });
  23. }
  24. }

消费者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME;
  2. /**
  3. * @author 小影
  4. * @create: 2022/8/18 11:19
  5. * @describe:消费者
  6. */
  7. @Slf4j
  8. @Component
  9. public class DeadLetterQueueConsumer {
  10. /*
  11. * 监听私信队列
  12. * @param message
  13. * @param channel 作手动回执、确认
  14. */
  15. @RabbitListener(queues = DELAY_QUEUE_NAME)
  16. public void receiveA(Message message, Channel channel) {
  17. String msg = new String(message.getBody());
  18. log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg);
  19. }
  20. }

测试

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("rabbitmq")
  4. public class RabbitMqController {
  5. @Resource
  6. private DelayMessageProducer producer;
  7. @GetMapping("send")
  8. public void send(String message, Integer delayTime) {
  9. log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
  10. producer.send(message, delayTime);
  11. }
  12. }

启动项目查看rabbitmq的可视化界面

如下图此时生成的交换机是x-delayed-message类型的

 分别发送:

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

 结局并不是60秒先被消费,完成了我们的意愿。

原理:

  1. 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
  2. 交换机里面的插件就会一直监听这个时间
  3. 时间到了把对应数据取出来,放入队列,让消费者进行消费

👍 延迟队列方法推荐 

 这是小编在开发学习使用和总结,  这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发