TCP粘包

您所在的位置:网站首页 netty解决tcp粘包和拆包 TCP粘包

TCP粘包

2024-01-27 02:40| 来源: 网络整理| 查看: 265

原文地址:TCP粘包|拆包和解决方案 – 编程屋

1 产生原因

TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化算法(Nagle算法),将许多数据量小且间隔小的数据,合并成了一个大数据块,然后进行封包,这样做虽然提升了效率,但是接收端就难于分辨出完成的数据包,因此面向流的通信是无消息保护边界的。

因为TCP无消息保护边界,需要在接收端处理消息边界问题,也就是所说的粘包和拆包问题。如下:

2 演示代码(TCP产生粘包|拆包)

客户端向服务端发送10条消息,看服务端如何接受。

服务端Server:

package com.liubujun.tcp; import com.liubujun.netty.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @Author: liubujun * @Date: 2023/2/26 13:35 */ public class MyServer { public static void main(String[] args) throws Exception { //1.创建2个线程组bossGroup和workerGroup //2 bossGroup只是处理连接请求,workerGroup真正的和客户端进行业务处理 //3 两个都是无限循环 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class) //使用nioSocketChannel作为服务器的通道实现 .childHandler(new MyServerInitializer());//给workerGroup的EventLoop对应的管道设置处理器 //绑定一个端口并且同步,生成了一个ChannelFuture对象 //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(7000).sync(); //对关联通道进行监听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

服务端Handle:

package com.liubujun.tcp; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.nio.charset.Charset; import java.util.UUID; /** * @Author: liubujun * @Date: 2023/2/26 13:50 */ public class MyServerHandle extends SimpleChannelInboundHandler { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); //将buffer转成字符串 String message = new String(buffer, Charset.forName("utf-8")); System.out.println("服务器端接收到数据"+message); System.out.println("服务器接收消息量="+(++this.count)); //服务器回送数据给客户端,回送一个随机id ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), Charset.forName("utf-8")); ctx.writeAndFlush(responseByteBuf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

服务端Initializer:

package com.liubujun.tcp; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @Author: liubujun * @Date: 2023/2/26 13:49 */ public class MyServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyServerHandle()); } }

客户端:

package com.liubujun.tcp; import com.liubujun.netty.NettyClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @Author: liubujun * @Date: 2023/2/26 13:34 */ public class MyClient { public static void main(String[] args) throws Exception{ //客户端需要一个循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端的启动对象 //注意客户端使用的是Bootstrap不是ServerBootstrap Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) //设置客户端通道的实现类 .handler(new MyClientInitializer()); System.out.println("客户端 ok ..."); //启动客户端连接服务器 ChannelFuture涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync(); //给关闭通道进行监听 channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }

客户端Handle:

package com.liubujun.tcp; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.nio.charset.Charset; /** * @Author: liubujun * @Date: 2023/2/26 13:40 */ public class MyClientHandle extends SimpleChannelInboundHandler { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //使用客户端发送十条数据hello,server for (int i = 0; i < 10; ++i) { ByteBuf buffer = Unpooled.copiedBuffer("hello,server" + i, Charset.forName("utf-8")); ctx.writeAndFlush(buffer); } } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); String message = new String(buffer, Charset.forName("utf-8")); System.out.println("客户端接收消息="+message); System.out.println("客户端接收消息数量="+(++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

客户端Initializer:

package com.liubujun.tcp; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @Author: liubujun * @Date: 2023/2/26 13:37 */ public class MyClientInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyClientHandle()); } }

启动服务端和客户端:

服务端控制台:一次全部接受

客户端控制台:只接受一次

发现进行了粘包

再次启动一个客户端:

服务端控制台:

客户端控制台:一次接受

发现进行了拆包。

3 TCP粘包|拆包解决方案

使用自定义协议+编码器解决

关键是解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免TCP的拆包和粘包。

演示案例:

客户端发送5个Message对象,客户端每次发送一个message对象,服务器端就会每次接受一个message,分5次进行解码,每次读取到一个message,会回复一个message对象给客户端。

数据包:

//协议包 public class MessageProtocol { private int len; private byte[] content; public int getLen() { return len; } public void setLen(int len) { this.len = len; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }

客户端代码:

public class MyClient { public static void main(String[] args) throws Exception{ //客户端需要一个循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端的启动对象 //注意客户端使用的是Bootstrap不是ServerBootstrap Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) //设置客户端通道的实现类 .handler(new MyClientInitializer()); System.out.println("客户端 ok ..."); //启动客户端连接服务器 ChannelFuture涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync(); //给关闭通道进行监听 channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }

客户端处理器:

public class MyClientHandle extends SimpleChannelInboundHandler { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //使用客户端发送十条数据今天天气冷,来吃火锅 for (int i = 0; i < 5; ++i) { String msg = "今天天气冷,来吃火锅"; byte[] content = msg.getBytes(Charset.forName("utf-8")); int length = content.length; MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); ctx.writeAndFlush(messageProtocol); } } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { int len = msg.getLen(); byte[] content = msg.getContent(); System.out.println("客户端接收到消息如下"); System.out.println("长度:"+len); System.out.println("内容:"+new String(content,Charset.forName("utf-8"))); System.out.println("客户端接收消息数量="+(++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常消息:"+cause.getMessage()); ctx.close(); } }

客户端初始化:

public class MyClientInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyMessageEncoder()); //加入编码器 pipeline.addLast(new MyClientHandle()); pipeline.addLast(new MyMessageDecoder()); } }

编码器:

public class MyMessageEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol msg, ByteBuf out) throws Exception { System.out.println("MyMessageEncoder encoder 方法被调用"); out.writeInt(msg.getLen()); out.writeBytes(msg.getContent()); } }

解码器:

public class MyMessageDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List list) throws Exception { System.out.println("MyMessageDecoder decoder 被调用"); //需要将得到的二进制字节码->MessageProtocol 数据包(对象) int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); //封装成MessageProtocol 对象,放入out,传递下一个handle业务处理 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); list.add(messageProtocol); } }

服务端:

public class MyServer { public static void main(String[] args) throws Exception { //1.创建2个线程组bossGroup和workerGroup //2 bossGroup只是处理连接请求,workerGroup真正的和客户端进行业务处理 //3 两个都是无限循环 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class) //使用nioSocketChannel作为服务器的通道实现 .childHandler(new MyServerInitializer());//给workerGroup的EventLoop对应的管道设置处理器 //绑定一个端口并且同步,生成了一个ChannelFuture对象 //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(7000).sync(); //对关联通道进行监听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

以上只是部分内容,为了便于维护,本文已迁移到新地址:TCP粘包|拆包和解决方案 – 编程屋



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3