定制软件RabbitMQ 进阶 -- SpringBoot 集成 RabbitMQ实现生产者与消费者模式

📢📢📢📣📣📣

哈喽!大家好,我是【Bug 终结者,【CSDNJava定制软件领域优质创作者】🏆,定制软件阿里云专家博主🏆,51CTO人气博主🏆,InfoQ写作专家🏆

定制软件一位上进心十足,定制软件拥有极强学习力的【Java领域博主】😜😜😜

🏅【Bug 终结者】定制软件博客的领域是【定制软件面向后端技术】的学习,定制软件未来会持续更新更多的【后端技术】以及【学习心得】。 定制软件偶尔会分享些前端基础知识,定制软件会更新实战项目,定制软件面向企业级开发应用
🏅 如果有对【后端技术】、【前端领域】感兴趣的【小可爱】,欢迎关注【Bug 终结者】💞💞💞


❤️❤️❤️ 定制软件感谢各位大可爱小可爱! ❤️❤️❤️

文章目录

一、什么是 AMQP?

AMQP,即Advanced Message Queuing Protocol,定制软件一个提供统一消息服务定制软件的应用层标准高级队列协议,定制软件是应用层协议的一个开放标准,定制软件为面向消息的中间件设计。定制软件基于此协议的客户端与定制软件消息中间件可传递消息,定制软件并不受客户端/不同产品,定制软件不同的开发语言等条件的限制。定制软件中的实现有等。

AMQP 基于TCP定制软件协议之上再次封装的协议,AMQP定制软件定义了合适的服务器端域模型,定制软件规范服务器的行为(AMQP定制软件的服务器端称broker),

☁️AMQP的主要功能

消息中间件的主要功能就是消息的 路由(routing) 和 缓存(Buffering)

AMQP提供了两个重要的模型,Exchange(交换机) 和 Queue (队列)

Exchange的作用

Exchange接收Producer发送的Message,根据不同的路由算法,将Message发送给Message Queue.

Message Queue的作用

  • Message Queue 在 Message没有被 Consumer消费时,缓存这些Message,具体的缓存策略由实现者决定
  • 当Message Queue 与 Message Consumer之间连接畅通时,Message Queue 则需要将消息转发给 Consumer进行消费

注意,如果队列没有指定交换机,则使用 Default 默认交换机

二、的核心组成

核心概念

  • Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
  • Connection:连接,应用程序与Broker的网络连接 TCP/IP/三次握手和四次挥手
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
  • Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
  • Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
  • Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
  • Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
  • Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
  • Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

三、RabbitMQ的运行流程

上图为生产者生产消息与消费者订阅并消费消息的大致流程图

四、RabbitMQ支持的消息模式

具体的模式案例请参考官网:

工作队列和发布订阅/广播模式用的比较多! 路由模式会消耗一定的内存,要加where筛选过滤

五、RabbitMQ使用场景

解耦、削峰、异步

⛅同步异步问题

串行和并行

串行方式: 将订单信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务全部完成后,返回给客户端

并行方式 异步线程池

并发方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信,以上三个任务全部完成后,返回给客户端,与串行的差别是,并行的方式可以提高处理的时间

存在的问题

  • 耦合度高
  • 需要自己写线程池 维护成本太高
  • 如果消息出现了丢失,需要自己做消息补偿
  • 如果保证可靠性,需要自己去写
  • 如果服务器承载不了,需要自己去写高可用

⚡异步消息队列

使用MQ异步消息队列的好处

  • 完全解耦,用MQ建立桥接

  • 有独立的线程池和运行模型

  • 出现了消息丢失,MQ有持久化功能

  • 如何保证消息的可靠性,死信队列和消息转移的等

  • 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。

按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

因此MQ消息队列适用于

  • 分布式事务的可靠消费和可靠生产
  • 索引、缓存、静态化处理的数据同步
  • 流量监控
  • 日志监控(ELK)
  • 下单、订单分发、抢票

MQ消息队列可达到 高内聚、低耦合

六、SpringBoot 整合RabbitMQ实现消息的生产与消费

RabbitMQ是Spring家族开发的产品,Spring 天然支持RabbitMQ,快速方便引入RabbitMQ!

这里我们介绍 SpringBoot 整合RabbitMQ 实现消息的生产与消费(广播模式/发布订阅模式)

✅创建Maven聚合工程

File —> New —> Project —> Maven —> 直接Next 进入下一步创建普通的Maven工程即可

创建一个默认的Maven聚合工程,将src文件夹删除,该工程就是一个Maven聚合工程

😃引入共有依赖

引入依赖如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.wanshi</groupId>    <artifactId>springboot-rabbitmq</artifactId>    <packaging>pom</packaging>    <version>1.0-SNAPSHOT</version>    <modules>        <module>rabbitmq-order-producer</module>        <module>rabbitmq-order-consumer</module>    </modules>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.5.5</version>        <relativePath /> <!-- lookup parent from repository -->    </parent>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-starter-web</artifactId>            </dependency>            <dependency>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-starter-test</artifactId>                <scope>test</scope>            </dependency>            <dependency>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-starter-amqp</artifactId>            </dependency>            <dependency>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-starter-web</artifactId>            </dependency>        </dependencies>    </dependencyManagement></project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

⏳创建生产者

在项目内,新建一个Moudle,rabbitmq-order-producer 默认Maven工程,下一步即可

引入依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.5.5</version>        <relativePath /> <!-- lookup parent from repository -->    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>rabbitmq-order-producer</artifactId>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>junit</groupId>            <artifactId>junit</artifactId>            <scope>test</scope>        </dependency>    </dependencies></project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

⌛创建消费者

在项目内,新建一个Moudle,rabbitmq-order-cousumer 默认Maven工程,下一步即可

引入依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.5.5</version>        <relativePath /> <!-- lookup parent from repository -->    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>rabbitmq-order-producer</artifactId>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>    </dependencies></project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

Maven聚合工程创建完成图

Maven依赖图

自行手写MainApplication即可

创建完成!

♨️核心源码

application.yml

# 服务端口server:  port: 8080# 配置rabbitmq服务spring:  rabbitmq:    username: admin    password: admin    virtual-host: /    host: 8.130.28.198    port: 5672
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

生产者

OrderService

package com.wanshi.service;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.UUID;/** * @author whc * @date 2022/5/23 18:50 */@Servicepublic class OrderService {    @Autowired    private RabbitTemplate rabbitTemplate;    public void makeOrder() {        String orderId = UUID.randomUUID().toString();        System.out.println("订单生成成功:" + orderId);        String exchange_name = "fanout_order_exchange";        String routeingKey = "";        rabbitTemplate.convertAndSend(exchange_name, routeingKey, orderId);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

消费者

交换机的声明与队列我们放在消费者端,因为消费者是先开启的,如果没有交换机和队列,则会报错!

RabbitMQConfiguration

package com.wanshi.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author whc * @date 2022/5/23 10:18 */@Configurationpublic class RabbitMQConfiguration {    //1.声明注册fanout模式的交换机    @Bean    public FanoutExchange fanoutExchange() {        return new FanoutExchange("fanout_order_exchange", true, false);    }    //2.声明队列,sms.fanout.queue email.fanout.queue msg.fanout.queue    @Bean    public Queue smsQueue() {        return new Queue("sms.fanout.queue", true);    }    @Bean    public Queue emailQueue() {        return new Queue("email.fanout.queue", true);    }    @Bean    public Queue msgQueue() {        return new Queue("msg.fanout.queue", true);    }    //3.完成绑定关系(队列与交换机完成绑定关系)    @Bean    public Binding smsBind() {        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());    }    @Bean    public Binding emailBind() {        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());    }    @Bean    public Binding msgBind() {        return BindingBuilder.bind(msgQueue()).to(fanoutExchange());    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

编写具体业务消费类

FanoutEmailConsumer

package com.wanshi.service;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @author whc * @date 2022/5/23 18:53 */@RabbitListener(queues = "email.fanout.queue")@Componentpublic class FanoutEmailConsumer {    @RabbitHandler    public void messageService(String message) {        System.out.println("fanout email ==>" + message);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

FanoutMsgConsumer

package com.wanshi.service;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @author whc * @date 2022/5/23 18:55 */@RabbitListener(queues = "msg.fanout.queue")@Componentpublic class FanoutMsgConsumer {    @RabbitHandler    public void messageService(String message) {        System.out.println("fanout msg ==>" + message);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

FanoutSmsConsumer

package com.wanshi.service;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @author whc * @date 2022/5/23 18:54 */@RabbitListener(queues = "sms.fanout.queue")@Componentpublic class FanoutSmsConsumer {    @RabbitHandler    public void messageService(String message) {        System.out.println("fanout sms ==> " + message);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

编写完成!

七、测试消息的生产与消费

启动客户端监听查看消息队列的绑定情况

启动客户端

查看RabbitMQ的交换机与队列绑定情况

交换机声明

队列声明

绑定关系

下面生产者投递消息

生产者端建立测试类

package com.wanshi;import com.wanshi.service.OrderService;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/** * @author whc * @date 2022/5/23 18:55 */@RunWith(SpringRunner.class)@SpringBootTestpublic class MainApplicationTest {    @Autowired    private OrderService orderService;    @Test    public void test1() {        orderService.makeOrder();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

启动,投递成功

查看消息者是否成功消费消息

成功完成 SpringBoot 与RabbitMQ的整合,并通过发布订阅/广播模式实现

⛵小结

以上就是【Bug 终结者】对 RabbitMQ 进阶 – SpringBoot 集成 RabbitMQ实现生产者与消费者模式简单的概述, RabbitMQ是一种消息队列中间件,引入RabbitMQ后,可大大提升程序的性能,从而拥有更高的吞吐量,达到高内聚,低耦合

如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发