一、初识MQ
软件系统定制开发是一个开源的消息代理软件系统定制开发和队列服务器,软件系统定制开发用来通过普通协议在完软件系统定制开发全不同的应用之间共享数据,RabbitMQ是使用Erlang软件系统定制开发语言来编写的,软件系统定制开发并且是基于AMQP协议的。
AMQP:Advanced Message Queuing Protocol,高级协议。软件系统定制开发是具有现代特征的二进制协议,软件系统定制开发是一个提供统一消息服软件系统定制开发务的应用层标准高级消软件系统定制开发息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
1、同步调用的问题
2、方案
优势一:服务解耦
优势二:性能提升,提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量消峰
异步通讯的缺点 :
- 依赖于Broker的可靠性,安全性,吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
3、各种MQ的对比
二、RabbitMQ入门
1、基本概念
核心概念
1.1、Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key (路由键)、priority (相对于其他消息的优先权)、delivery-mode (指出该消息可能需要持久性存储)等。
1.2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
1.3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的。
Exchange有4种类型: direct(默认), fanout, topic,和headers,不同类型的Exchange转发消息的策略有所区别
1.4、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
1.5、Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基 于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。
1.6、Connection
网络连接,比如一个TCP连接。
1.7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
1.8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
1.9、Virtual Host
虚拟主机,表示-一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是AMQP概念的基础,必须在连接时指定,
RabbitMQ默认的vhost是/。
1.10、Broker
表示消息队列服务器实体
2、安装
2、常见消息模型
2.1、基本消息类型
三、SpringAMQP
1、什么是SpringAMQP
2、简单示例
2.1、引入AMQP依赖
<!--AMQP依赖,包含RabbitMQ--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 1
- 2
- 3
- 4
- 5
2.2、配置文件
spring: rabbitmq: addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672 username: admin password: admin #开启消息确认模式,新版本已经弃用 #publisher-confirms: true #开启消息送达提示 publisher-returns: true # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果 publisher-confirm-type: correlated virtual-host: / listener: type: simple simple: acknowledge-mode: auto #确认模式 prefetch: 1 #限制每次发送一条数据。 concurrency: 3 #同一个队列启动几个消费者 max-concurrency: 3 #启动消费者最大数量 #重试策略相关配置 retry: # 开启消费者(程序出现异常)重试机制,默认开启并一直重试 enabled: true # 最大重试次数 max-attempts: 5 # 重试间隔时间(毫秒) initial-interval: 3000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
spring.rabbitmq.publisher-confirm-type新版发布确认属性有三种确认类型
/** * The type of publisher confirms to use. */public enum ConfirmType { /** * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} * within scoped operations. */ SIMPLE, /** * Use with {@code CorrelationData} to correlate confirmations with sent * messsages. */ CORRELATED, /** * Publisher confirms are disabled (default). */ NONE}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
NONE值是禁用发布确认模式,是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
————————————————
版权声明:本文为CSDN博主「OkidoGreen」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/z69183787/article/details/109371628
2.3、发送消息
在publisher服务中新建一个测试类,编写测试方法
2.4、消费消息
3、WorkQueue工作队列
3.1、模拟WorkQueue实现一个队列绑定多个消费者
3.2、消息发送
3.3、消息接受
3.4、消费预期机制prefetch
配置一次只能取一条,处理完才能取下一条
spring: rabbitmq: host: 192.168.100.120 port: 5672 username: admin password: admin #开启消息确认模式,新版本已经弃用 #publisher-confirms: true #开启消息送达提示 publisher-returns: true # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果 publisher-confirm-type: correlated virtual-host: / listener: simple: prefetch: 1 #限制每次发送一条数据。
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
4、发布和订阅模型
4.1、概念
4.2、Fanout广播类型
4.2.1、示例
4.2.2、声明绑定
4.2.3、消费者绑定
4.2.4、生产者发送消息
4.3、DriectExchage路由类型
通过routeKey可以实现Fanout广播类型
4.3.1、示例
4.3.2、申明Exchange、Queue
@Configurationpublic class RabbitDirectConfig { @Bean public Queue directQueue(){ // 参数介绍 // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数 return new Queue("directQueue-One",true,false,false,null); } @Bean public Queue directQueue2(){ // 参数介绍 // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数 return new Queue("directQueue-Two",true,false,false,null); } @Bean public DirectExchange directExchange(){ // 参数介绍 // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数 return new DirectExchange("MqSendService-One",true,false,null); } @Bean public Binding bingExchange(){ // 绑定队列 return BindingBuilder.bind(directQueue2()) // 队列绑定到哪个交换器 .to(directExchange()) // 绑定路由key,必须指定 .with("One"); } @Bean public Binding bingExchange2(){ // 绑定队列 return BindingBuilder.bind(directQueue2()) // 队列绑定到哪个交换器 .to(directExchange()) // 绑定路由key,必须指定 .with("Two"); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
4.3.3、在consumer消费者服务监听
4.3.4、总结
4.4、TopicExchange话题广播类型
4.4.1、示例
4.4.2、申明
@Configurationpublic class RabbitTopicConfig { @Bean public Queue queue(){ // 参数介绍 // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数 return new Queue("simple.queue",true,false,false,null); } @Bean public TopicExchange topicExchange(){ // 参数介绍 // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数 return new TopicExchange("amq.topic",true,false,null); } @Bean public Binding bingExchange(){ // 绑定队列 return BindingBuilder.bind(queue()) // 队列绑定到哪个交换器 .to(topicExchange()) // 绑定路由key,必须指定 .with("simple.#"); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
4.4.3、生产者发送
5、消息转换器
spring的消息对象处理是有org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SpringMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果需要修改只需要定义一个MessageConverter类型的bean即可,推荐用JSON方式序列化,步骤如下
5.1、引入jackson的依赖
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version></dependency>
- 1
- 2
- 3
- 4
- 5
5.2、申明Bean
@Beanpublic MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter();}
- 1
- 2
- 3
- 4