netty+springboot实现通信服务端,客户端、心跳分离,排除无效连接

您所在的位置:网站首页 netty服务端重启端口被占用 netty+springboot实现通信服务端,客户端、心跳分离,排除无效连接

netty+springboot实现通信服务端,客户端、心跳分离,排除无效连接

2024-07-15 21:06| 来源: 网络整理| 查看: 265

netty详解 netty实现socket服务端(TCP)netty实现websocket服务端(TCP/HTTP)netty实现socket(UDP)netty实现socket客户端(TCP)

netty实现socket服务端(TCP)

1、配置启动类(第一种写法,普遍但是阻塞,不太推荐)

@Slf4j @Component public class NettyServer { @Autowired private NettyServerChannelInitializer nettyServerChannelInitializer; @PostConstruct public void start() { //创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义) NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(2); try { //创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组) ServerBootstrap socketBs = new ServerBootstrap(); //channel 方法用于指定服务器端监听套接字通道 //socket配置 socketBs.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数, // 函数listen(int socketfd,int backlog)用来初始化服务端可连接队列, // 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接, // 多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小 .option(ChannelOption.SO_BACKLOG, 1024) //快速复用,防止服务端重启端口被占用的情况发生 .option(ChannelOption.SO_REUSEADDR, true) .childHandler(nettyServerChannelInitializer) //如果TCP_NODELAY没有设置为true,那么底层的TCP为了能减少交互次数,会将网络数据积累到一定的数量后, // 服务器端才发送出去,会造成一定的延迟。在互联网应用中,通常希望服务是低延迟的,建议将TCP_NODELAY设置为true .childOption(ChannelOption.TCP_NODELAY, true) //默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。 .childOption(ChannelOption.SO_KEEPALIVE, true); //默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。 //.childOption(ChannelOption.SO_KEEPALIVE, true); //这里的sync()意思是异步阻塞,等待绑定成功,当绑定成功的时候会notifyAll唤醒所有线程,代码继续往下执行,此时如果注册出错,会关闭ChannelFuture ->取消socketFuture.channel().closeFuture().sync()线程阻塞 ChannelFuture socketFuture = socketBs.bind(8688).sync(); //这里的sync()意思是异步阻塞,等待通道成功关闭或者注册异常的时候的时候会notifyAll唤醒所有线程,然后执行finally里的代码,优雅的关闭线程组,去掉这里会直接关闭,具体可参考【https://gorden5566.com/post/1066.html】 socketFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //最终会执行这里关闭线程组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

为什么说不太推荐,查了很多资料,也百度了很久,基本的争议是socketFuture.channel().closeFuture().sync();这行代码,这里只要不关闭,就会一直阻塞,导致之后的所有代码不会执行,而这一行代码的意义就是为了让netty更加优雅的关闭,但是在实际开发中,作为服务端来说,如果你有把netty单独停掉的需求,你可以用第一种方法,但是即使这样,为什么不写一个优雅停止的方法呢?在手动关闭netty的时候,顺便手动释放一下线程组,同样可以达到优雅关闭的目的。就像这样

try { ChannelFuture future = nettyServer.future.channel().close().sync(); future.addListener(future1 -> { if(future1.isSuccess()) { nettyServer.shutdown(); } }); } catch (InterruptedException e) { e.printStackTrace(); }

启动类第二种写法:

@Slf4j @Component public class NettyServer { @Autowired private NettyServerChannelInitializer nettyServerChannelInitializer; private NioEventLoopGroup bossGroup; private NioEventLoopGroup workerGroup; private ChannelFuture future; @PostConstruct public void start() { //创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义) bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(2); try { //创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组) ServerBootstrap socketBs = new ServerBootstrap(); //channel 方法用于指定服务器端监听套接字通道 //socket配置 socketBs.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数, // 函数listen(int socketfd,int backlog)用来初始化服务端可连接队列, // 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接, // 多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小 .option(ChannelOption.SO_BACKLOG, 1024) //快速复用,防止服务端重启端口被占用的情况发生 .option(ChannelOption.SO_REUSEADDR, true) .childHandler(nettyServerChannelInitializer) //如果TCP_NODELAY没有设置为true,那么底层的TCP为了能减少交互次数,会将网络数据积累到一定的数量后, // 服务器端才发送出去,会造成一定的延迟。在互联网应用中,通常希望服务是低延迟的,建议将TCP_NODELAY设置为true .childOption(ChannelOption.TCP_NODELAY, true) //默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。 .childOption(ChannelOption.SO_KEEPALIVE, true); future = socketBs.bind(8688).sync(); future.addListener(future1 -> log.info("Netty服务端启动成功")); } catch (InterruptedException e) { e.printStackTrace(); } } /** *这里可以手动关闭,同时在整个springboot应用停止的时候,这里也得以调用【@PreDestroy】 *参考 https://www.cnblogs.com/CreatorKou/p/11606870.html */ @PreDestroy public void shutdown() { // 优雅关闭两个 EventLoopGroup 对象 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); log.info("[*Netty服务端关闭成功]"); } }

2、配置channel管道规则

/** * @author: zhouwenjie * @description: 配置管道 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器 * @create: 2020-04-03 14:14 **/ @Component public class NettyServerChannelInitializer extends ChannelInitializer { @Autowired private NettyServerHandler nettyServerHandler; @Autowired private HeartBeatServerHandler heartBeatServerHandler; @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); //数据传输规则,防止粘包(客户端和服务端发送的消息结尾必须都要带上@_) String delimiter = "@_"; //设置心跳机制 0永不超时,三个参数分别代表 读超时,写超时,读写超时 pipeline.addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS)); //限制每次数据传输的长度,防止违法大数据攻击,放在编码器上边 pipeline.addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes()))); //通过换行符,即\n或者\r\n对数据进行处理 System.getProperty("line.separator") //pipeline.addLast(new LineBasedFrameDecoder(1024)); pipeline.addLast(new StringDecoder(Charset.forName("UTF-8"))); pipeline.addLast(new StringEncoder(Charset.forName("UTF-8"))); //一定在编码器下边,否则不生效 pipeline.addLast(new DelimiterBasedFrameEncoder(delimiter)); //心跳放在正常handler前边,先走心跳处理器 pipeline.addLast(heartBeatServerHandler); pipeline.addLast(nettyServerHandler); } }

切记:Handler根据需求可配置多个,但是先后位置一定要选择好,否则可能导致不生效,先触发执行的放在前边,被引用的放在引用的前边。 这里是将心跳(heartBeatServerHandler)的逻辑单独出来,实现更好的解耦,nettyServerHandler中可以更好的处理正常逻辑,下边分别是心跳处理器和正常逻辑处理器的代码。

心跳处理器 HeartBeatServerHandler 为什么需要心跳:当客户端异常退出或者路由器断线等特殊情况的时候,连接可能并不会自动关闭。但是对于应用而言,每一个连接都是宝贵的,所以不能让冗余的连接存在来浪费资源。

/** * @author: zhouwenjie * @description: 心跳检测处理器 * @create: 2022-03-25 16:12 **/ @Slf4j @Component @ChannelHandler.Sharable public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 接收到心跳请求,处理心跳消息,否则进入下一处理流程,心跳规则(ping,pong) String s = (String) msg; if ("ping".equals(s)) { ctx.writeAndFlush("pong"); } else { //fireChannelRead 表示传递消息至下一个处理器 ctx.fireChannelRead(msg); } } /** * 功能描述: 心跳检测 * * @param ctx 这里的作用主要是解决断网,弱网的情况发生 * @param evt * @return void * @author zhouwenjie * @date 2020/4/3 17:02 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client RSU: " + socketString + " READER_IDLE 读超时"); ctx.disconnect(); } } } /** * 在处理过程中引发异常时被调用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("[server] heart response error: {}", cause.getMessage()); ctx.fireExceptionCaught(cause); } }

提示:心跳规则通过NettyServerChannelInitializer中的IdleStateHandler处理器进行配置。

正常逻辑处理器 NettyServerHandler

/** * @author: zhouwenjie * @description: 服务端业务处理类 * @create: 2020-04-03 14:13 **/ @Slf4j @Component @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 用于记录和管理所有客户端的channle,客户端掉线会自动移除,不用手动触发 */ public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 功能描述:客户端连接时触发 */ @Override public void channelActive(ChannelHandlerContext ctx) { clients.add(ctx.channel()); System.out.println(ctx.channel().id().asLongText() + "连接成功,web客户端数量:" + clients.size() + "个"); } /** * 功能描述:客户端断开时触发 */ @Override public void channelInactive(ChannelHandlerContext ctx) { //不用手动移除,ChannelGroup可以自动从分组里移除ctx System.out.println(ctx.channel().id().asLongText() + "连接断开,剩余web客户端数量:" + clients.size() + "个"); } /** * 功能描述: 有客户端发消息会触发此函数 * * @param ctx * @param msg * @return void * @author zhouwenjie * @date 2020/4/3 16:48 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { User user = JSON.parseObject(msg.toString(), User.class); } /** * 功能描述: * * @param ctx * @param cause * @return void * @author 发生异常会触发此函数 * @date 2020/4/3 16:49 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("连接异常,异常信息:" + cause.getMessage()); cause.printStackTrace(); } } netty实现websocket服务端(TCP/HTTP)

启动类配置 NettyServer

/** * @author: zhouwenjie * @description: netty启动配置类 * @create: 2020-04-03 11:43 **/ @Component public class NettyServer { @Value("${netty.webSocket_port}") private int webSocketPort; @Autowired private WebSocketChannelInitializer webSocketChannelInitializer; private NioEventLoopGroup bossGroup; private NioEventLoopGroup workerGroup; public void start() { //创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义) bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(4); try { //创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组) ServerBootstrap webSocketBs = new ServerBootstrap(); //webSocket配置 webSocketBs.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(webSocketChannelInitializer) //ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小 .option(ChannelOption.SO_BACKLOG, 10); // .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture webSocketFuture = webSocketBs.bind(webSocketPort).sync(); webSocketFuture .addListener(future1 -> log.info("Netty服务端启动成功")); } catch (InterruptedException e) { e.printStackTrace(); } } @PreDestroy public void shutdown() { // 优雅关闭两个 EventLoopGroup 对象 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); log.info("[*Netty服务端关闭成功]"); } }

channel管道配置 WebSocketChannelInitializer

/** * @author: zhouwenjie * @description: websocket * @create: 2020-10-29 16:31 **/ @Component public class WebSocketChannelInitializer extends ChannelInitializer { @Autowired private TextWebSocketHandler textWebSocketHandler; @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // webSocket 基于http协议,所以要有http编解码器 //以下配置放在最下方不生效 //这里没做心跳,需要的话可以参照socket的做法来做心跳,或者直接在下方userEventTriggered中编写心跳逻辑 //pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS)); pipeline.addLast(new HttpServerCodec()); // 对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //用于将http数据聚合到一起发送一个请求 fullHttpRequest pipeline.addLast(new HttpObjectAggregator(8192)); //设置成true,开启前缀【/ws/alarmHost】检查,否则无法获取uri参数,排除无效连接 pipeline.addLast(new WebSocketServerProtocolHandler("/ws/alarmHost",true)); pipeline.addLast(textWebSocketHandler); } }

处理器 TextWebSocketHandler

/** * @author: zhouwenjie * @description: * @create: 2020-10-29 16:35 **/ @Component @ChannelHandler.Sharable public class TextWebSocketHandler extends SimpleChannelInboundHandler { /** * 用于记录和管理所有客户端的channle */ public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("连接首先进入 ====================="); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String content = msg.text(); System.out.println("接受到的数据:" + content); //转发消息注意项,见下方注明 ctx.writeAndFlush(new TextWebSocketFrame(content)); } /** * 此方法在连接之后可以第一时间拿到请求参数及请求头信息,所以可以在这里对连接客户端进行保存等处理 * 另外此方法也可用于心跳事件处理 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("连接之后进入 ====================="); //websocket握手成功触发 if (evt instanceof HandshakeComplete) { clients.add(ctx.channel()); HandshakeComplete handshakeComplete= (HandshakeComplete) evt; String s = handshakeComplete.requestUri(); System.out.println(ctx.channel().id().asLongText() + "连接ffff成功,web客户端数量:" + clients.size() + "个"+s); } super.userEventTriggered(ctx,evt); } /** * 功能描述:客户端断开时触发 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) { //不用手动一处,ChannelGroup可以自动从分组里移除ctx System.out.println(ctx.channel().id().asLongText() + "连接断开,剩余web客户端数量:" + clients.size() + "个"); } /** * 功能描述:客户端异常时触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } /* * 这个方法也可以拿到用户传递的参数,但是触发条件是必须在通信时才能获取 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //通过这个方法可以拿到web客户端传递过来的参数以及请求头(heads) if (msg != null && msg instanceof DefaultHttpRequest) { HttpRequest request = (HttpRequest) msg; String uri = request.uri(); String id = uri.split("/")[2]; idCtxMap.put(id,ctx); request.setUri("/ws"); } super.channelRead(ctx,msg); } }

websocket收到消息,转发消息注意事项 这里举个例子,加入我们由两个ChannelHandlerContext =>ctx1、ctx2

protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String content = msg.text(); ctx1.writeAndFlush(msg);//不起作用,因为msg已经被上边一行代码读过了,会报引用计数器异常,需要用下变得代码复制一份 TextWebSocketFrame frame = msg.retainedDuplicate(); ctx1.writeAndFlush(frame);//不起作用 ctx2.writeAndFlush(frame);//起作用 ctx1.writeAndFlush(new TextWebSocketFrame(content));//起作用,推荐使用这一种,上边的方法如果群发只有一个客户端能接收到 } } netty实现socket(UDP)

启动类 NettyUdpApplication

UDP协议的特点: 1、UDP传送数据前并不与对方建立连接,即UDP是无连接的。 2、UDP接收到的数据报不发送确认信号,发送端不知道数据是否被正确接收。 3、UDP传送数据比TCP快,系统开销也少。 UDP的客户端个服务端基本一样,主要是在handler的逻辑处理上

public void start() { try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(obuUdpClientHandler); //这里的bind(receivePort)的意思是监听receivePort端口,端口只要有消息就获取 bind(0)这里的bootstrap只作为发送方存在,不接收回复 log.info("[Netty UDP客户端启动成功]"); bootstrap.bind(receivePort).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } @PreDestroy public void shutdown() { eventLoopGroup.shutdownGracefully(); log.info("[*Netty关闭成功]"); }

处理器 UdpClientHandler

@Component @Slf4j @ChannelHandler.Sharable public class ObuUdpClientHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { } @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) { //String msg=packet.content().toString(StandardCharsets.UTF_8) String msg; try { ByteBuf byteBuf = packet.content(); byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); msg = new String(bytes); JSONObject jsonObject = JSON.parseObject(msg); } catch (IllegalArgumentException e) { e.printStackTrace(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); cause.printStackTrace(); } }

UDP发送消息

#监听接收地址填写255.255.255.255,因为是局域网。 receiveHost: 255.255.255.255 #监听接收端口 receivePort: 7301 #发送端口 sendHost: 255.255.255.255 #发送端口 sendPort: 7302 ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(msg, Charset.forName("utf-8")), new InetSocketAddress(sendHost, sendPort))); netty实现socket客户端(TCP)

客户端需要做断线重连机制

private NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); private Channel channel; private Bootstrap bootstrap; public void run() { bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .remoteAddress(host, port) //如果TCP_NODELAY没有设置为true,那么底层的TCP为了能减少交互次数,会将网络数据积累到一定的数量后, //服务器端才发送出去,会造成一定的延迟。在互联网应用中,通常希望服务是低延迟的,建议将TCP_NODELAY设置为true //建议只是客户端设置此参数,服务端设置会有无效警告 .option(ChannelOption.TCP_NODELAY, true) //快速复用,防止服务端重启端口被占用的情况发生 .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { //客户端初始化 String delimiter = "@_"; socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes()))); socketChannel.pipeline().addLast(new StringDecoder(Charset.forName("utf-8"))); socketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8"))); //socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter)); socketChannel.pipeline().addLast(nettyClientHandler); } }); //用于一台电脑多网口的情况下指定客户端的IP地址,端口为0,不指定 //bootstrap.localAddress("127.0.0.1",0); //连接netty服务器 reconnect(); } /** * 功能描述: 断线重连,客户端有断线重连机制,就更不能使用异步阻塞了 * @param * @return void * @author zhouwenjie * @date 2021/3/19 14:53 */ public void reconnect() { ChannelFuture channelFuture = bootstrap.connect(); //使用最新的ChannelFuture -> 开启最新的监听器 channelFuture.addListener((ChannelFutureListener) future -> { if (future.cause() != null) { log.error("连接失败。。。"); future.channel().eventLoop().schedule(() -> reconnect(), 2, TimeUnit.SECONDS); } else { log.info("连接成功。。。"); } }); } /** * 关闭 client */ @PreDestroy public void shutdown() { // 优雅关闭 EventLoopGroup 对象 eventLoopGroup.shutdownGracefully(); log.info("[*Netty客户端关闭]"); }

handler代码

/** * @author: zhouwenjie * @description: 客户端处理类 * @create: 2020-04-03 17:45 **/ @Component @Slf4j @ChannelHandler.Sharable public class NettyClientHandler extends SimpleChannelInboundHandler { /** * 注入NettyClient */ @Autowired private NettyClient nettyClient; /** * 连接成功 */ @Override public void channelActive(ChannelHandlerContext ctx) { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.error("[*The netty server suspends service...]"); super.channelInactive(ctx); ctx.fireChannelInactive(); nettyClient.reconnect(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); log.error("[* Netty connection exception]:{}", cause.toString()); cause.printStackTrace(); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { } }

当然,文中如果有理解不对的地方,欢迎指出!!!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3