基于SpringBoot整合Netty开发MQTT服务端

您所在的位置:网站首页 mqtt协议搭建 基于SpringBoot整合Netty开发MQTT服务端

基于SpringBoot整合Netty开发MQTT服务端

2023-09-11 04:10| 来源: 网络整理| 查看: 265

Netty认知

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,相比传统Socket,在并发性方面有着很大的提升。关于NIO,BIO,AIO之间的区别,可以参考这篇博客:NIO 、 BIO与AIO之间的区别_谁念西风独自凉-CSDN博客

 MQTT服务端实现

首先我们启动一个tcp服务,这里我用到了Redis与RabbitMQ,主要是与分布式WEB平台之间好对接

@Component public class ApplicationEventListener implements CommandLineRunner { @Value("${spring.application.name}") private String nodeName; @Value("${gnss.mqttserver.tcpPort}") private int tcpPort; @Override public void run(String... args) throws Exception { //启动TCP服务 startTcpServer(); //清除Redis所有此节点的在线终端 RedisService redisService = SpringBeanService.getBean(RedisService.class); redisService.deleteAllOnlineTerminals(nodeName); //将所有此节点的终端设置为离线 RabbitMessageSender messageSender = SpringBeanService.getBean(RabbitMessageSender.class); messageSender.noticeAllOffline(nodeName); } /** * 启动TCP服务 * * @throws Exception */ private void startTcpServer() throws Exception { //计数器,必须等到所有服务启动成功才能进行后续的操作 final CountDownLatch countDownLatch = new CountDownLatch(1); //启动TCP服务 TcpServer tcpServer = new TcpServer(tcpPort, ProtocolEnum.MqttCommon, countDownLatch); tcpServer.start(); //等待启动完成 countDownLatch.await(); } }

接下来我们编写一个TcpServer类实现TCP服务

@Slf4j public class TcpServer extends Thread{ private int port; private ProtocolEnum protocolType; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap serverBootstrap = new ServerBootstrap(); private CountDownLatch countDownLatch; public TcpServer(int port, ProtocolEnum protocolType, CountDownLatch countDownLatch) { this.port = port; this.protocolType = protocolType; this.countDownLatch = countDownLatch; bossGroup = new NioEventLoopGroup(1); workerGroup = SpringBeanService.getBean("workerGroup", EventLoopGroup.class); final EventExecutorGroup executorGroup = SpringBeanService.getBean("executorGroup", EventExecutorGroup.class); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(MqttConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS)); ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE); ch.pipeline().addLast("decoder", new MqttDecoder()); ch.pipeline().addLast(executorGroup, MqttBusinessHandler.INSTANCE); } }); } @Override public void run() { bind(); } /** * 绑定端口启动服务 */ private void bind() { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { log.info("{} MQTT服务器启动,端口:{}", protocolType, port); countDownLatch.countDown(); } else { log.error("{} MQTT服务器启动失败,端口:{}", protocolType, port, future.cause()); System.exit(-1); } }); } /** * 关闭服务端 */ public void shutdown() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.info("{} TCP服务器关闭,端口:{}", protocolType, port); } }

编写一个业务处理器MqttBusinessHandler,实现对MQTT消息的接收与处理

@Slf4j @ChannelHandler.Sharable public class MqttBusinessHandler extends SimpleChannelInboundHandler { public static final MqttBusinessHandler INSTANCE = new MqttBusinessHandler(); private MqttMsgBack mqttMsgBack; private MqttBusinessHandler() { mqttMsgBack= MqttMsgBack.INSTANCE; } /** * 接收到消息后处理 * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg) { MqttMessage mqttMessage = (MqttMessage) msg; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); Channel channel = ctx.channel(); if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){ //在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接 //建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息 mqttMsgBack.connectionAck(ctx, mqttMessage); } switch (mqttFixedHeader.messageType()){ //客户端发布消息 case PUBLISH: mqttMsgBack.publishAck(ctx, mqttMessage); break; //发布释放 case PUBREL: mqttMsgBack.publishComp(ctx, mqttMessage); break; //订阅主题 case SUBSCRIBE: mqttMsgBack.subscribeAck(ctx, mqttMessage); break; //取消订阅主题 case UNSUBSCRIBE: mqttMsgBack.unsubscribeAck(ctx, mqttMessage); break; //客户端发送心跳报文 case PINGREQ: mqttMsgBack.pingResp(ctx, mqttMessage); break; //客户端主动断开连接 case DISCONNECT: break; default: break; } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("终端关闭连接,IP信息:{}", CommonUtil.getClientAddress(ctx)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); log.error("终端连接异常,IP信息:{}", CommonUtil.getClientAddress(ctx), cause); } /** * 服务端当读超时时会调用这个方法 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { ctx.close(); log.error("读超时,IP信息:{}", CommonUtil.getClientAddress(ctx), evt); }

我们对接收到的消息进行业务处理

@Slf4j public class MqttMsgBack { public static final MqttMsgBack INSTANCE = new MqttMsgBack(); private RedisService redisService; private RabbitMessageSender messageSender; private Environment environment; private MessageServiceProvider messageServiceProvider; private MqttMsgBack() { redisService = SpringBeanService.getBean(RedisService.class); messageSender = SpringBeanService.getBean(RabbitMessageSender.class); environment = SpringBeanService.getBean(Environment.class); messageServiceProvider = SpringBeanService.getBean(MessageServiceProvider.class); } /** * 确认连接请求 * @param ctx * @param mqttMessage */ public void connectionAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader(); //构建返回报文, 可变报头 MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession()); //构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); //构建连接回复消息体 MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack); ctx.writeAndFlush(connAck); //获取连接者的ClientId String clientIdentifier = mqttConnectMessage.payload().clientIdentifier(); //查询终端号码有无在平台注册 TerminalProto terminalInfo = redisService.getTerminalInfoByTerminalNum(clientIdentifier); if (terminalInfo == null) { log.error("终端登录失败,未找到终端信息,终端号:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx)); ctx.close(); return; } //设置节点名 terminalInfo.setNodeName(environment.getProperty("spring.application.name")); //保存终端信息和消息流水号到上下文属性中 Session session = new Session(terminalInfo); ChannelHandlerContext oldCtx = 
SessionUtil.bindSession(session, ctx); if (oldCtx == null) { log.info("终端登录成功,终端ID:{},终端号:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx)); } else { log.info("终端重复登录关闭上一个连接,终端ID:{},终端号:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx)); oldCtx.close(); } //通知上线 messageSender.noticeOnline(terminalInfo); log.info("终端登录成功,终端号:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx)); } /** * 根据qos发布确认 * @param ctx * @param mqttMessage */ public void publishAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader(); MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel(); //得到主题 String topicName = mqttPublishMessage.variableHeader().topicName(); //获取消息体 ByteBuf msgBodyBuf = mqttPublishMessage.payload(); log.info("收到:{}", ByteBufUtil.hexDump(msgBodyBuf)); MqttCommonMessage msg=new MqttCommonMessage(); msg.setTerminalNum(SessionUtil.getTerminalInfo(ctx).getTerminalNum()); msg.setStrMsgId(topicName); //根据主题获取对应的主题消息处理器 BaseMessageService messageService = messageServiceProvider.getMessageService(topicName); try { Object result = messageService.process(ctx, msg, msgBodyBuf); log.info("收到{}({}),终端ID:{},内容:{}", messageService.getDesc(), topicName,msg.getTerminalNum(), msg.getMsgBodyItems()); } catch (Exception e) { log.error("收到{}({}),消息异常,终端ID:{},消息体:{}", messageService.getDesc(), topicName,msg.getTerminalNum(),ByteBufUtil.hexDump(msgBodyBuf), e); } switch (qos) { //至多一次 case AT_MOST_ONCE: break; //至少一次 case AT_LEAST_ONCE: //构建返回报文, 可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); //构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, 
mqttFixedHeaderInfo.isRetain(), 0x02); //构建PUBACK消息体 MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack); log.info("Qos:AT_LEAST_ONCE:{}",pubAck.toString()); ctx.writeAndFlush(pubAck); break; //刚好一次 case EXACTLY_ONCE: //构建返回报文,固定报头 MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02); //构建返回报文,可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2); log.info("Qos:EXACTLY_ONCE回复:{}"+mqttMessageBack.toString()); ctx.writeAndFlush(mqttMessageBack); break; default: break; } } /** * 发布完成 qos2 * @param ctx * @param mqttMessage */ public void publishComp (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); //构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02); //构建返回报文, 可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack); log.info("发布完成回复:{}"+mqttMessageBack.toString()); ctx.writeAndFlush(mqttMessageBack); } /** * 订阅确认 * @param ctx * @param mqttMessage */ public void subscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage; MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); //构建返回报文, 可变报头 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); Set topics = 
mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet()); List grantedQoSLevels = new ArrayList(topics.size()); for (int i = 0; i < topics.size(); i++) { grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value()); } // 构建返回报文 有效负载 MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels); // 构建返回报文 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size()); // 构建返回报文 订阅确认 MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack); log.info("订阅回复:{}", subAck.toString()); ctx.writeAndFlush(subAck); } /** * 取消订阅确认 * @param ctx * @param mqttMessage */ public void unsubscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); // 构建返回报文 可变报头 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); // 构建返回报文 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2); // 构建返回报文 取消订阅确认 MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack); log.info("取消订阅回复:{}",unSubAck.toString()); ctx.writeAndFlush(unSubAck); } /** * 心跳响应 * @param ctx * @param mqttMessage */ public void pingResp (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage mqttMessageBack = new MqttMessage(fixedHeader); log.info("心跳回复:{}", mqttMessageBack.toString()); ctx.writeAndFlush(mqttMessageBack); } }

我们可以根据客户端发布消息的主题匹配不同的处理器

 最后,我们在对应的处理器里面实现对主题消息的处理逻辑,例如定位消息、指令消息等。下面以定位数据Location主题为例,简单实现其消息处理

@Slf4j @MessageService(strMessageId = "Location", desc = "定位") public class LocationMessageService extends BaseMessageService { @Autowired private RabbitMessageSender messageSender; @Override public Object process(ChannelHandlerContext ctx, MqttCommonMessage msg, ByteBuf msgBodyBuf) throws Exception { byte[] msgByteArr = new byte[msgBodyBuf.readableBytes()]; msgBodyBuf.readBytes(msgByteArr); String data = new String(msgByteArr); msg.putMessageBodyItem("位置", data); return null; } }  后续

目前仅仅是实现MQTT服务端消息接收与消息回复,后续可以根据接入的物联网设备进行对应主题消息的业务处理

对物联网有兴趣的同学可以加我微信一起交流学习

请添加图片描述



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3