客户管理系统开发定制springboot 整合 mqtt

springboot 整合 mqtt

最近由于iot越来越火, 客户管理系统开发定制物联网的需求越来越多, 客户管理系统开发定制那么理所当然的使用mqtt客户管理系统开发定制的场景也就越来越多,
客户管理系统开发定制接下来是我使用springboot整合mqtt的过程, 客户管理系统开发定制以及踩过的一些坑.

mqtt客户管理系统开发定制服务器使用的是 EMQX, 官网 :

客户管理系统开发定制搭建的时候如果你使用的是集群 客户管理系统开发定制记得开放以下端口:

好了, 客户管理系统开发定制搭建成功下一步就是我们的java程序要与mqtt连接, 客户管理系统开发定制这里有两种方式(客户管理系统开发定制其实不止两种)进行连接.
一是 直接使用 MQTT Java 客户端库,详情可以查看官方的例子: 我就跳过了

二是使用 spring integration mqtt也是比较推荐的一种,也是我们主讲这种.

第一步 添加 maven dependency

        <dependency>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-mqtt</artifactId>            <version>5.5.14</version>        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

第二步 添加配置

1 先写好一些基本配置

mqtt: username: test                        # 账号 password: 123456                      # 密码 host-url: tcp://127.0.0.1:1883        # mqtt连接tcp地址 in-client-id: ${random.value}         # 随机值,使出入站 client ID 不同 out-client-id: ${random.value} client-id: ${random.int}                   # 客户端Id,不能相同,采用随机数 ${random.value} default-topic: test/#,topic/+/+/up         # 默认主题 timeout: 60                                # 超时时间 keepalive: 60                              # 保持连接 clearSession: true                         # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 然后写一个对应的类MqttProperties
import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * MqttProperties  * * @author hengzi * @date 2022/8/23 */@Componentpublic class MqttProperties {    /**     * 用户名     */    @Value("${mqtt.username}")    private String username;    /**     * 密码     */    @Value("${mqtt.password}")    private String password;    /**     * 连接地址     */    @Value("${mqtt.host-url}")    private String hostUrl;    /**     * 进-客户Id     */    @Value("${mqtt.in-client-id}")    private String inClientId;    /**     * 出-客户Id     */    @Value("${mqtt.out-client-id}")    private String outClientId;    /**     * 客户Id     */    @Value("${mqtt.client-id}")    private String clientId;    /**     * 默认连接话题     */    @Value("${mqtt.default-topic}")    private String defaultTopic;    /**     * 超时时间     */    @Value("${mqtt.timeout}")    private int timeout;    /**     * 保持连接数     */    @Value("${mqtt.keepalive}")    private int keepalive;    /**是否清除session*/    @Value("${mqtt.clearSession}")    private boolean clearSession;	// ...getter and setter}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

接下来就是配置一些乱七八糟的东西, 这里有很多概念性的东西 比如 管道channel, 适配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起来是非常头痛的

好吧,那就一个一个来,

首先连接mqtt需要一个客户端, 那么我们就开一个客户端工厂, 这里可以产生很多很多的客户端

    @Bean    public MqttPahoClientFactory mqttPahoClientFactory(){        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(mqttProperties.getHostUrl().split(","));        options.setUserName(mqttProperties.getUsername());        options.setPassword(mqttProperties.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

然后再搞两根管子(channel),一个出站,一个入站

    //出站消息管道,    @Bean    public MessageChannel mqttOutboundChannel(){        return new DirectChannel();    }    // 入站消息管道    @Bean    public MessageChannel mqttInboundChannel(){        return new DirectChannel();    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

为了使这些管子能流通 就需要一个适配器(adapter)

    // Mqtt 管道适配器    @Bean    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

然后定义消息生产者

    // 消息生产者    @Bean    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //入站投递的通道        adapter.setOutputChannel(mqttInboundChannel());        adapter.setQos(1);        return adapter;    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

那我们收到消息去哪里处理呢,答案是这里:

    @Bean    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行    @ServiceActivator(inputChannel = "mqttInboundChannel")    public MessageHandler handleMessage() {    	// 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面)        return mqttMessageHandle;		// 你也可以这样写//        return new MessageHandler() {//            @Override//            public void handleMessage(Message<?> message) throws MessagingException {//                // do something//            }//        };    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

到这里我们其实已经可以接受到来自mqtt的消息了

接下来配置向mqtt发送消息

配置 出站处理器

    // 出站处理器    @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);        handler.setAsync(true);        handler.setConverter(new DefaultPahoMessageConverter());        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);        return handler;    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

这个 出站处理器 在我看来就是让别人 (MqttPahoMessageHandler)处理了, 我就不处理了,我只管我要发送什么,至于怎么发送,由MqttPahoMessageHandler来完成

接下来我们定义一个接口即可

import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.stereotype.Component;/** * MqttGateway * * @author hengzi * @date 2022/8/23 */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

我们直接调用这个接口就可以向mqtt 发送数据


到目前为止,整个配置文件长这样:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;/** * MqttConfig * * @author hengzi * @date 2022/8/23 */@Configurationpublic class MqttConfig {    /**     *  以下属性将在配置文件中读取     **/    @Autowired    private MqttProperties mqttProperties;    //Mqtt 客户端工厂    @Bean    public MqttPahoClientFactory mqttPahoClientFactory(){        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(mqttProperties.getHostUrl().split(","));        options.setUserName(mqttProperties.getUsername());        options.setPassword(mqttProperties.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }    // Mqtt 管道适配器    @Bean    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));    }    // 消息生产者    @Bean    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //入站投递的通道        adapter.setOutputChannel(mqttInboundChannel());        adapter.setQos(1);        return adapter;    }    // 出站处理器    @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);        handler.setAsync(true);        handler.setConverter(new DefaultPahoMessageConverter());        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);        return handler;    }    @Bean    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行    @ServiceActivator(inputChannel = "mqttInboundChannel")    public MessageHandler handleMessage() {        return mqttMessageHandle;    }    //出站消息管道,    @Bean    public MessageChannel mqttOutboundChannel(){        return new DirectChannel();    }    // 入站消息管道    @Bean    public MessageChannel mqttInboundChannel(){        return new DirectChannel();    }    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

处理消息的 MqttMessageHandle

@Componentpublic class MqttMessageHandle implements MessageHandler {    @Override    public void handleMessage(Message<?> message) throws MessagingException {         }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在进一步了解之后,发现可以优化的地方,比如channel 的类型是有很多种的, 这里使用的DirectChannel,是Spring Integration默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的.

这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channel

    @Bean    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()    {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        // 最大可创建的线程数        int maxPoolSize = 200;        executor.setMaxPoolSize(maxPoolSize);        // 核心线程池大小        int corePoolSize = 50;        executor.setCorePoolSize(corePoolSize);        // 队列最大长度        int queueCapacity = 1000;        executor.setQueueCapacity(queueCapacity);        // 线程池维护线程所允许的空闲时间        int keepAliveSeconds = 300;        executor.setKeepAliveSeconds(keepAliveSeconds);        // 线程池对拒绝任务(无线程可用)的处理策略        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        return executor;    }    // 入站消息管道    @Bean    public MessageChannel mqttInboundChannel(){        // 用线程池        return new ExecutorChannel(mqttThreadPoolTaskExecutor());    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

到这里其实可以运行了.

但是这样配置其实还是有点多, 有点乱, 于是我查找官网, f发现一种更简单的配置方法 叫 Java DSL, 官网连接:

我们参考官网,稍微改一下,使用 DSL的方式进行配置:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.channel.ExecutorChannel;import org.springframework.integration.dsl.IntegrationFlow;import org.springframework.integration.dsl.IntegrationFlows;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/** * MqttConfigV2 * * @author hengzi * @date 2022/8/24 */@Configurationpublic class MqttConfigV2 {    @Autowired    private MqttProperties mqttProperties;    @Autowired    private MqttMessageHandle mqttMessageHandle;    //Mqtt 客户端工厂 所有客户端从这里产生    @Bean    public MqttPahoClientFactory mqttPahoClientFactory(){        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(mqttProperties.getHostUrl().split(","));        options.setUserName(mqttProperties.getUsername());        options.setPassword(mqttProperties.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }    // Mqtt 管道适配器    @Bean    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));    }    // 消息生产者 (接收,处理来自mqtt的消息)    @Bean    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {        adapter.setCompletionTimeout(5000);        adapter.setQos(1);        return IntegrationFlows.from( adapter)                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))                .handle(mqttMessageHandle)                .get();    }    @Bean    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()    {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        // 最大可创建的线程数        int maxPoolSize = 200;        executor.setMaxPoolSize(maxPoolSize);        // 核心线程池大小        int corePoolSize = 50;        executor.setCorePoolSize(corePoolSize);        // 队列最大长度        int queueCapacity = 1000;        executor.setQueueCapacity(queueCapacity);        // 线程池维护线程所允许的空闲时间        int keepAliveSeconds = 300;        executor.setKeepAliveSeconds(keepAliveSeconds);        // 线程池对拒绝任务(无线程可用)的处理策略        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        return executor;    }    // 出站处理器 (向 mqtt 发送消息)    @Bean    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);        handler.setAsync(true);        handler.setConverter(new DefaultPahoMessageConverter());        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95

这样看起来真的简单多了, 头也没那么大了, 我要是早知道多好.

好了以上就是配置相关的, 到这里其实是已经完成springboot 与 mqtt 的整合了.


但其实我一直有个想法, 就是我们接收的消息 都是在 handleMessage这个方法里面执行的,

	@Override    public void handleMessage(Message<?> message) throws MessagingException {     			    }
  • 1
  • 2
  • 3
  • 4

所以我就有了一个想法, 能不能根据 我订阅的主题,在不同的方法执行, 对于这个问题,其实你用if ... else ...也能实现, 但很明显,如果我订阅的主题很多的话, 那写起来就很头痛了.

对于这个问题,有两种思路, 一个是添加Spring Integration的路由 router,根据不同topic路由到不同的channel, 这个我也知道能不能实现, 我这里就不讨论了.

第二种是, 我也不知道名字改如何叫, 我是参考了 spring@Controller的设计, 暂且叫他注解模式.

众所周知,我们的接口都是在类上加 @Controller这个注解, 就代表这个类是 http 接口, 再在方法加上 @RequestMapping就能实现不同的 url 调用不同的方法.

参数这个设计 我们在类上面加 @MqttService就代表这个类是专门处理mqtt消息的服务类
同时 在这个类的方法上 加上 @MqttTopic就代表 这个主题由这个方法处理.

OK, 理论有了,接下来就是 实践.

先定义 两个注解

import org.springframework.core.annotation.AliasFor;import org.springframework.stereotype.Component;import java.lang.annotation.*;@Documented@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface MqttService {    @AliasFor(            annotation = Component.class    )    String value() default "";}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

加上 @Component注解 spring就会扫描, 并注册到IOC容器里

import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public @interface MqttTopic {    /**     * 主题名字     */    String value() default "";}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

参考 @RequestMapping我们使用起来应该是这样的:

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.messaging.Message;/** * MqttTopicHandle * * @author hengzi * @date 2022/8/24 */@MqttServicepublic class MqttTopicHandle {    public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);	// 这里的 # 号是通配符    @MqttTopic("test/#")    public void test(Message<?> message){        log.info("test="+message.getPayload());    }		// 这里的 + 号是通配符    @MqttTopic("topic/+/+/up")    public void up(Message<?> message){        log.info("up="+message.getPayload());    }	// 注意 你必须先订阅    @MqttTopic("topic/1/2/down")    public void down(Message<?> message){        log.info("down="+message.getPayload());    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

OK 接下来就是实现这样的使用

分析 :

当我们收到消息时, 我们从IOC容器中 找到所有 带 @MqttService注解的类

然后 遍历这些类, 找到带有 @MqttTopic的方法

接着 把 @MqttTopic注解的的值 与 接受到的topic 进行对比

如果一致则执行这个方法

废话少说, 上代码

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import org.springframework.stereotype.Component;import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;import java.util.Map;/** * MessageHandleService * * @author hengzi * @date 2022/8/24 */@Componentpublic class MqttMessageHandle implements MessageHandler {    public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);    // 包含 @MqttService注解 的类(Component)    public static Map<String, Object> mqttServices;    /**     * 所有mqtt到达的消息都会在这里处理     * 要注意这个方法是在线程池里面运行的     * @param message message     */    @Override    public void handleMessage(Message<?> message) throws MessagingException {        getMqttTopicService(message);    }    public Map<String, Object> getMqttServices(){        if(mqttServices==null){            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);        }        return mqttServices;    }    public void getMqttTopicService(Message<?> message){        // 在这里 我们根据不同的 主题 分发不同的消息        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);        if(receivedTopic==null || "".equals(receivedTopic)){            return;        }        for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){        	// 把所有带有 @MqttService 的类遍历            Class<?> clazz = entry.getValue().getClass();            // 获取他所有方法            Method[] methods = clazz.getDeclaredMethods();            for ( Method method: methods ){                if (method.isAnnotationPresent(MqttTopic.class)){                	// 如果这个方法有 这个注解                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);                    if(isMatch(receivedTopic,handleTopic.value())){                    	// 并且 这个 topic 匹配成功                        try {                            method.invoke(SpringUtils.getBean(clazz),message);                            return;                        } catch (IllegalAccessException e) {                            e.printStackTrace();                            log.error("代理炸了");                        } catch (InvocationTargetException e) {                            log.error("执行 {} 方法出现错误",handleTopic.value(),e);                        }                    }                }            }        }    }    /**     * mqtt 订阅的主题与我实际的主题是否匹配     * @param topic 是实际的主题     * @param pattern 是我订阅的主题 可以是通配符模式     * @return 是否匹配     */    public static boolean isMatch(String topic, String pattern){        if((topic==null) || (pattern==null) ){            return false;        }        if(topic.equals(pattern)){            // 完全相等是肯定匹配的            return true;        }        if("#".equals(pattern)){            // # 号代表所有主题  肯定匹配的            return true;        }        String[] splitTopic = topic.split("/");        String[] splitPattern = pattern.split("/");        boolean match = true;        // 如果包含 # 则只需要判断 # 前面的        for (int i = 0; i < splitPattern.length; i++) {            if(!"#".equals(splitPattern[i])){                // 不是# 号 正常判断                if(i>=splitTopic.length){                    // 此时长度不相等 不匹配                    match = false;                    break;                }                if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){                    // 不相等 且不等于 +                    match = false;                    break;                }            }            else {                // 是# 号  肯定匹配的                break;            }        }        return match;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129

工具类 SpringUtils

import org.springframework.aop.framework.AopContext;import org.springframework.beans.BeansException;import org.springframework.beans.factory.NoSuchBeanDefinitionException;import org.springframework.beans.factory.config.BeanFactoryPostProcessor;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;import java.util.Map;/** * spring工具类 方便在非spring管理环境中获取bean *  */@Componentpublic final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {    /** Spring应用上下文环境 */    private static ConfigurableListableBeanFactory beanFactory;    private static ApplicationContext applicationContext;    public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{        return beanFactory.getBeansWithAnnotation(clsName);    }    @Override    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException     {        SpringUtils.beanFactory = beanFactory;    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException     {        SpringUtils.applicationContext = applicationContext;    }    /**     * 获取对象     *     * @param name     * @return Object 一个以所给名字注册的bean的实例     * @throws org.springframework.beans.BeansException     *     */    @SuppressWarnings("unchecked")    public static <T> T getBean(String name) throws BeansException    {        return (T) beanFactory.getBean(name);    }    /**     * 获取类型为requiredType的对象     *     * @param clz     * @return     * @throws org.springframework.beans.BeansException     *     */    public static <T> T getBean(Class<T> clz) throws BeansException    {        T result = (T) beanFactory.getBean(clz);        return result;    }    /**     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true     *     * @param name     * @return boolean     */    public static boolean containsBean(String name)    {        return beanFactory.containsBean(name);    }    /**     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)     *     * @param name     * @return boolean     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException     *     */    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException    {        return beanFactory.isSingleton(name);    }    /**     * @param name     * @return Class 注册对象的类型     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException     *     */    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException    {        return beanFactory.getType(name);    }    /**     * 如果给定的bean名字在bean定义中有别名,则返回这些别名     *     * @param name     * @return     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException     *     */    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException    {        return beanFactory.getAliases(name);    }    /**     * 获取aop代理对象     *      * @param invoker     * @return     */    @SuppressWarnings("unchecked")    public static <T> T getAopProxy(T invoker)    {        return (T) AopContext.currentProxy();    }    /**     * 获取当前的环境配置,无配置返回null     *     * @return 当前的环境配置     */    public static String[] getActiveProfiles()    {        return applicationContext.getEnvironment().getActiveProfiles();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142

OK, 大功告成. 终于舒服了, 终于不用写if...else...了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:

附:
动态添加主题方式:

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.stereotype.Service;import java.util.Arrays;/** * MqttService * * @author hengzi * @date 2022/8/25 */@Servicepublic class MqttService {    @Autowired    private MqttPahoMessageDrivenChannelAdapter adapter;    public void addTopic(String topic) {        addTopic(topic, 1);    }    public void addTopic(String topic,int qos) {        String[] topics = adapter.getTopic();        if(!Arrays.asList(topics).contains(topic)){            adapter.addTopic(topic,qos);        }    }    public void removeTopic(String topic) {        adapter.removeTopic(topic);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

直接调用就行

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发