定制软件开发消息确认的本质也就是为了解决RabbitMQ定制软件开发消息丢失问题,定制软件开发因为哪怕我们做了RabbitMQ持久化,定制软件开发其实也并不能保证解决定制软件开发我们的消息丢失问题
RabbitMQ定制软件开发的消息确认有两种
- 定制软件开发第一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
- 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
1.消息发送确认(生产者)
正常情况下,生产者会通过交换机发送消息至队列中,再由消费者来进行消费,但是其实RabbitMQ在接收到消息后,还需要一段时间消息才能存入磁盘,并且其实也不是每条消息都会存入磁盘,可能仅仅只保存到cache中,这时,如果RabbitMQ正巧发生崩溃,消息则就会丢失,所以为了避免该情况的发生,我们引入了生产者确认机制,rabbitmq对此提供了两种方式:
方法一:Confirm模式
通过设置生产者Channel为comfirm模式,该Channel上发布的所有消息都会被指派一个唯一ID(每次从1开始累加),当消息到达生产者指定的消息队列后,broker会返回一个确认给生产者(包含之前的ID),这样生产者就能知道哪条消息成功发送了。
代码段:
- public void sendQueue(String appId, String handleUserId, List<String> deviceIds) {
- List<Object> list = new ArrayList<>();
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(DeviceConstant.COMMAND, DELETE);
- jsonObject.put(DeviceConstant.BODY, list );
- String topicExchange = RabbitMqConstant.EXCHANGE_TOPIC_DATA;
- String routingKey = RabbitMqConstant.ROUTING_KEY_LOCAL_DATA;
- //rabbitTemplate.convertAndSend(topicExchange, routingKey, jsonObject.toJSONString());
- try {
- Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
- channel.confirmSelect();
- channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
- channel.addConfirmListener(new ConfirmListener() {
- //消息失败处理
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- log.info("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange, routingKey, deliveryTag, multiple, jsonObject);
- try {
- Thread.sleep(3000l);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- //重发
- channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
- }
- //消息成功处理
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- log.info("sendQueue-ack-confirm-successs==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange, routingKey, deliveryTag, multiple);
- }
- });
- } catch (Exception e) {
- log.error("sendQueue-ack-发送消息失败:{}", ExceptionUtils.getStackTrace(e));
- }
- }
方法二:手动确认,ConfirmCallback、returnCallback
代码段:
-
- import com.alibaba.fastjson.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.retry.policy.SimpleRetryPolicy;
- import org.springframework.retry.support.RetryTemplate;
- import org.springframework.stereotype.Component;
- import java.util.UUID;
-
- /**
- * RabbitMq 生产者ACK
- */
- @Slf4j
- @Component
- public class RabbitMqProducerAck implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{
-
- @Autowired
- private RabbitTemplate rabbitTemplatenew;
-
-
- /**
- * @param message
- */
- public void send(String topicName, String routingKey, String message){
- //设置由于网络问题导致的连接Rabbitmq失败的重试策略
- RetryTemplate retryTemplate = new RetryTemplate();
- retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
- //发送之前可以先把消息保存到数据库
- rabbitTemplatenew.setEncoding("UTF-8");
- rabbitTemplatenew.setMandatory(true);
- rabbitTemplatenew.setConfirmCallback(this);// 指定 ConfirmCallback
- rabbitTemplatenew.setReturnCallback(this);// 指定 ReturnCallback
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- log.info("rabbitMqProducerAck-confirm-sender==>{}----exchange:{}--routingkey:{}", correlationData.getId(), topicName, routingKey, message);
- this.rabbitTemplatenew.convertAndSend(topicName, routingKey, message, correlationData);
- try {
- Thread.sleep(100);//线程休眠,为了不让方法直接结束,回调函数无法正常回调confirm方法
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally {
- message=null;//强引用设置为null,便于gc回收
- }
- }
-
- /**
- * @param correlationData
- * @param ack
- * @param cause
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- log.info("rabbitMqProducerAck-confirm-successs==>消息回调confirm函数:{},ack:{},cause:{}", JSONObject.toJSONString(correlationData), ack, cause);
- }
-
- /**
- * @param message
- * @param replyCode
- * @param replyText
- * @param exchange
- * @param routingKey
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- log.info("rabbitMqProducerAck-confirm-fail==>消息使用的交换器 exchange : {}--消息使用的路由键 routing :{}--消息主体 message : {}-replyCode : {}-replyText: {}", exchange, routingKey, message.getBody(),replyCode,replyText);
- try {
- Thread.sleep(3000l);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- //从新发送
- this.send(exchange, routingKey, new String(message.getBody()));
- }
- }
2.消息接收确认(消费者)
消息接收确认机制,分为消息自动确认模式和消息手动确认模式,当消息确认后,我们队列中的消息将会移除
那这两种模式要如何选择呢?
- 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便。好处就是可以提高吞吐量,缺点就是会丢失消息
- 如果消息非常重要,不容丢失,则建议手动ACK,正常情况都是更建议使用手动ACK。虽然可以解决消息不会丢失的问题,但是可能会造成消费者过载
1):rabbitmq消费者默认情况下是自动确认,不再多说
2):手动确认方式:
- @RabbitHandler
- @RabbitListener(queues = RabbitMqConstant.xxx , concurrency = "1-1")
- public void receiveQueueCommonLocal(Channel channel, Message message) {
- String messageBody = new String(message.getBody());
- //System.out.println("messageBody===>"+messageBody);
- try {
- //todo 业务逻辑
- /*手动确认成功
- * 参数:
- * deliveryTag:该消息的index
- * multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息
- * **/
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
- } catch (Exception e) {
- e.printStackTrace();
- log.error("receiveQueueCommonLocal=====>ERROR:{}--josn:{}", ExceptionUtil.getMessage(e), messageBody);
- try {
- //手动确认回滚 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- }