定制化开发Spring Boot异步消息之AMQP讲解及实战(附源码)

定制化开发觉得有帮助请点赞关注收藏~~~

AMQP(定制化开发高级消定制化开发息队列协议)定制化开发是一个提供统一消息服定制化开发务的应用层标准高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可 传递消息,并不受客户端/中间件的不同产品,不同开发语言等条件的限制。

下面实现主要用RabbitMQ讲解AMQP实例,因此需要事先安装RabbitMQ和erlang语言

erlang下载地址 https://www.erlang.org/downloads

RabbitMQ下载地址 https://www.rabbitmq.com/download.html 

使用RabbitMQ实现发布/订阅异步消息模式

1:创建发布者应用ch8_2Sender

2:在pom.xml文件中添加依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
  3. <modelVersion>4.0.0</modelVersion>
  4. -<parent>
  5. <groupId>org.springframework.boot</groupId>
  6. <artifactId>spring-boot-starter-parent</artifactId>
  7. <version>2.1.8.RELEASE</version>
  8. <relativePath/>
  9. <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.ch</groupId>
  12. <artifactId>ch8_2Sender</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>ch8_2Sender</name>
  15. <description>Demo project for Spring Boot</description>
  16. +<properties>
  17. -<dependencies>
  18. -<dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-amqp</artifactId>
  21. </dependency>
  22. +<dependency>
  23. -<dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-test</artifactId>
  26. <scope>test</scope>
  27. </dependency>
  28. -<dependency>
  29. <groupId>org.springframework.amqp</groupId>
  30. <artifactId>spring-rabbit-test</artifactId>
  31. <scope>test</scope>
  32. </dependency>
  33. </dependencies>
  34. -<build>
  35. -<plugins>
  36. -<plugin>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-maven-plugin</artifactId>
  39. </plugin>
  40. </plugins>
  41. </build>
  42. </project>

 3:创建Weather实体类

  1. package com.ch.ch8_2Sender.entity;
  2. import java.io.Serializable;
  3. public class Weather implements Serializable{
  4. private static final long serialVersionUID = -8221467966772683998L;
  5. private String id;
  6. private String city;
  7. private String weatherDetail;
  8. public String getCity() {
  9. return city;
  10. }
  11. public void setCity(String city) {
  12. this.city = city;
  13. }
  14. public String getWeatherDetail() {
  15. return weatherDetail;
  16. }
  17. public void setWeatherDetail(String weatherDetail) {
  18. this.weatherDetail = weatherDetail;
  19. }
  20. public String getId() {
  21. return id;
  22. }
  23. public void setId(String id) {
  24. this.id = id;
  25. }
  26. @Override
  27. public String toString() {
  28. return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]";
  29. }
  30. }

4:重写Ch82SenderApplication主类

  1. package com.ch.ch8_2Sender;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageBuilder;
  4. import org.springframework.amqp.core.MessageDeliveryMode;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.CommandLineRunner;
  10. import org.springframework.boot.SpringApplication;
  11. import org.springframework.boot.autoconfigure.SpringBootApplication;
  12. import com.ch.ch8_2Sender.entity.Weather;
  13. import com.fasterxml.jackson.databind.ObjectMapper;
  14. @SpringBootApplication
  15. public class Ch82SenderApplication implements CommandLineRunner{
  16. @Autowired
  17. private ObjectMapper objectMapper;
  18. @Autowired
  19. RabbitTemplate rabbitTemplate;
  20. public static void main(String[] args) {
  21. SpringApplication.run(Ch82SenderApplication.class, args);
  22. }
  23. /**
  24. * 定义发布者
  25. */
  26. @Override
  27. public void run(String... args) throws Exception {
  28. //定义消息对象
  29. Weather weather = new Weather();
  30. weather.setId("010");
  31. weather.setCity("北京");
  32. weather.setWeatherDetail("今天晴到多云,南风5-6级,温度19-26°C");
  33. //指定Json转换器,Jackson2JsonMessageConverter默认将消息转换成byte[]类型的消息
  34. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  35. //objectMapper将weather对象转换为JSON字节数组
  36. Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(weather))
  37. .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
  38. .build();
  39. // 消息唯一ID
  40. CorrelationData correlationData = new CorrelationData(weather.getId());
  41. //使用已封装好的convertAndSend(String exchange , String routingKey , Object message, CorrelationData correlationData)
  42. //将特定的路由key发送消息到指定的交换机
  43. rabbitTemplate.convertAndSend(
  44. "weather-exchange", //分发消息的交换机名称
  45. "weather.message", //用来匹配消息的路由Key
  46. msg, //消息体
  47. correlationData);
  48. }
  49. }

5:创建订阅者应用ch8_2Receiver-1

  1. package com.ch.ch8_2Receiver1;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.messaging.handler.annotation.Payload;
  9. import org.springframework.stereotype.Component;
  10. import com.fasterxml.jackson.databind.ObjectMapper;
  11. /**
  12. * 定义订阅者Receiver1
  13. */
  14. @Component
  15. public class Receiver1 {
  16. @Autowired
  17. private ObjectMapper objectMapper;
  18. @RabbitListener(
  19. bindings =
  20. @QueueBinding(
  21. //队列名weather-queue1保证和别的订阅者不一样
  22. value = @Queue(value = "weather-queue1",durable = "true"),
  23. //weather-exchange与发布者的交换机名相同
  24. exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
  25. //weather.message与发布者的消息的路由Key相同
  26. key = "weather.message"
  27. )
  28. )
  29. @RabbitHandler
  30. public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
  31. System.out.println("-----------订阅者Receiver1接收到消息--------");
  32. //将JSON字节数组转换为Weather对象
  33. Weather w=objectMapper.readValue(weatherMessage, Weather.class);
  34. System.out.println("Receiver1收到的消息内容:"+w);
  35. }
  36. }

6:创建订阅者应用ch8_2Receiver-2

  1. package com.ch.ch8_2Receiver1;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.messaging.handler.annotation.Payload;
  9. import org.springframework.stereotype.Component;
  10. import com.fasterxml.jackson.databind.ObjectMapper;
  11. /**
  12. * 定义订阅者Receiver2
  13. */
  14. @Component
  15. public class Receiver2 {
  16. @Autowired
  17. private ObjectMapper objectMapper;
  18. @RabbitListener(
  19. bindings =
  20. @QueueBinding(
  21. //队列名weather-queue2保证和别的订阅者不一样
  22. value = @Queue(value = "weather-queue2",durable = "true"),
  23. //weather-exchange与发布者的交换机名相同
  24. exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
  25. //weather.message与发布者的消息的路由Key相同
  26. key = "weather.message"
  27. )
  28. )
  29. @RabbitHandler
  30. public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
  31. System.out.println("-----------订阅者Receiver2接收到消息--------");
  32. Weather w=objectMapper.readValue(weatherMessage, Weather.class);
  33. //将JSON字节数组转换为Weather对象
  34. System.out.println("Receiver2收到的消息内容:"+w);
  35. }
  36. }

接下来分别运行发布者和订阅者的主类即可,发现一个发布者发布的消息可以被多个订阅者订阅。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发