电商商城定制开发MQTT的Java代码实现

MQTT的Java代码实现

MQTT(电商商城定制开发消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)电商商城定制开发下基于发布/订阅]电商商城定制开发范式的消息协议。它工作在 TCP/IP协议族上,电商商城定制开发是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。

为rabbit开启mqtt

1.在yml文件中添加一个mqtt的端口映射1883:1883

restart: alwayscontainer_name: rabbitmqports:  - 5672:5672  - 15672:15672  - 1883:1883 #mqttvolumes:  - ./data:/var/lib/rabbitmq
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.进入rabbit的docker容器内部

 docker exec -it rabbitmq bash
  • 1

3.rabbit内运行

rabbitmq-plugins enable rabbitmq_mqtt
  • 1

4.在网页视图中查看mqtt

使用MQTT软件测试mqtt

1.连接mqtt

2.在MQTT软中添加订阅


在RabbitMQ的队列中查看

3.测试

方法一、在Rabbitmq网页发送消息

方法二、自己给自己发

一、发送消息

  1. 创建springBoot项目,在xml中导入springBoot项目所需要配置以及相关依赖包
<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.6.8</version></parent><dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <!--使用spring集成启动器,Spring集成提供了对消息传递和其他传输(如HTTP、TCP等)的抽象。-->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-integration</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.integration</groupId>        <artifactId>spring-integration-mqtt</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.integration</groupId>        <artifactId>spring-integration-stream</artifactId>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>    </dependency></dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

注意寻找依赖包:spring.io->projects->LEARN(右边)->2.6.10 GA Refence Doc.->->7.Messaging->Spring Integration:

  1. 创建配置类(2步骤可忽略,此地只为推导使用,配置类的正确使用方式见步骤6)
//修正官网后的(这个配置文件还不可以使用,正确的使用方式见在后面--此地是视频课程讲的讲解中推导步骤,可以省略不看)@Configurationpublic class MqttConfig {    @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }    /**     * 连接mqtt服务器的工厂     * @return     */    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] { "tcp://10.9.48.165:1883" });        options.setUserName("guest");        options.setPassword("guest".toCharArray());        factory.setConnectionOptions(options);        return factory;    }    @Bean    public MessageProducer inbound(MessageChannel mqttInputChannel,MqttPahoClientFactory mqttClientFactory) {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter("springclient",mqttClientFactory,                        "zheshisha");        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //设置一次需要应答        adapter.setQos(1);        //设置对外的通道        adapter.setOutputChannel(mqttInputChannel);        return adapter;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
//官方文档@Beanpublic MessageChannel mqttInputChannel() {    return new DirectChannel();}@Beanpublic MessageProducer inbound() {    MqttPahoMessageDrivenChannelAdapter adapter =            new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",                                             "topic1", "topic2");    adapter.setCompletionTimeout(5000);    adapter.setConverter(new DefaultPahoMessageConverter());    adapter.setQos(1);    //问题所在:mqttInputChannel()这个是调用方法,而在这个方法上面加一一个注解@Bean相当于白加~~~    adapter.setOutputChannel(mqttInputChannel());    return adapter;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. 创建接口
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MyGateway {    void sendToMqtt(String data);}
  • 1
  • 2
  • 3
  • 4
  1. 启动类
@SpringBootApplication//扫描整合的注解@IntegrationComponentScanpublic class MqttStartApp {    public static void main(String[] args) {        SpringApplication.run(MqttStartApp.class, args);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 编写controller类测试
@RestControllerpublic class MqttController {    private MyGateway myGateway;    @Autowired    public void setMyGateway(MyGateway myGateway) {        this.myGateway = myGateway;    }    @PostMapping("/msg")    public String sendMsg(String msg){        myGateway.sendToMqtt(msg);        return "success";    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. 修改后的配置类
@Configurationpublic class MqttConfig {    /**     * 连接mqtt服务器的工厂     * @return     */    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] { "tcp://10.9.48.165:1883" });        options.setUserName("guest");        options.setPassword("guest".toCharArray());        factory.setConnectionOptions(options);        return factory;    }    @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }    @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel") //inputChannel的名字必须和上面的MessageChannel的方法名保持一致    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler =                new MqttPahoMessageHandler("testClient", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic("zheshisha");        return messageHandler;    }    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

二、收消息

在配配置文件中加入

/** * 收消息的通道,注意实际开发中和发的可能不在一起 * @return */@Beanpublic MessageChannel mqttInputChannel() {    return new DirectChannel();}@Beanpublic MessageProducer inbound() {    MqttPahoMessageDrivenChannelAdapter adapter =            new MqttPahoMessageDrivenChannelAdapter("tcp://10.9.48.165:1883", "testClient",                    "chixihua");    adapter.setCompletionTimeout(5000);    adapter.setConverter(new DefaultPahoMessageConverter());    adapter.setQos(1);    adapter.setOutputChannel(mqttInputChannel());    return adapter;}/** * 收消息的处理器,用于如何处理消息 * mqttInputChannel 代表的是收消息的通道对象的id * @return */@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {    return new MessageHandler() {        @Override        public void handleMessage(Message<?> message) throws MessagingException {            System.out.println(message.getPayload());        }    };}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

三、SpringBoot整合MQTT

  1. 导入依赖包
<!--使用spring集成启动器,Spring集成提供了对消息传递和其他传输(如HTTP、TCP等)的抽象。--><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-integration</artifactId></dependency><dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-mqtt</artifactId></dependency><dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-stream</artifactId></dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 创建配置类
@Configurationpublic class MqttConfig {    @Bean    public MqttConnectOptions options(){        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();        mqttConnectOptions.setServerURIs(new String[] { "tcp://10.9.48.190:1883" });        mqttConnectOptions.setUserName("dc3");        mqttConnectOptions.setPassword("dc3".toCharArray());        return mqttConnectOptions;    }    /**     * 创建连接工厂     * @param options     * @return     */    @Bean    public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions options){        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory=new DefaultMqttPahoClientFactory();        defaultMqttPahoClientFactory.setConnectionOptions(options);        return defaultMqttPahoClientFactory;    }    @Bean    public MessageChannel messageInputChannel(){        return new DirectChannel();    }    @Bean    public MessageProducer  mqttInbound(MessageChannel messageInputChannel, MqttPahoClientFactory mqttPahoClientFactory){        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter("testClient",mqttPahoClientFactory, "chixihua");        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        adapter.setQos(1);        adapter.setOutputChannel(messageInputChannel);        return adapter;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  1. 配置消息处理的类
@Configurationpublic class MessageReceiverHandler {    /**     * 收到设备发送来的上行数据的时候执行,具体怎么做取决于业务,比如这里面可能是设备发来的一些传感器数据,我们需要保存并发送到统计平台     * @return     */    @Bean    @ServiceActivator(inputChannel = "messageInputChannel")    public MessageHandler messageHandler(){        return message -> {            //获取到消息正文            Object payload = message.getPayload();            System.err.println(payload);            //处理消息            System.err.println("等下就处理消息");        };    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 在启动类添加注解
@SpringBootApplication@IntegrationComponentScanpublic class MqttStartApp {    public static void main(String[] args) {        SpringApplication.run(MqttStartApp.class, args);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发