定制软件开发RabbitMQ消息确认机制和消息重发机制

一.机制

定制软件开发首先我们要知道一条消定制软件开发息的传递过程。

生产者 -> 交换机 ->  队列

定制软件开发我们的生产者生产消息,定制软件开发生产完成的消息发送到交换机,定制软件开发由交换机去把这个消息定制软件开发转发到对应的队列上。定制软件开发这其中我们可能在生产者 -> 定制软件开发交换机丢失消息,也可能在 交换机 -> 定制软件开发队列上丢失消息。定制软件开发因此我们需要引入2个概念。

1: 定制软件开发生产者到交换机的可靠保证 (confirmCallback ) 确认回调机制

2: 交换机到队列的保证 (returnCallback ) 返回回调机制

二. 保证生产者到交换机的可靠传递

因为我们的消息都要经过路由,然后去对应的队列,所以第一条线路至关重要。我们使用confirm机制。这个confirm机制是一个异步的,也就是说我们发送一条消息之后可以继续发送下一条消息。比自带的事务好很多。

使用confirm机制首先需要在配置文件中开启confirm机制

  1. rabbitmq:
  2. host: localhost
  3. port: 5672
  4. virtual-host: /
  5. username: admin
  6. password: password
  7. # 开启生产者消息确认
  8. publisher-confirm-type: correlated

生产者代码

  1. @GetMapping("/send/{tel}")
  2. public Result send(@PathVariable("tel") String tel) {
  3. // 开启生产者回调
  4. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  5. @Override
  6. public void confirm(CorrelationData correlationData, boolean b, String s) {
  7. if (b) {
  8. log.info("消息发送到交换机成功");
  9. } else {
  10. log.error("消息发送到交换机失败,失败信息[{}]",s);
  11. }
  12. }
  13. });
  14. // 发送消息
  15. rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE,"sms",tel);
  16. return null;
  17. }

这样我们的消息如果发送到交换机,就会执行[消息发送到交换机成功](签收成功是后面消费者里的消息签收机制,现在不用在意)

 现在我们测试一下,我在交换机名字后面加上一个字符串,现在这个交换机是不存在的。看看会发生什么

rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE+"sora33","sms",tel);

 发送到交换机失败了,执行了我们失败里的回调。这里我们就可以看出这个confirm机制的作用了。它是用来确保确保我们的消息是否到达了交换机。到达了执行ack,没有到达执行nack。我们可以在发送失败的方法里加入自己的逻辑。比如加入到发送失败的表中,或者尝试重新发送...

消息的发送确认机制讲完之后。接下来我们来看一下交换机到队列要如何保证消息的可靠性。

三.保证交换机到队列的可靠传递

使用ReturnCallback机制来保证。假设我现在有一个路由模式的交换机。绑定了一个队列,叫send_sms。对应的路由键是sms。如果我给这个交换机发送一条消息。路由键指定smssss。肯定是找不到对应的队列。那么这个时候就会触发ReturnCallback。

setMandatory是用来设置如果没有找到队列,是丢弃还是执行returnedMessage里的方法。false丢弃。

要使用ReturnCallback,我们同样需要在设置中打开配置,很简单。只需要在yml里的mq下面跟一条配置就行了。打开return回调机制

publisher-returns:true

 加入下面的回调属性设置。可以和消息确认机制一起使用。2者互不影响,直接写上去就行。

  1. // 队列收到消息确认机制
  2. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  3. @Override
  4. public void returnedMessage(Message message, int i, String context, String exchange,
  5. String routeKey) {
  6. log.error("消息[{}]未到达队列[{}],使用的路由键[{}]",message,exchange,routeKey);
  7. }
  8. });
  9. // true -> 消息未到队列中触发MessageReturn false -> 消息未到队列直接丢弃该消息
  10. rabbitTemplate.setMandatory(true);

 现在我给一个不存在的路由key发送。交换机肯定是找不到对应的队列的 我们的交换机目前只绑定了路由为sms的一个队列 

rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE,"smssss",tel);

 可以看到,虽然消息进入了交换机,但是找不到对应的队列,执行ReturnCallback回调函数

生产者方面的一些机制讲完之后。接下来我们来看消费者中的消息签收机制以及如何重新发送失败的消息。

因为rabbitMQ默认是签收消息的。我们先把签收模式设置为手动签收 顺便配置一下我们的重发配置

  1. # 消费端设置手动签收
  2. listener:
  3. direct:
  4. acknowledge-mode: manual
  5. simple:
  6. acknowledge-mode: manual
  7. retry:
  8. # 开启消息重发机制
  9. enabled: true
  10. # 重试次数3
  11. max-attempts: 3

生产者代码 生产者的逻辑很简单。肯定会抛异常。因为我手动设置了一个被除数异常。进入到catch块中。我做了一个存入redis 的操作,将这个消息的标签值作为键。值设置为1.作为重试次数。存入之后mq会自动进行一个重发。当判断重试次数达到3次。直接拒绝签收。并将该消息存到数据库中的重试表。进行一个人工操作...

  1. int a = 0;
  2. @RabbitListener(queues = {RabbitConstant.SEND_SMS})
  3. public void smsQueue(String tel, Message message, Channel channel) throws IOException {
  4. try {
  5. int c = 1/a;
  6. // 签收
  7. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  8. log.info("签收成功[{}]",tel);
  9. } catch (Exception e) {
  10. // 获取redis重试次数
  11. Integer value = (Integer)redisUtil.get(message.getMessageProperties().getDeliveryTag() + "");
  12. if (value == null) {
  13. // 存入redis
  14. redisUtil.set(message.getMessageProperties().getDeliveryTag()+"", 1);
  15. } else if (value.intValue() == 2) { // 如果第三次还是有异常,那么第三次的次数value值还是2 所以加入重试表
  16. // logic // 加入重试表
  17. log.error("消息[{}]消费失败...传递参数[{}]", message, tel);
  18. log.warn("已加入重试表...");
  19. // 签收失败并不重试
  20. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  21. return;
  22. } else {
  23. redisUtil.set(message.getMessageProperties().getDeliveryTag() + "", ++value);
  24. }
  25. log.info("签收失败[{}]",tel);
  26. throw new RuntimeException("签收异常");
  27. }
  28. }

 当我们给int c = 1/a 改为 c = 1/a++

这个时候第一次会进入catch块。第二次因为a自增。所以不会抛出异常,签收成功

 四.总结

        RabbitMQ在我们工作中是常用的一个中间件,必须要对齐了如指。既然是中间件,那么势必会有消息丢失产生,还要保证消息的幂等性。本文章是一个进阶文章。不懂得小伙伴可以去看看我的前两篇。

 更多知识请移步个人博客:​​​​​​​

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发