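Preface
The Internet of Things (IoT) is a popular area of software today. Many IoT vendors run their own IoT platform, and one of the core modules of such a platform is the MQTT gateway.
In the Java world, Netty is a common choice for building high-performance servers. Development with Netty usually follows this flow:
Write the codec --> integrate the business logic
MQTT is a well-known protocol, and Netty already ships a codec for it. We can simply add the MQTT codec handlers to the server's pipeline and let Netty's existing module take care of MQTT encoding and decoding. That leaves only the MQTT protocol logic for us to implement, such as login, publishing, and subscribing.
Now let's get a Netty server with the MQTT codec up and running.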
MqttServerStarter.java
package com.github.shoothzj.mqtt;

public class MqttServerStarter {

    public static void main(String[] args) throws Exception {
        new MqttServer().start();
    }
}
MqttServer.java
package com.github.shoothzj.mqtt;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MqttServer {

    public void start() throws Exception {
        // One boss thread accepts connections; the worker group handles channel I/O.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // MQTT decoder and encoder provided by Netty
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });
            // Start the server.
            ChannelFuture f = b.bind(1883).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
For the client side, we test with the Eclipse Paho MQTT client.
package com.github.shoothzj.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@Slf4j
public class MqttClientEx {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishSample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            // connect() blocks until the broker answers the CONNECT packet.
            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
We start MqttServer first and then run the client, and find that the client hangs after printing:
Connecting to broker: tcp://127.0.0.1:1883
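Why is that? A packet capture shows that only the client sent an MQTT CONNECT packet; the server never responded.
However, the MQTT specification requires the server to answer a CONNECT packet with a CONNACK.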
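To make the missing response concrete, here is a minimal sketch of what a CONNACK packet looks like on the wire in MQTT 3.1.1: a two-byte fixed header followed by a two-byte variable header. The class name ConnAckBytes and the method encode are hypothetical names used only for this illustration; in the real server, Netty's MqttEncoder produces these bytes for us.

```java
public class ConnAckBytes {
    // MQTT 3.1.1 control packet type for CONNACK is 2, stored in the high nibble of byte 1.
    public static final int CONNACK_TYPE = 2;

    /** Encodes a CONNACK packet: fixed header (2 bytes) + variable header (2 bytes). */
    public static byte[] encode(boolean sessionPresent, int returnCode) {
        return new byte[] {
            (byte) (CONNACK_TYPE << 4),       // 0x20: packet type CONNACK, flags 0
            0x02,                             // remaining length is always 2
            (byte) (sessionPresent ? 1 : 0),  // connect acknowledge flags (bit 0 = session present)
            (byte) returnCode                 // 0x00 = connection accepted
        };
    }

    public static void main(String[] args) {
        byte[] packet = encode(false, 0);
        StringBuilder hex = new StringBuilder();
        for (byte b : packet) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "20 02 00 00"
    }
}
```

These four bytes are what the packet capture should show coming back from the server once the CONNECT is handled.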
So after receiving a CONNECT message, the server needs to reply with a CONNACK message.
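One possible way to do this is a small inbound handler placed after the codec handlers. The following is a sketch under assumptions, not a complete broker: the class name MqttConnAckHandler is made up for this example, it accepts every connection without any authentication, and it ignores all other message types. It relies on Netty's MqttMessageBuilders to build the CONNACK.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageType;

// Hypothetical handler name; replies to every CONNECT with an "accepted" CONNACK.
public class MqttConnAckHandler extends SimpleChannelInboundHandler<MqttMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {
        // A message that failed to decode may carry no usable header, so drop the connection.
        if (msg.decoderResult().isFailure()) {
            ctx.close();
            return;
        }
        if (msg.fixedHeader().messageType() == MqttMessageType.CONNECT) {
            // Assumption for this sketch: no authentication, no stored session.
            MqttConnAckMessage ack = MqttMessageBuilders.connAck()
                    .returnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED)
                    .sessionPresent(false)
                    .build();
            ctx.writeAndFlush(ack);
        }
        // Other message types (PUBLISH, SUBSCRIBE, ...) are not handled here.
    }
}
```

To try it, register the handler after the encoder in initChannel, for example with p.addLast(new MqttConnAckHandler()). With this in place the client should get past the connect step. Since the sample client publishes with QoS 2, it will likely block again at the publish step until the server also implements the PUBLISH acknowledgement flow.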