定制软件开发RabbitMq生产者和消费者消息确认机制(ack)

定制软件开发消息确认的本质也就是为了解决RabbitMQ定制软件开发消息丢失问题,定制软件开发因为哪怕我们做了RabbitMQ持久化,定制软件开发其实也并不能保证解决定制软件开发我们的消息丢失问题


RabbitMQ定制软件开发的消息确认有两种

  • 定制软件开发第一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
  • 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

1.消息发送确认(生产者)

正常情况下,生产者会通过交换机发送消息至队列中,再由消费者来进行消费,但是其实RabbitMQ在接收到消息后,还需要一段时间消息才能存入磁盘,并且其实也不是每条消息都会存入磁盘,可能仅仅只保存到cache中,这时,如果RabbitMQ正巧发生崩溃,消息则就会丢失,所以为了避免该情况的发生,我们引入了生产者确认机制,rabbitmq对此提供了两种方式:

方法一:Confirm模式

通过设置生产者Channel为comfirm模式,该Channel上发布的所有消息都会被指派一个唯一ID(每次从1开始累加),当消息到达生产者指定的消息队列后,broker会返回一个确认给生产者(包含之前的ID),这样生产者就能知道哪条消息成功发送了。

代码段:

  1. public void sendQueue(String appId, String handleUserId, List<String> deviceIds) {
  2. List<Object> list = new ArrayList<>();
  3. JSONObject jsonObject = new JSONObject();
  4. jsonObject.put(DeviceConstant.COMMAND, DELETE);
  5. jsonObject.put(DeviceConstant.BODY, list );
  6. String topicExchange = RabbitMqConstant.EXCHANGE_TOPIC_DATA;
  7. String routingKey = RabbitMqConstant.ROUTING_KEY_LOCAL_DATA;
  8. //rabbitTemplate.convertAndSend(topicExchange, routingKey, jsonObject.toJSONString());
  9. try {
  10. Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
  11. channel.confirmSelect();
  12. channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
  13. channel.addConfirmListener(new ConfirmListener() {
  14. //消息失败处理
  15. @Override
  16. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  17. log.info("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange, routingKey, deliveryTag, multiple, jsonObject);
  18. try {
  19. Thread.sleep(3000l);
  20. } catch (InterruptedException e) {
  21. throw new RuntimeException(e);
  22. }
  23. //重发
  24. channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
  25. }
  26. //消息成功处理
  27. @Override
  28. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  29. log.info("sendQueue-ack-confirm-successs==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange, routingKey, deliveryTag, multiple);
  30. }
  31. });
  32. } catch (Exception e) {
  33. log.error("sendQueue-ack-发送消息失败:{}", ExceptionUtils.getStackTrace(e));
  34. }
  35. }

 方法二:手动确认,ConfirmCallback、returnCallback

代码段:

  1. import com.alibaba.fastjson.JSONObject;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.retry.policy.SimpleRetryPolicy;
  8. import org.springframework.retry.support.RetryTemplate;
  9. import org.springframework.stereotype.Component;
  10. import java.util.UUID;
  11. /**
  12. * RabbitMq 生产者ACK
  13. */
  14. @Slf4j
  15. @Component
  16. public class RabbitMqProducerAck implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{
  17. @Autowired
  18. private RabbitTemplate rabbitTemplatenew;
  19. /**
  20. * @param message
  21. */
  22. public void send(String topicName, String routingKey, String message){
  23. //设置由于网络问题导致的连接Rabbitmq失败的重试策略
  24. RetryTemplate retryTemplate = new RetryTemplate();
  25. retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
  26. //发送之前可以先把消息保存到数据库
  27. rabbitTemplatenew.setEncoding("UTF-8");
  28. rabbitTemplatenew.setMandatory(true);
  29. rabbitTemplatenew.setConfirmCallback(this);// 指定 ConfirmCallback
  30. rabbitTemplatenew.setReturnCallback(this);// 指定 ReturnCallback
  31. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  32. log.info("rabbitMqProducerAck-confirm-sender==>{}----exchange:{}--routingkey:{}", correlationData.getId(), topicName, routingKey, message);
  33. this.rabbitTemplatenew.convertAndSend(topicName, routingKey, message, correlationData);
  34. try {
  35. Thread.sleep(100);//线程休眠,为了不让方法直接结束,回调函数无法正常回调confirm方法
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }finally {
  39. message=null;//强引用设置为null,便于gc回收
  40. }
  41. }
  42. /**
  43. * @param correlationData
  44. * @param ack
  45. * @param cause
  46. */
  47. @Override
  48. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  49. log.info("rabbitMqProducerAck-confirm-successs==>消息回调confirm函数:{},ack:{},cause:{}", JSONObject.toJSONString(correlationData), ack, cause);
  50. }
  51. /**
  52. * @param message
  53. * @param replyCode
  54. * @param replyText
  55. * @param exchange
  56. * @param routingKey
  57. */
  58. @Override
  59. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  60. log.info("rabbitMqProducerAck-confirm-fail==>消息使用的交换器 exchange : {}--消息使用的路由键 routing :{}--消息主体 message : {}-replyCode : {}-replyText: {}", exchange, routingKey, message.getBody(),replyCode,replyText);
  61. try {
  62. Thread.sleep(3000l);
  63. } catch (InterruptedException e) {
  64. throw new RuntimeException(e);
  65. }
  66. //从新发送
  67. this.send(exchange, routingKey, new String(message.getBody()));
  68. }
  69. }

2.消息接收确认(消费者)

消息接收确认机制,分为消息自动确认模式和消息手动确认模式,当消息确认后,我们队列中的消息将会移除

那这两种模式要如何选择呢?

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便。好处就是可以提高吞吐量,缺点就是会丢失消息
  • 如果消息非常重要,不容丢失,则建议手动ACK,正常情况都是更建议使用手动ACK。虽然可以解决消息不会丢失的问题,但是可能会造成消费者过载

1):rabbitmq消费者默认情况下是自动确认,不再多说
2):手动确认方式:

  1. @RabbitHandler
  2. @RabbitListener(queues = RabbitMqConstant.xxx , concurrency = "1-1")
  3. public void receiveQueueCommonLocal(Channel channel, Message message) {
  4. String messageBody = new String(message.getBody());
  5. //System.out.println("messageBody===>"+messageBody);
  6. try {
  7. //todo 业务逻辑
  8. /*手动确认成功
  9. * 参数:
  10. * deliveryTag:该消息的index
  11. * multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息
  12. * **/
  13. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. log.error("receiveQueueCommonLocal=====>ERROR:{}--josn:{}", ExceptionUtil.getMessage(e), messageBody);
  17. try {
  18. //手动确认回滚 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
  19. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  20. } catch (IOException ex) {
  21. throw new RuntimeException(ex);
  22. }
  23. }
  24. }

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发