定制化开发觉得有帮助请点赞关注收藏~~~
AMQP(定制化开发高级消定制化开发息队列协议)定制化开发是一个提供统一消息服定制化开发务的应用层标准高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可 传递消息,并不受客户端/中间件的不同产品,不同开发语言等条件的限制。
下面实现主要用RabbitMQ讲解AMQP实例,因此需要事先安装RabbitMQ和erlang语言
erlang下载地址 https://www.erlang.org/downloads
RabbitMQ下载地址 https://www.rabbitmq.com/download.html
使用RabbitMQ实现发布/订阅异步消息模式
1:创建发布者应用ch8_2Sender
2:在pom.xml文件中添加依赖
- <?xml version="1.0" encoding="UTF-8"?>
-
- -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
-
- <modelVersion>4.0.0</modelVersion>
-
-
- -<parent>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-parent</artifactId>
-
- <version>2.1.8.RELEASE</version>
-
- <relativePath/>
-
- <!-- lookup parent from repository -->
-
-
- </parent>
-
- <groupId>com.ch</groupId>
-
- <artifactId>ch8_2Sender</artifactId>
-
- <version>0.0.1-SNAPSHOT</version>
-
- <name>ch8_2Sender</name>
-
- <description>Demo project for Spring Boot</description>
-
-
- +<properties>
-
-
- -<dependencies>
-
-
- -<dependency>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-amqp</artifactId>
-
- </dependency>
-
-
- +<dependency>
-
-
- -<dependency>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-test</artifactId>
-
- <scope>test</scope>
-
- </dependency>
-
-
- -<dependency>
-
- <groupId>org.springframework.amqp</groupId>
-
- <artifactId>spring-rabbit-test</artifactId>
-
- <scope>test</scope>
-
- </dependency>
-
- </dependencies>
-
-
- -<build>
-
-
- -<plugins>
-
-
- -<plugin>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-maven-plugin</artifactId>
-
- </plugin>
-
- </plugins>
-
- </build>
-
- </project>
3:创建Weather实体类
- package com.ch.ch8_2Sender.entity;
- import java.io.Serializable;
- public class Weather implements Serializable{
- private static final long serialVersionUID = -8221467966772683998L;
- private String id;
- private String city;
- private String weatherDetail;
- public String getCity() {
- return city;
- }
- public void setCity(String city) {
- this.city = city;
- }
- public String getWeatherDetail() {
- return weatherDetail;
- }
- public void setWeatherDetail(String weatherDetail) {
- this.weatherDetail = weatherDetail;
- }
- public String getId() {
- return id;
- }
- public void setId(String id) {
- this.id = id;
- }
- @Override
- public String toString() {
- return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]";
- }
- }
4:重写Ch82SenderApplication主类
- package com.ch.ch8_2Sender;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.core.MessageDeliveryMode;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.CommandLineRunner;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- import com.ch.ch8_2Sender.entity.Weather;
- import com.fasterxml.jackson.databind.ObjectMapper;
- @SpringBootApplication
- public class Ch82SenderApplication implements CommandLineRunner{
- @Autowired
- private ObjectMapper objectMapper;
- @Autowired
- RabbitTemplate rabbitTemplate;
- public static void main(String[] args) {
- SpringApplication.run(Ch82SenderApplication.class, args);
- }
- /**
- * 定义发布者
- */
- @Override
- public void run(String... args) throws Exception {
- //定义消息对象
- Weather weather = new Weather();
- weather.setId("010");
- weather.setCity("北京");
- weather.setWeatherDetail("今天晴到多云,南风5-6级,温度19-26°C");
- //指定Json转换器,Jackson2JsonMessageConverter默认将消息转换成byte[]类型的消息
- rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
- //objectMapper将weather对象转换为JSON字节数组
- Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(weather))
- .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
- .build();
- // 消息唯一ID
- CorrelationData correlationData = new CorrelationData(weather.getId());
- //使用已封装好的convertAndSend(String exchange , String routingKey , Object message, CorrelationData correlationData)
- //将特定的路由key发送消息到指定的交换机
- rabbitTemplate.convertAndSend(
- "weather-exchange", //分发消息的交换机名称
- "weather.message", //用来匹配消息的路由Key
- msg, //消息体
- correlationData);
- }
- }
5:创建订阅者应用ch8_2Receiver-1
- package com.ch.ch8_2Receiver1;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.handler.annotation.Payload;
- import org.springframework.stereotype.Component;
- import com.fasterxml.jackson.databind.ObjectMapper;
- /**
- * 定义订阅者Receiver1
- */
- @Component
- public class Receiver1 {
- @Autowired
- private ObjectMapper objectMapper;
- @RabbitListener(
- bindings =
- @QueueBinding(
- //队列名weather-queue1保证和别的订阅者不一样
- value = @Queue(value = "weather-queue1",durable = "true"),
- //weather-exchange与发布者的交换机名相同
- exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
- //weather.message与发布者的消息的路由Key相同
- key = "weather.message"
- )
- )
- @RabbitHandler
- public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
- System.out.println("-----------订阅者Receiver1接收到消息--------");
- //将JSON字节数组转换为Weather对象
- Weather w=objectMapper.readValue(weatherMessage, Weather.class);
- System.out.println("Receiver1收到的消息内容:"+w);
- }
- }
6:创建订阅者应用ch8_2Receiver-2
- package com.ch.ch8_2Receiver1;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.handler.annotation.Payload;
- import org.springframework.stereotype.Component;
- import com.fasterxml.jackson.databind.ObjectMapper;
- /**
- * 定义订阅者Receiver2
- */
- @Component
- public class Receiver2 {
- @Autowired
- private ObjectMapper objectMapper;
- @RabbitListener(
- bindings =
- @QueueBinding(
- //队列名weather-queue2保证和别的订阅者不一样
- value = @Queue(value = "weather-queue2",durable = "true"),
- //weather-exchange与发布者的交换机名相同
- exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
- //weather.message与发布者的消息的路由Key相同
- key = "weather.message"
- )
- )
- @RabbitHandler
- public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
- System.out.println("-----------订阅者Receiver2接收到消息--------");
- Weather w=objectMapper.readValue(weatherMessage, Weather.class);
- //将JSON字节数组转换为Weather对象
- System.out.println("Receiver2收到的消息内容:"+w);
-
- }
- }
接下来分别运行发布者和订阅者的主类即可,发现一个发布者发布的消息可以被多个订阅者订阅。