您的位置 首页 java

java实现一个Mqtt broker01启用NettyServer

前言

物联网是现在比较热门的软件领域,众多物联网厂商都有自己的物联网平台,而物联网平台其中一个核心的模块就是 MQTT 网关。

java 领域中使用 netty 搭建高性能服务器是一个常见的选择,通常情况下使用netty开发的流程如下

 编写编解码器 --> 集成实现业务逻辑  

mqtt协议是非常出名的协议,Netty已经有了编解码器的实现。我们可以很容易地在Server中插入Mqtt的编解码handler,利用netty已经编写好的模块帮助我们做mqtt的编解码。这样子,我们仅需自己处理mqtt协议的逻辑,比如,登录,推送,订阅等。

接下来就让我们,把带有mqtt编解码器的NettyServer运行起来吧

MqttserverStarter.java

 package com.github.shoothzj.mqtt;

public class MqttServerStarter {

    public  static   void  main(String[] args) throws  Exception  {
        new MqttServer().start();
    }

}  

MqttServer.java

 package com.github.shoothzj.mqtt;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel. socket .SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttServer {

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(1883).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup. shutdown Gracefully();
            workerGroup.shutdownGracefully();
        }
    }

}  

客户端使用 eclipse mqtt client进行测试

 package com.github.shoothzj.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class Mqtt client Ex {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishSample";
        int qos = 2;
         String  broker = "tcp:// 127.0.0.1 :1883";
        String clientId = "JavaSample";
        Memory persistence  persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

}  

然后我们先运行MqttServer,再运行MqttClient,发现MqttClient卡住了

 Connecting to broker: tcp://127.0.0.1:1883

  

这是为什么呢,我们通过抓包发现仅仅只有客户端发送了Mqtt connect信息,服务端并没有响应

但是根据mqtt标准协议,发送Connect消息,必须要有ConnAck响应

所以我们需要在接收到Connect后,返回connAck消息

参考

文章来源:智云一二三科技

文章标题:java实现一个Mqtt broker01启用NettyServer

文章地址:https://www.zhihuclub.com/201340.shtml

关于作者: 智云科技

热门文章

网站地图