Spring Boot 整合Netty 登录、心跳、自定义编解码、重连

您所在的位置:网站首页 netty与tomcat区别 Spring Boot 整合Netty 登录、心跳、自定义编解码、重连

Spring Boot 整合Netty 登录、心跳、自定义编解码、重连

2024-06-12 10:03| 来源: 网络整理| 查看: 265

什么是Netty?

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。

Netty和Tomcat有什么区别?

Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过codec自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。

有人说netty的性能就一定比tomcat性能高,其实不然,tomcat从6.x开始就支持了nio模式,并且后续还有APR模式——一种通过jni调用apache网络库的模式,相比于旧的bio模式,并发性能得到了很大提高,特别是APR模式,而netty是否比tomcat性能更高,则要取决于netty程序作者的技术实力了。

Netty是一款受到大公司青睐的框架,在我看来,netty能够受到青睐的原因有三: 并发高传输快封装好 Netty为什么并发高

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高.

NIO的单线程能处理连接的数量比BIO要高出很多,而为什么单线程能处理更多的连接呢?原因就是Selector。 当一个连接建立之后,他有两个步骤要做,第一步是接收完客户端发过来的全部数据,第二步是服务端处理完请求业务之后返回response给客户端。NIO和BIO的区别主要是在第一步。 在BIO中,等待客户端发数据这个过程是阻塞的,这样就造成了一个线程只能处理一个请求的情况,而机器能支持的最大线程数是有限的,这就是为什么BIO不能支持高并发的原因。 而NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断的去遍历所有的Socket,一旦有一个Socket建立完成,他会通知Thread,然后Thread处理完数据再返回给客户端——这个过程是不阻塞的,这样就能让一个Thread处理更多的请求了。

Netty为什么传输快

Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。 Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

简单了解一下Netty,下边进入正题,Netty 与Spring Boot 进行整合

贴一下Pom.xml 

4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.6.RELEASE com.fyrt fyrtim 0.0.1-SNAPSHOT fyrtim Demo project for Spring Boot 1.8 4.1.37.Final 1.2.46 0.6.12 3.1.1 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test io.netty netty-all ${netty.version} org.projectlombok lombok true com.alibaba fastjson ${fastjson.version} org.msgpack msgpack ${msgpack.version} cn.hutool hutool-all 5.7.14 cn.hutool hutool-all 5.7.14 org.springframework.boot spring-boot-maven-plugin

 其中netty 版本是4.1.37.Final  springboot 版本是2.1.6.RELEASE

服务端:NettyServer

@Slf4j @Component public class NettyServer { EventLoopGroup bossGroup = new NioEventLoopGroup(2); EventLoopGroup workGroup = new NioEventLoopGroup(2); @Autowired private ServerChannelInitializer serverChannelInitializer; public void start(Integer port) { log.info("NettyServer 启动..."); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(this.bossGroup, this.workGroup) .channel(NioServerSocketChannel.class) .childHandler(this.serverChannelInitializer) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .handler(new LoggingHandler(LogLevel.INFO)); try { ChannelFuture future = bootstrap.bind(port).sync(); log.info("服务器启动开始监听端口: {}", port); future.channel().closeFuture().sync(); if (future.isSuccess()) { log.info("启动 Netty Server"); } } catch (InterruptedException var4) { var4.printStackTrace(); } } @PreDestroy public void destroy() { this.bossGroup.shutdownGracefully(); this.workGroup.shutdownGracefully(); log.info("关闭Netty"); } }

ClientChannelInitializer

@Component public class ServerChannelInitializer extends ChannelInitializer { @Autowired private ChatRespHandler chatRespHandler; @Autowired private HeartBeatRespHandler heartBeatRespHandler; @Autowired private LoginAuthRespHandler loginAuthRespHandler; @Override protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); //用户每次请求都会从第一个Handler开始 //设置超时时间 pipeline.addLast(new IdleStateHandler(30, 30, 60, TimeUnit.SECONDS)); // //定长解码器 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024*100, 0, 2, 0, 2)); //增加解码器 pipeline.addLast(new MsgPackDecoder()); // //这里设置读取报文的包头长度来避免粘包 pipeline.addLast(new LengthFieldPrepender(2)); //增加编码器 pipeline.addLast(new MsgPackEncoder()); //心跳续约 pipeline.addLast(this.heartBeatRespHandler); pipeline.addLast(this.loginAuthRespHandler); pipeline.addLast(this.chatRespHandler); } }

登录处理器

@Slf4j @Sharable @Component public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("LoginAuthRespHandler channelRead :[{}]", msg); String msgData = (String) msg; //判断是否登录 if (NettyConstants.userIdChannelMap.size() == 0) { //判断是否是登录请求 if (msg.toString().contains(MessageType.REQLOGINALARM.getMsgType())) { String ackLoginAlarmMsg; if (msgData.contains(NettyConstants.ALARM_LOGIN_REQ_SUCCESS)) { NettyConstants.userIdChannelMap.put(msgData, ctx.channel()); ackLoginAlarmMsg = NettyConstants.ACK_LOGIN_ALARM_RESULT_OK; } else { ackLoginAlarmMsg = NettyConstants.ACK_LOGIN_ALARM_RESULT_FAIL; } NettyMsg nettyMsg = new NettyMsg(MessageType.ACKLOGINALARM.getCode(), NettyMsg.getCurrentTimeStamp(), ackLoginAlarmMsg); nettyMsg.getMsgHead().setMsgType((byte) MessageType.ACKLOGINALARM.getCode()); log.info("登陆响应信息:[{}]", nettyMsg); ctx.writeAndFlush(nettyMsg); } else { log.info("用户未登录发送的请求:[{}],不予处理..", msgData); } } else { log.info("用户已登录,收到信息Msg:[{}]", msgData); ctx.fireChannelRead(msgData); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

心跳处理器 

@Slf4j @Sharable @Component public class HeartBeatRespHandler extends SimpleChannelInboundHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("HeartBeatRespHandler channelRead :[{}]", msg); String msgData = (String) msg; if (NettyConstants.userIdChannelMap.size() > 0) { //是否是心跳请求 if (msgData.contains(MessageType.REQHEARTBEAT.getMsgType())) { log.info("收到客户端心跳请求,响应心跳请求.."); String ackMsg = msgData.replace(NettyConstants.REQ_HEART_BEAT, NettyConstants.ACK_HEART_BEAT); NettyMsg nettyMsg = new NettyMsg(MessageType.ACKHEARTBEAT.getCode(), NettyMsg.getCurrentTimeStamp(), ackMsg); nettyMsg.getMsgHead().setMsgType((byte) MessageType.ACKHEARTBEAT.getCode()); String ackHeartBeatMsg = NettyMsg.buildNettyMsg(nettyMsg); log.info("响应心跳请求信息:[{}]", ackHeartBeatMsg); ctx.writeAndFlush(nettyMsg); } else { ctx.fireChannelRead(msgData); } } else { ctx.fireChannelRead(msgData); } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) { } @Override public void channelInactive(ChannelHandlerContext ctx) { NettyConstants.userIdChannelMap.clear(); log.info("服务端已关闭"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) { if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (event.state() == IdleState.READER_IDLE) { log.info("客户端读超时"); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("客户端写超时"); } else if (event.state() == IdleState.ALL_IDLE) { log.info("客户端所有操作超时"); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

对话处理器

@Slf4j @Sharable @Component public class ChatRespHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("ChatRespHandler channelRead 接收聊天信息:[{}]", msg); ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

public class NettyConstants { public static final String ACCOUNT = "123456"; public static final String PASSWORD = "123456"; public static final Map userIdChannelMap = new ConcurrentHashMap(); public static final String ACK_LOGIN_ALARM = "ackLoginAlarm;"; public static final String ACK_SYNC_ALARMMSG = "ackSyncAlarmMsg;"; public static final String ACK_HEART_BEAT = "ackHeartBeat;"; public static final String REQ_LOGIN_ALARM = "reqLoginAlarm;"; public static final String REQ_SYNC_ALARMMSG = "reqSyncAlarmMsg;"; public static final String REQ_HEART_BEAT = "reqHeartBeat;"; public static final String ACK_LOGIN_ALARM_RESULT_OK = StrUtil.concat(Boolean.TRUE, ACK_LOGIN_ALARM, "result=succ;resDesc=null"); public static final String ACK_LOGIN_ALARM_RESULT_FAIL = StrUtil.concat(Boolean.TRUE, ACK_LOGIN_ALARM, "result=fail;resDesc=username-or-key-error"); /** * 正确的登录验证字符串 */ public static final String ALARM_LOGIN_REQ_SUCCESS = StrUtil.concat( Boolean.TRUE, REQ_LOGIN_ALARM, "user=", ACCOUNT, ";key=", PASSWORD, ";type=msg" ); }

自定义编码器

@Slf4j public class MsgPackEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, NettyMsg msg, ByteBuf out) { log.info("MsgPackEncoder encode:{}", msg); // 写入开头的标志 out.writeShort(msg.getMsgHead().getStartSign()); // 写入消息类型 out.writeByte(msg.getMsgHead().getMsgType()); // 写入秒时间戳 out.writeInt(msg.getMsgHead().getTimeStamp()); byte[] bytes = msg.getMsg().getBytes(); // 写入长度 out.writeShort((short)bytes.length); // 写入消息主体 out.writeBytes(bytes); } }

自定义解码器

@Slf4j public class MsgPackDecoder extends ByteToMessageDecoder { /** * 协议开始标志 */ private short headData = (short) 0xFFFF; private int headLength = 9; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { log.info("MsgPackDecoder decode...."); // 保证魔法词和数组长度有效 if (in.readableBytes() < headLength) { return; } in.markReaderIndex(); short startIndex = in.readShort(); log.info("startIndex:{}", startIndex); if (startIndex != headData) { in.resetReaderIndex(); throw new CorruptedFrameException("无效开始标志: " + startIndex); } byte msgType = in.readByte(); log.info("msgType:{}", msgType); in.markReaderIndex(); int timeStamp = in.readInt(); log.info("timeStamp:{}", timeStamp); in.markReaderIndex(); short lenOfBody = in.readShort(); log.info("lenOfBody:{}", lenOfBody); if (in.readableBytes() < lenOfBody) { in.resetReaderIndex(); return; } in.markReaderIndex(); // 消息的长度 byte[] msgByte = new byte[lenOfBody]; in.readBytes(msgByte); String msgContent = new String(msgByte); log.info("收到的字节码转成字符串:{}", msgContent); out.add(msgContent); } }

自定义消息格式如下:按实际情况自定义

 客户端,我这里为了测试使用main 启动:

@Slf4j public class AlarmNettyClient { public static AtomicInteger REQID = new AtomicInteger(1); EventLoopGroup group = new NioEventLoopGroup(); public void connect(int port, String host) throws InterruptedException { ChannelFuture future = null; try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ClientChannelInitializer()); future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); log.info("服务端连接失败,msg:{}", e.getMessage()); } finally { // group.shutdownGracefully(); if (null != future) { if (future.channel() != null && future.channel().isOpen()) { future.channel().close(); } } log.info("【{}】准备重连...", DateUtil.now()); Thread.sleep(3000); connect(port, host); log.info("【{}】重连成功...", DateUtil.now()); } } public static void main(String[] args) throws InterruptedException { new AlarmNettyClient().connect(NettyConstants.LOCAL_PORT, NettyConstants.LOCAL_PORT); } } public class ClientChannelInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { //设置超时时间 ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(30, 30, 60, TimeUnit.SECONDS)); //定长解码器 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024*100, 0, 2, 0, 2)); pipeline.addLast(new MsgPackDecoder()); //这里设置读取报文的包头长度来避免粘包 pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new MsgPackEncoder()); //心跳续约 pipeline.addLast(new HeartBeatReqHandler()); //登录校验 pipeline.addLast(new LoginAuthReqHandler()); //消息处理 pipeline.addLast(new ChatReqHandler()); } }

客户端登录处理器 

@Slf4j public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter { /** * 通道激活 发送登录请求信息 * * @param ctx */ @Override public void channelActive(ChannelHandlerContext ctx) { NettyMsg nettyMsg = new NettyMsg(null, null, NettyConstants.ALARM_LOGIN_REQ_SUCCESS); nettyMsg.getMsgHead().setMsgType((byte) MessageType.REQLOGINALARM.getCode()); log.info("LoginAuthReqHandler channelActive,发送登录信息:{}", nettyMsg); ctx.writeAndFlush(nettyMsg); } /** * channel读取数据 * * @param ctx * @param msg */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("LoginAuthReqHandler channelRead 收到信息:{}", msg); String msgDate = msg.toString(); if (msgDate.contains(MessageType.ACKLOGINALARM.getMsgType())) { if (msgDate.contains(NettyConstants.ACK_LOGIN_ALARM_RESULT_OK)) { //标记为已登录状态 LoginUtil.markAsLogin(ctx.channel()); log.info("登录成功"); // 一行代码实现逻辑的删除 删除LoginAuthReqHandler ctx.pipeline().remove(this); // 将事件继续传播下去 ctx.fireChannelActive(); } else { log.info("登录失败"); ctx.channel().close(); } } else { // 将事件继续传播下去 ctx.fireChannelActive(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } /** * channel (拦截器)移除 处理器方法执行完 * * @return */ @Override public void handlerRemoved(ChannelHandlerContext ctx) { if (LoginUtil.hasLogin(ctx.channel())) { System.out.println("当前连接登录验证完毕,无需再次验证"); } else { System.out.println("无登录验证,强制关闭连接!"); } } }

客户端心跳处理器 

@Slf4j public class HeartBeatReqHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { //直接跳转到下个handler请求 log.info("HeartBeatReqHandler channelActive..."); ctx.fireChannelActive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("HeartBeatReqHandler channelRead 收到信息:{}", msg); String msgData = (String) msg; log.info("收到的字节码转成字符串:{}", msgData); if (LoginUtil.hasLogin(ctx.channel())) { //判断是否是心跳请求 if (msgData.contains(MessageType.ACKHEARTBEAT.getMsgType())) { log.info("收到心跳响应:{}", msgData); }else{ //没有登录 直接跳转到下个handler请求 ctx.fireChannelRead(msgData); } } else { //没有登录 直接跳转到下个handler请求 ctx.fireChannelRead(msgData); } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) { } /** * 退出登录 */ @Override public void channelInactive(ChannelHandlerContext ctx) { UserChannelUtil.unBindUser(ctx.channel()); LoginUtil.logoOut(ctx.channel()); System.out.println("客户端已关闭"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) { if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (event.state() == IdleState.READER_IDLE) { log.info("客户端读超时"); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("客户端写超时"); } else if (event.state() == IdleState.ALL_IDLE) { log.info("客户端所有操作超时"); } NettyMsg nettyMsg = new NettyMsg(null, null, getHeartBeatReqMsg()); nettyMsg.getMsgHead().setMsgType((byte) MessageType.REQHEARTBEAT.getCode()); log.info("发送心跳请求:{}", nettyMsg); ctx.writeAndFlush(nettyMsg); } } /** * 构建心跳请求信息 * @return */ private String getHeartBeatReqMsg() { int reqId = AlarmNettyClient.REQID.incrementAndGet(); String reqHeartBeatMsg = StrUtil.concat( Boolean.TRUE, NettyConstants.REQ_HEART_BEAT, "reqId=", reqId + ""); return reqHeartBeatMsg; } }

客户端回话处理器:

@Slf4j public class ChatReqHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { log.info("ChatReqHandler channelActive,IM系统准备就绪,请发送消息: "); new Thread(() -> { while (true) { //读取用户在命令行输入的各种数据类型 Terminal控制台输入数据 Scanner sc = new Scanner(System.in); //此扫描器执行当前行,并返回跳过的输入信息 String line = sc.nextLine(); log.info("控制台输入:{}", line); NettyMsg nettyMsg = new NettyMsg( MessageType.REQSYNCALARMMSG.getCode(), NettyMsg.getCurrentTimeStamp(), line ); nettyMsg.getMsgHead().setMsgType((byte) MessageType.REQSYNCALARMMSG.getCode()); log.info("发送消息:{}", nettyMsg); //发送数据并刷新 ctx.writeAndFlush(nettyMsg); } }).start(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // NettyMsg nettyMsg = NettyMsg.nettyMsgParse(msg.toString()); log.info("ChatReqHandler channelRead 收到信息:{}", msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

客户端常量

public class NettyConstants { public static int LOCAL_PORT = 53535; public static String LOCAL_IP = "127.0.0.1"; public static final String ACCOUNT="123456"; public static final String PASSWORD="123456"; /** * 登录请求响应消息名 */ public static final String ACK_LOGIN_ALARM = "ackLoginAlarm;"; /** * 登录响应-成功 */ public static final String ACK_LOGIN_ALARM_RESULT_OK = StrUtil.concat( Boolean.TRUE, ACK_LOGIN_ALARM, "result=succ;resDesc=null" ); /** * 登录响应-失败 */ public static final String ACK_LOGIN_ALARM_RESULT_FAIL = StrUtil.concat( Boolean.TRUE, ACK_LOGIN_ALARM, "result=fail;resDesc=username-or-key-error" ); /** * 心跳响应消息名 */ public static final String ACK_HEART_BEAT = "ackHeartBeat;"; /** * 登录请求消息名 */ public static final String REQ_LOGIN_ALARM = "reqLoginAlarm;"; /** * 心跳请求消息名 */ public static final String REQ_HEART_BEAT = "reqHeartBeat;"; /** * 正确的登录验证字符串 */ public static final String ALARM_LOGIN_REQ_SUCCESS = StrUtil.concat( Boolean.TRUE, NettyConstants.REQ_LOGIN_ALARM, "user=", ACCOUNT, ";key=", PASSWORD, ";type=msg" ); }

 客户端登录状态工具类

** * 用于设置以及判断是否有标志位,如果有标志位,不管标志为值是什么,都表示已经成功登录过。 */ public class LoginUtil { /** * 给通道设置LOGIN的值 登录了为true * * @param channel 通道 */ public static void markAsLogin(Channel channel) { channel.attr(Attributes.LOGIN).set(true); } /** * 查询LOGIN的值 登录了为true * * @param channel 通道 * @return 状态 */ public static boolean hasLogin(Channel channel) { Attribute loginAttr = channel.attr(Attributes.LOGIN); return loginAttr.get() != null ; } /** * 用户退出登录状态 * @param channel 通道 */ public static void logoOut(Channel channel) { channel.attr(Attributes.LOGIN).set(null); } }

 消息类型

NoArgsConstructor @AllArgsConstructor @Getter public enum MessageType { REALTIMEALARM("realTimeAlarm", 0), REQLOGINALARM("reqLoginAlarm", 1), ACKLOGINALARM("ackLoginAlarm", 2), REQSYNCALARMMSg("reqSyncAlarmMsg", 3), ACKSYNCALARMMSg("ackSyncAlarmMsg", 4), REQHEARTBEAT("reqHeartBeat", 8), ACKHEARTBEAT("ackHeartBeat", 9), CLOSECONNALARM("closeConnAlarm", 10); private String msgType; private int code; }

消息实体 

@Slf4j @AllArgsConstructor @Data public class NettyMsg { private Integer msgTypeCode; private Long timeStamp; private String msg; private NettyMsgHead msgHead; public NettyMsg(Integer msgTypeCode, Long timeStamp, String msg) { this.msgTypeCode = msgTypeCode; this.timeStamp = timeStamp; this.msg = msg; this.msgHead=new NettyMsgHead(); } public static long getCurrentTimeStamp() { return DateUtil.date().getTime() / 1000; } public static String buildNettyMsg(NettyMsg nettyMsg) { return StrUtil.concat(Boolean.TRUE, nettyMsg.getMsgTypeCode() == null ? "" : nettyMsg.getMsgTypeCode() + ",", nettyMsg.getTimeStamp() == null ? "" : nettyMsg.getTimeStamp() + ",", nettyMsg.getMsg() ); } }

消息头,所有消息都会添加请求头,在处理中通过消息类型进行消息处理

@Data public class NettyMsgHead { private short startSign = (short) 0xFFFF; private byte msgType; private int timeStamp; public NettyMsgHead() { this.timeStamp = (int) NettyMsg.getCurrentTimeStamp(); } }

sever端与springboot 整合启动类实现CommandLineRunner 接口复写run方法

@SpringBootApplication public class EvercmBusinessApp implements CommandLineRunner { @Value("${netty.socket.port}") private Integer port; @Autowired private NettyServer nettyServer; public static void main(String[] args) { SpringApplication.run(EvercmBusinessApp.class, args); } @Override public void run(String... args) { this.nettyServer.start(this.port); } }

在配置文件中定义端口

netty.socket.port=10809



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3