Netty心跳机制

您所在的位置:网站首页 netty心跳检测 Netty心跳机制

Netty心跳机制

2023-09-10 11:08| 来源: 网络整理| 查看: 265

前文需求回顾

完成对红酒窖的室内温度采集及监控功能。由本地应用程序+温度传感器定时采集室内温度上报至服务器,如果温度 >20 °C 则由服务器下发重启空调指令,如果本地应用长时间不上传温度给服务器,则给户主手机发送一条预警短信。

Netty入门篇-从双向通信开始「上文」

上篇算是完成简单的双向通信了,我们接着看看 “如果本地应用长时间不上传温度给服务器…”,很明显客户端有可能挂了嘛,所以怎么实现客户端与服务端的长连接就是本文要实现的了。

什么是心跳机制

百度百科:心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。

简单说,这个心跳机制是由客户端主动发起的消息,每隔一段时间就向服务端发送消息,告诉服务端自己还没死,可不要给户主发送预警短信啊。

如何实现心跳机制 1、客户端代码修改

我们需要改造一下上节中客户端的代码,首先是在责任链中增加一个心跳逻辑处理类HeartbeatHandler

public class NettyClient { private static String host = "127.0.0.1"; public static void main(String[] args) { NioEventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap // 1.指定线程模型 .group(workerGroup) // 2.指定 IO 类型为 NIO .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) // 3.IO 处理逻辑 .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new IdleStateHandler(0, 10, 0)) .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new HeartbeatHandler()) .addLast(new NettyClientHandler()); } }); // 4.建立连接 bootstrap.connect(host, 8070).addListener(future -> { if (future.isSuccess()) { System.out.println("连接成功!"); } else { System.err.println("连接失败!"); } }); } }

没什么变化,主要是增加了HeartbeatHandler,我们来看看这个类:

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.nio.charset.Charset; import java.time.LocalTime; public class HeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.WRITER_IDLE) { System.out.println("10秒了,需要发送消息给服务端了" + LocalTime.now()); //向服务端送心跳包 ByteBuf buffer = getByteBuf(ctx); //发送心跳消息,并在发送失败时关闭该连接 ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } else { super.userEventTriggered(ctx, evt); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("捕获的异常:" + cause.getMessage()); ctx.channel().close(); } private ByteBuf getByteBuf(ChannelHandlerContext ctx) { // 1. 获取二进制抽象 ByteBuf ByteBuf buffer = ctx.alloc().buffer(); String time = "heartbeat:客户端心跳数据:" + LocalTime.now(); // 2. 准备数据,指定字符串的字符集为 utf-8 byte[] bytes = time.getBytes(Charset.forName("utf-8")); // 3. 填充数据到 ByteBuf buffer.writeBytes(bytes); return buffer; } }

还是继承自ChannelInboundHandlerAdapter,不过这次重写的是userEventTriggered()方法,这个方法在客户端的所有ChannelHandler中,如果10s内没有发生write事件时触发,所以我们在该方法中给服务端发送心跳消息。

业务逻辑处理类NettyClientHandler没有改动,代码如下:

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date; import java.util.Random; public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println(new Date() + ": 客户端写出数据"); // 1. 获取数据 ByteBuf buffer = getByteBuf(ctx); // 2. 写数据 ctx.channel().writeAndFlush(buffer); } private ByteBuf getByteBuf(ChannelHandlerContext ctx) { // 1. 获取二进制抽象 ByteBuf ByteBuf buffer = ctx.alloc().buffer(); Random random = new Random(); double value = random.nextDouble() * 14 + 8; String temp = "获取室内温度:" + value; // 2. 准备数据,指定字符串的字符集为 utf-8 byte[] bytes = temp.getBytes(Charset.forName("utf-8")); // 3. 填充数据到 ByteBuf buffer.writeBytes(bytes); return buffer; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(new Date() + ": 客户端读到数据 -> " + msg.toString()); } }

对如上代码不了解的可以回看上一节:Netty入门篇-从双向通信开始

2、服务端代码修改

服务端代码主要是开启TCP底层心跳机制支持,.childOption(ChannelOption.SO_KEEPALIVE, true) ,其他的代码并没有改动:

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyServer { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(bossGroup, workerGroup) // 指定Channel .channel(NioServerSocketChannel.class) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true) //将小的数据包包装成更大的帧进行传送,提高网络的负载 .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new NettyServerHandler()); } }); serverBootstrap.bind(8070); } }

我们再来看看服务端的业务处理类 NettyServerHandler

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date; public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg; String message = byteBuf.toString(Charset.forName("utf-8")); System.out.println(new Date() + ": 服务端读到数据 -> " + message); /** 心跳数据是不发送数据 **/ if(!message.contains("heartbeat")){ ByteBuf out = getByteBuf(ctx); ctx.channel().writeAndFlush(out); } } private ByteBuf getByteBuf(ChannelHandlerContext ctx) { byte[] bytes = "我是发送给客户端的数据:请重启冰箱!".getBytes(Charset.forName("utf-8")); ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(bytes); return buffer; } }

对channelRead() 方法增加了一个 if 判断,判断如果包含heartbeat字符串就认为这是客户端发过来的心跳,这种判断是非常low的,因为到目前为止我们一直是用简单字符串来传递数据的,上边传递的数据就直接操作字符串;那么问题来了,如果我们想传递对象怎么搞呢?下节写。我们先来看一下如上代码客户端与服务端运行截图:

服务端 客户端

至此,整个心跳机制就完成了,这样每隔10秒客户端就会给服务端发送一个心跳消息,下节我们通过了解通协议以完善心跳机制的代码。

18年专科毕业后,我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下:小伟后端笔记



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3