app开发定制公司RabbitMQ 死信队列详解

一、app开发定制公司死信的概念

死信,app开发定制公司顾名思义就是无法被消费的消息。一般来说,Producer app开发定制公司将消息投递到 Broker app开发定制公司或者直接到 Queue 里了,Consumer 从 Queue app开发定制公司取出消息进行消费,app开发定制公司但某些时候由于特定的原因导致 Queue app开发定制公司中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 的死信队列机制,档消息消费发生异常时,将消息投入到死信队列中。还有比如说:用户在商城下单成功并点击支付后再指定时间未支付时自动失效。

二、死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)

三、死信实战

3.1 代码架构图

3.2 消息 TTL 过期

生产者

public class DeadLetterProducer {    private static String EXCHANGE_NAME = "normal_exchange";    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtil.getChannel();        // 声明一个交换机        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 设置消息 TTL 过期时间        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();        String message = "info";        channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes());        System.out.println("消息发送完成:" + message);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

消费者1

public class DeadLetterConsumer1 {    private static String NORMAL_EXCHANGE_NAME = "normal_exchange";    private static String NORMAL_QUEUE_NAME = "normal-queue";    private static String DEAD_EXCHANGE_NAME = "dead_exchange";    private static String DEAD_QUEUE_NAME = "dead-queue";    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtil.getChannel();        // 声明一个死信队列        channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);        // 声明一个死信交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 死信队列与死信交换机绑定        channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");        // 正常队列与死信交换机的绑定关系        Map<String, Object> deadLetterParams = new HashMap<>(2);        deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);        deadLetterParams.put("x-dead-letter-routing-key","lisi");        // 声明一个正常队列        channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, deadLetterParams);        // 声明一个正常交换机        channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 把队列和交换机进行绑定        channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan");        System.out.println("C1消费者启动等待消费消息:");        channel.basicConsume(NORMAL_QUEUE_NAME, true, (consumerTag, delivery) -> {            String receivedMessage = new String(delivery.getBody());            System.out.println("消费者接收到消息:" + receivedMessage);        },(consumerTag) -> {            System.out.println(consumerTag + "消费者取消消费消息");        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

消费者2

public class DeadLetterConsumer2 {    private static String NORMAL_EXCHANGE_NAME = "normal_exchange";    private static String DEAD_QUEUE_NAME = "dead-queue";    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtil.getChannel();        channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        System.out.println("C2消费者启动等待消费消息:");        channel.basicConsume(DEAD_QUEUE_NAME, true, (consumerTag, delivery) -> {            String receivedMessage = new String(delivery.getBody());            System.out.println("消费者接收到死信:" + receivedMessage);        },(consumerTag) -> {            System.out.println(consumerTag + "消费者取消消费消息");        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

先启动消费者1,将正常交换机、死信交换机、正常队列、死信队列创建出来,否则会报错。接着启动消费者2,然后在启动生产者,观察控制台。
消费者1启动后进入RabbitMQ系统后台,可以看到队列 normal-queue 的 features 一列多了两个信息。其中 DLX 表示死信交换机,DLK 表示死信交换机的路由键(RoutingKey)。

此时由于消费者1可以正常消费消息,所以在消费者2中,死信队列是接收不到消息的。控制台情况如下:



将消费者1和消费者2的服务停止,重新运行生产者,10s 后消息会被进入到死信队列


再来看下后台系统:
生产者未发送消息

生产者发送了 1 条消息,此时正常队列中有 1 条未消费消息

时间过去 10 秒,正常队列里面的消息由于没有被消费,消息进入死信队列。

3.3 队列达到最大长度

生产者

public class DeadLetterLengthProducer {    private static String EXCHANGE_NAME = "normal_exchange";    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtil.getChannel();        // 声明一个交换机        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 设置消息 TTL 过期时间        for (int i = 0; i < 10; i++) {            String message = "info" + i;            channel.basicPublish(EXCHANGE_NAME, "zhangsan", null, message.getBytes());        }        System.out.println("消息发送完成");    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消费者1

public class DeadLetterLengthConsumer1 {    private static String NORMAL_EXCHANGE_NAME = "normal_exchange";    private static String NORMAL_QUEUE_NAME = "normal-queue";    private static String DEAD_EXCHANGE_NAME = "dead_exchange";    private static String DEAD_QUEUE_NAME = "dead-queue";    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtil.getChannel();        // 声明一个死信队列        channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);        // 声明一个死信交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 死信队列与死信交换机绑定        channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");        // 正常队列与死信交换机的绑定关系        Map<String, Object> deadLetterParams = new HashMap<>(2);        deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);        deadLetterParams.put("x-dead-letter-routing-key","lisi");        deadLetterParams.put("x-max-length", 6);        // 声明一个正常队列        channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, deadLetterParams);        // 声明一个正常交换机        channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 把队列和交换机进行绑定        channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan");        System.out.println("C1消费者启动等待消费消息:");        channel.basicConsume(NORMAL_QUEUE_NAME, true, (consumerTag, delivery) -> {            String receivedMessage = new String(delivery.getBody());            System.out.println("消费者接收到消息:" + receivedMessage);        },(consumerTag) -> {            System.out.println(consumerTag + "消费者取消消费消息");        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

消费者2

public class DeadLetterLengthConsumer2 {    private static String NORMAL_EXCHANGE_NAME = "normal_exchange";    private static String DEAD_QUEUE_NAME = "dead-queue";    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtil.getChannel();        channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        System.out.println("C2消费者启动等待消费消息:");        channel.basicConsume(DEAD_QUEUE_NAME, true, (consumerTag, delivery) -> {            String receivedMessage = new String(delivery.getBody());            System.out.println("消费者接收到死信:" + receivedMessage);        },(consumerTag) -> {            System.out.println(consumerTag + "消费者取消消费消息");        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

由于消费者1中修改了队列参数,所以启动前需要先将原先的队列删除,然后再启动消费者1,创建相关的队列及交换机。接着关闭消费者 1,启动生产者。打开后台系统:

普通队列中有 6 条消息未消费,超出队列长度的 4 条消息进入到了死信队列。

然后启动消费者1 和消费者2

3.4 消息被拒

生产者和消费者2 的代码不需要修改,修改消费者1 的代码,修改后的代码如下:

消费者2

public class DeadLetterRejectConsumer1 {    private static String NORMAL_EXCHANGE_NAME = "normal_exchange";    private static String NORMAL_QUEUE_NAME = "normal-queue";    private static String DEAD_EXCHANGE_NAME = "dead_exchange";    private static String DEAD_QUEUE_NAME = "dead-queue";    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtil.getChannel();        // 声明一个死信队列        channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);        // 声明一个死信交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 死信队列与死信交换机绑定        channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");        // 正常队列与死信交换机的绑定关系        Map<String, Object> deadLetterParams = new HashMap<>(2);        deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);        deadLetterParams.put("x-dead-letter-routing-key","lisi");        // 声明一个正常队列        channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, deadLetterParams);        // 声明一个正常交换机        channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 把队列和交换机进行绑定        channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan");        System.out.println("C1消费者启动等待消费消息:");        channel.basicConsume(NORMAL_QUEUE_NAME, false, (consumerTag, delivery) -> {            String receivedMessage = new String(delivery.getBody());            if ("info5".equals(receivedMessage)) {                System.out.println("C1接收到消息:" + receivedMessage+"并且拒绝签收了");                // 禁止重新入队                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);            } else {                System.out.println("消费者接收到消息:" + receivedMessage);                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            }        },(consumerTag) -> {            System.out.println(consumerTag + "消费者取消消费消息");        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

将原先的队列删除,重新启动消费者2,接着启动生产者


最后启动消费者2

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发