netty(三)LengthFieldBasedFrameDecoder解码器的使用例子

您所在的位置:网站首页 netty的应用场景对接 netty(三)LengthFieldBasedFrameDecoder解码器的使用例子

netty(三)LengthFieldBasedFrameDecoder解码器的使用例子

2024-07-11 07:42| 来源: 网络整理| 查看: 265

1. 需求:发送数据与接收数据的格式

/**
 * 发送数据格式与接收数据格式相同(总长 2 + 2 + 4 + 4 + 512 + 512 = 1036 字节):
 *
 * +-------+-------+------+------+---------+---------+
 * | type1 | type2 | len1 | len2 | data1[] | data2[] |
 * | short | short | int  | int  | byte[]  | byte[]  |
 * |   2   |   2   |  4   |  4   |   512   |   512   |
 * +-------+-------+------+------+---------+---------+
 */

由于这个数据格式跟官方给出的示例都不一样,所以只能查看LengthFieldBasedFrameDecoder的源码,看看到底应该怎样解析这种数据

最后构造器中几个参数取值如下:

lengthFieldOffset=4:开始的4个字节是type1和type2(各2个字节),之后才是长度域,所以长度域偏移为4。lengthFieldLength=8:长度域由len1和len2两个int组成,共8个字节(默认实现只会把它当作一个long来读,因此需要重写getUnadjustedFrameLength()把两个长度相加)。lengthAdjustment=0:长度域指定的长度即为数据长度,所以数据长度不需要修正。initialBytesToStrip=0:发送和接收数据相同,不需要跳过数据

重新继承LengthFieldBasedFrameDecoder的解码器代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.nio.ByteOrder;

/**
 * Length-field frame decoder for a protocol whose length field consists of two
 * consecutive 32-bit integers.
 *
 * <p>The format sent and the format received are identical:
 *
 * <pre>
 * +-------+-------+------+------+---------+---------+
 * | type1 | type2 | len1 | len2 | data1[] | data2[] |
 * | short | short | int  | int  | byte[]  | byte[]  |
 * +-------+-------+------+------+---------+---------+
 * </pre>
 */
public class CustomizeDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * Passes every argument straight through to the
     * {@link LengthFieldBasedFrameDecoder} constructor.
     */
    public CustomizeDecoder(
            int maxFrameLength,
            int lengthFieldOffset,
            int lengthFieldLength,
            int lengthAdjustment,
            int initialBytesToStrip,
            boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
                lengthAdjustment, initialBytesToStrip, failFast);
    }

    /**
     * Reads two unsigned 32-bit integers, one at {@code offset} and one at
     * {@code offset + 4}, and returns their sum as the unadjusted frame length.
     *
     * <p>The {@code length} argument is not consulted.
     *
     * @param buf    buffer holding the incoming bytes
     * @param offset index of the first length integer
     * @param length declared size of the length field (unused)
     * @param order  byte order applied before reading
     * @return the sum of the two length values
     */
    @Override
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        final ByteBuf ordered = buf.order(order);
        final long firstLength = ordered.getUnsignedInt(offset);
        final long secondLength = ordered.getUnsignedInt(offset + 4);
        return firstLength + secondLength;
    }
}

    通过看博客发现很多人都是@Override decode()方法,其实这样是不太合适的,这样会一步就把消息解析成我们想要的,

而不是通过ByteBuf来在channel通道中传递,不便于我们后面Handler对消息进行处理,正确的应该是@Override 重写

getUnadjustedFrameLength()方法 ,然后在自己定义Handler(以ByteBuf来在channel通道中传递好处理)里面对消息进行逻辑处理

服务器端代码如下:

public class SimpleChatServer { private final int port; //定义服务器端监听的端口 public SimpleChatServer(int port){ this.port = port; } public void start() throws Exception{ EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class)//指定使用一个NIO传输Channel .option(ChannelOption.SO_BACKLOG, 128) .childHandler(new SimpleChatServerInitializer()); //异步的绑定服务器,sync()一直等到绑定完成. ChannelFuture future = serverBootstrap.bind(this.port).sync(); System.out.println(SimpleChatServerHandler.class.getName()+" started and listen on '"+ future.channel().localAddress()); future.channel().closeFuture().sync();//获得这个channel的CloseFuture,阻塞当前线程直到关闭操作完成 } finally { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new SimpleChatServer(8000).start(); } } public class SimpleChatServerInitializer extends ChannelInitializer { private static final int MAX_FRAME_LENGTH = 1024 * 1024; private static final int LENGTH_FIELD_LENGTH = 8; private static final int LENGTH_FIELD_OFFSET = 4; private static final int LENGTH_ADJUSTMENT = 0; private static final int INITIAL_BYTES_TO_STRIP = 0; @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new CustomizeDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP, true)); pipeline.addLast(new SimpleChatServerHandler()); } } public class CustomizeDecoder extends LengthFieldBasedFrameDecoder { public CustomizeDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); } @Override protected long 
getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) { buf = buf.order(order); long frameLength; frameLength = buf.getUnsignedInt(offset) + buf.getUnsignedInt(offset + 4); return frameLength; } /** * 发送数据格式 接收数据的格式 * +---2----+--2--+---4--+--4--+----512---+---512-----+-----+-------+-------+-------+---------+ * type1 type2 len1 len2 data1[] data2[] | type1 type2 len1 len2 data1[] data2[] * short short int int byte[] byte[] | type1 type2 len1 len2 data1 data2 *+-------+---------+-----------+---------+----------+------------+-------------------------+ */ } public class SimpleChatServerHandler extends SimpleChannelInboundHandler { private static AtomicLong count = new AtomicLong(0); private static long start = System.currentTimeMillis(); @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace();//打印异常的堆栈跟踪信息 ctx.close();//关闭这个channel } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { if (!(msg instanceof ByteBuf)) return; ByteBuf byteBuf = (ByteBuf) msg; short type1 = byteBuf.readShort(); short type2 = byteBuf.readShort(); int len1 = byteBuf.readInt(); int len2 = byteBuf.readInt(); byte[] data1 = new byte[len1]; byte[] data2 = new byte[len2]; byteBuf.readBytes(data1); byteBuf.readBytes(data2); CustomMsg customMsg = new CustomMsg(type1, type2, len1, len2, data1, data2); if ((count.incrementAndGet() % 1000000) == 0) { System.out.println((1000000 / (System.currentTimeMillis() - start) * 1000)); start = System.currentTimeMillis(); } //System.out.println("total news is "+(counter++)+ " " +customMsg.toString()); } } public class CustomMsg { short type1; short type2; int len1; int len2; byte[] data1 ; byte[] data2 ; public CustomMsg(short type1, short type2, int len1, int len2, byte[] data1, byte[] data2) { this.type1 = type1; this.type2 = type2; this.len1 = len1; this.len2 = len2; this.data1 = data1; this.data2 = 
data2; } public void setType1(short type1) { this.type1 = type1; } public void setType2(short type2) { this.type2 = type2; } public void setLen1(int len1) { this.len1 = len1; } public void setLen2(int len2) { this.len2 = len2; } public void setData1(byte[] data1) { this.data1 = data1; } public void setData2(byte[] data2) { this.data2 = data2; } public short getType1() { return type1; } public short getType2() { return type2; } public int getLen1() { return len1; } public int getLen2() { return len2; } public byte[] getData1() { return data1; } public byte[] getData2() { return data2; } @Override public String toString() { return "CustomMsg{" + "type1=" + type1 + ", type2=" + type2 + ", len1=" + len1 + ", len2=" + len2 + ", data1=" + Arrays.toString(data1) + ", data2=" + Arrays.toString(data2) + '}'; } }

由于只需要客户端给服务器端发送消息,我这儿采取nio的方式来写的客户端:

使用多线程来模拟发送数据给服务器端,服务器端接收消息,测量qps

/**
 * Load-generating client: starts a number of threads that each open a blocking
 * NIO socket to the server and send the same fixed frame in an endless loop.
 *
 * <p>Arguments: {@code <serverIp> <serverPort> <sendThreadNum>}.
 */
public class Client {

    private static final int HEAD_LEN = 2 + 2 + 4 + 4; // short + short + int + int
    private static final int DATA1_LEN = 512;
    private static final int DATA2_LEN = 512;
    private static final int BODY_LEN = DATA1_LEN + DATA2_LEN;

    private static String serverIp;
    private static int serverPort;

    /**
     * Parses the command line and starts the sender threads. Prints a usage
     * message and returns when fewer than three arguments are given.
     */
    public static void main(String[] args) {
        System.out.println(Arrays.asList(args));
        if (args.length < 3) {
            System.err.println("Usage: Client <serverIp> <serverPort> <sendThreadNum>");
            return;
        }
        serverIp = args[0];
        serverPort = Integer.parseInt(args[1]);
        int sendThreadNum = Integer.parseInt(args[2]);
        for (int i = 0; i < sendThreadNum; i++) {
            Thread thread = new Thread(new SendThread(), "SendThread_" + i);
            thread.setDaemon(false);
            thread.start();
        }
    }

    /**
     * Connects to the configured server and sends frames until the connection
     * fails, then waits one second and reconnects. Stops when interrupted.
     */
    public static class SendThread implements Runnable {

        @Override
        public void run() {
            final ByteBuffer frame = buildFrame();
            while (true) {
                // try-with-resources closes the channel on every exit path,
                // including a connect() that returns false.
                try (SocketChannel channel = SocketChannel.open()) {
                    channel.configureBlocking(true);
                    if (channel.connect(new InetSocketAddress(serverIp, serverPort))) {
                        sendForever(channel, frame);
                    }
                } catch (IOException | RuntimeException e) {
                    e.printStackTrace();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Preserve the interrupt and stop instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Builds one complete frame (header + body), flipped and ready for writing. */
        private static ByteBuffer buildFrame() {
            byte[] body = new byte[BODY_LEN];
            for (int i = 0; i < BODY_LEN; i++) {
                body[i] = (byte) i;
            }
            ByteBuffer frame = ByteBuffer.allocate(HEAD_LEN + BODY_LEN);
            frame.putShort((short) 1)
                    .putShort((short) 1)
                    .putInt(DATA1_LEN)
                    .putInt(DATA2_LEN)
                    .put(body);
            frame.flip(); // flip exactly once: position = 0, limit = frame size
            return frame;
        }

        /**
         * Writes the frame over and over; only returns by throwing.
         *
         * <p>Each frame is written until the buffer is drained, so a partial
         * write cannot truncate it, and the buffer is rewound (not flipped)
         * before every send so the limit stays at the full frame size.
         */
        private static void sendForever(SocketChannel channel, ByteBuffer frame) throws IOException {
            while (true) {
                frame.rewind();
                while (frame.hasRemaining()) {
                    channel.write(frame);
                }
            }
        }
    }
}

 

完整代码见github:https://github.com/anglesun/netty-LengthFieldBasedFrameDecoder



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3