网站建设定制开发rabbitmq消息异常处理

提示:网站建设定制开发文章写完后,网站建设定制开发目录可以自动生成,网站建设定制开发如何生成可参考右边的帮助文档

文章目录


前言

在使用时,网站建设定制开发会因为各种原因(网络波动,系统宕机,程序异常等)导致消息发送失败。rabbitmq也提供了相应的处理机制。


提示:以下是本篇文章正文内容,下面案例可供参考

一、rabbitmq消息发送失败处理机制

生产法发送失败
配置回调器。
yml配置开启确认和返回机制
confirm:发送给exchange时的回调,不管是否成功发送给队列。
return:消息没有发送给exchange时的回调。

#成功发送到exchange时的回调spring.rabbitmq.publisher-confirm-type=correlated#exchange未发送到队列时回调spring.rabbitmq.publisher-returns=true
  • 1
  • 2
  • 3
  • 4

回调函数配置方式

@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {    /**     * 交换机不管是否收到消息的一个回调方法     *     * @param correlationData 消息相关数据     * @param ack             交换机是否收到消息     * @param cause           未收到消息的原因     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        String id = correlationData != null ? correlationData.getId() : "";        if (ack) {            log.info("交换机已经收到 id 为:{}的消息", id);        } else {        //重发处理,可以入库,死信队列处理....            log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);        }    }    //当消息无法路由的时候触发回调方法    @Override    public void returnedMessage(ReturnedMessage returned) {         //重发处理,可以入库,死信队列处理....        log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",                new String(returned.getMessage().getBody()), returned.getExchange(),                returned.getReplyText(), returned.getRoutingKey(),                returned.getReplyCode());    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

rabbitmqTemplate在启动时注入:

@Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private RabbitTemplate.ConfirmCallback confirmCallback;    @Autowired    private RabbitTemplate.ReturnsCallback returnsCallback;    //依赖注入 rabbitTemplate 之后再设置它的回调对象    @PostConstruct    public void init() {        rabbitTemplate.setConfirmCallback(confirmCallback);        rabbitTemplate.setReturnsCallback(returnsCallback);    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

消费者消费失败后处理
通过自动ack+retry配置+私信队列方式实现

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    listener:      simple:#        acknowledge-mode: manual   # 配置该消费者的ack方式为手动        acknowledge-mode: auto   # 配置该消费者的ack方式为自动        default-requeue-rejected: false        #设置消费失败后重发        retry:          #重发次数          max-attempts: 3          #开启重发          enabled: true          # 重试间隔(ms)          initial-interval: 5000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME1, durable = “true”, autoDelete = “false”,
arguments = {@Argument(name = “x-dead-letter-exchange”, value = “dead-exchange”),
@Argument(name = “x-dead-letter-routing-key”, value = “dead-routing-key”),
@Argument(name = “x-message-ttl”, value = “1000”,type = “java.lang.Long”)
}),
exchange = @Exchange(value = “first_exchange”, type = ExchangeTypes.DIRECT),
key = “queue_one_key1”))
public void handleMessage1(Message message, Channel channel) throws IOException {
log.info(“OrderConsumer handleMessage {} , error:”, message);
//模拟消费异常,自动进入私信队列
throw new RuntimeException(“抛出异常,模拟消费失败,触发spring-retry”);
}

/** * 死信队列消费者 * * @param data * @param channel * @throws Exception */@RabbitListener(queues = "dead-queue")public void consumeDL(String data, Channel channel) throws Exception {    //处理消费失败的消息    log.info(">>>> 死信队列消费 tag = {},消息内容 : {}", data);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

// channel.basicNack(tag, false, false);
}

通过手动ack+私信队列实现(不要配置retry!!!)

spring.rabbitmq.publisher-returns=truespring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2
    @RabbitListener(queues = CONFIRM_QUEUE_NAME)    public void receiveMsg(String data, Message message, Channel channel) throws IOException {        String msg = new String(message.getBody());        log.info("接受到队列 confirm.queue 消息:{}", msg);//        throw new RuntimeException("抛出异常,模拟消费失败,触发spring-retry");        //模拟消费失败,重发n次后仍然失败。调用basicNack 抛给私信队列处理        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);    } //接收消息    @RabbitListener(queues = "QD")    public void receiveD(Message message, Channel channel) throws IOException {        String msg = new String(message.getBody());        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

总结

rabbitmq消息失败处理需要谨慎对待,因为容易产生资源消耗殆尽的问题!!!

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发