Android Netty的使用

您所在的位置:网站首页 通信框架netty Android Netty的使用

Android Netty的使用

2023-09-14 04:30| 来源: 网络整理| 查看: 265

前言

很久没更新博客了,主要是 最近发生的事情太多了。开始学习~ @TOC

简要

Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty作为底层通信框架;很多其他业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。通过对Netty的分析,我们将它的优点总结如下。

API使用简单,开发门槛低; 功能强大,预置了多种编解码功能,支持多种主流协议; 定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展; 性能高,通过与其他业界主流的NIO框架对比, Netty的综合性能最优 成熟、稳定, Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼; 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入 经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它已经完全能够满足不同行业的商业应用了。 正是因为这些优点, Netty逐渐成为Java NIO编程的首选框架。 解码器介绍(需要了解的) 粘包/拆包

TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

LineBasedFrameDecoder(行解码器)

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有"\n"或者"\r\n",如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

DelimiterBasedFrameDecoder(特殊分隔符解码器)

DelimiterBasedFrameDecoder用于对使用分隔符结尾的消息进行自动解码

FixedLengthFrameDecoder(固定长度解码器)

FixedLengthFrameDecoder用于对固定长度的消息进行自动解码。

StringDecoder(字符串解码器)

StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的handlero LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

备注

有了上述解码器,再结合其他的解码器,如字符串解码器等,可以轻松地完成对很多消息的自动解码,而且不再需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。应用DelimiterBasedFrameDecoder和FixedLengthFrameDecoder进行开发非常简单,在绝大数情况下,只要将DelimiterBasedFrameDecoder或FixedLengthFrameDecoder添加到对应ChannelPipeline的起始位即可。

使用步骤(代码中集合了三种案例) 服务端 EchoServer1.class 准备工作类

jar 包:netty-all-5.0.0.Alpha2.jar

/** * Netty 准备工作类,负责初始化,及一些参数配置 */ public class EchoServer1 { public void bind(int port) throws Exception{ //配置服务端的Nio线程组 //两个线程组,专门用于网络事件的处理 实际上是reactor线程组 //这里创建两个。一个用户服务端接受客户端的连接,一个用于进行SocketChannel的网络赌侠 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); System.out.println(" bind:"); try { //辅助启动类 ServerBootstrap b = new ServerBootstrap(); //传入进来 b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) //将bloglog 设置1024 //① 特殊分隔符 解码器 .option(ChannelOption.SO_BACKLOG,1024) //② 固定长度解码器 // .option(ChannelOption.SO_BACKLOG,100) .handler(new LoggingHandler(LogLevel.INFO)) //处理网络的IO事件 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { // ① 特殊分隔符 解码器 // ByteBuf delimiter= Unpooled.copiedBuffer("$_".getBytes("UTF-8")); // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));//标识符解码器 //② 固定长度解码器 // ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); //③ 行 解码器 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoServerHandler());//处理 } }); //绑定端口。同步等待成功 //同步阻塞方法 ChannelFuture f=b.bind(port).sync(); //等待服务端监听端口关闭 /主要用于异步操作的回调通知 , f.channel().closeFuture().sync(); }finally { System.out.println(" 退出:"); //退出 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } EchoServerHandler.class 处理发送 接收消息 /** * EchoServerHandler 负责 处理发送 接收消息 */ public class EchoServerHandler extends ChannelHandlerAdapter { int counter = 0; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接受也在这里 String body = (String) msg; // 前面已经对消息进行了自动解码。 System.out.println("This is " + ++counter + "tiems recive client:" + body); //① 特殊符号 解码器 // body += "$_"; // String sendClientMsg = "服務端過來得消息" + "$_"; //② 固定长度 // String sendClientMsg = "a b c d e f g h i j k i t" + ""; //③ 行 解码器 String line = "ASSSSSSS" + System.getProperty("line.separator");//换行 //这里是对数据的处理 ByteBuf echo = Unpooled.copiedBuffer(line.getBytes()); //这个是返回到 原始状态,把消息返还给 客户端。 ctx.writeAndFlush(echo); System.out.println("发送给客户端:"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } } 启动 public class Main { /** * 开启 * @param args */ public static void main(String[] args) { int port=8888; try { new EchoServer1().bind(port); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } 客户端 EchoClient 客户端准备工作类 implementation 'io.netty:netty-all:5.0.0.Alpha2' /** * 客户端准备工作 */ class EchoClient { fun connet(port: Int, host: String) { var group: EventLoopGroup = NioEventLoopGroup() try { var b: Bootstrap = Bootstrap() b.group(group).channel(NioSocketChannel::class.java) .option(ChannelOption.TCP_NODELAY, true) .handler(LoggingHandler(LogLevel.INFO)) .handler(object : ChannelInitializer() { override fun initChannel(ch: SocketChannel?) { //①特殊标识 解码器 // var delimiter=Unpooled.copiedBuffer(("$"+"_").toByteArray()) // ch!!.pipeline().addLast(DelimiterBasedFrameDecoder(1024,delimiter)) //②长度 解码器 // ch!!.pipeline().addLast(FixedLengthFrameDecoder(20)) //③ 换行解码器 ch!!.pipeline().addLast(LineBasedFrameDecoder(1024)) ch!!.pipeline().addLast(StringDecoder(Charsets.UTF_8)) ch!!.pipeline().addLast(EchoClientHandler()) } }) //发起异步连接操作 var f: ChannelFuture = b.connect(host, port).sync() //等待客户端链路关闭 f.channel().closeFuture().sync() } finally { //优雅关闭线程组 group.shutdownGracefully(); } } EchoClientHandler 客户端接受发送类 /** * 客户端接受发送类 */ class EchoClientHandler : ChannelHandlerAdapter() { var counter: Int = 0 //① 固定长度 ② 特殊符号 // val ECHO_REQ = " 你好 , 我是客户端。啊$" + "_" //③ 行 解析 val ECHO_REQ = ("你好 ,我是客户端" + System.getProperty("line.separator")).toByteArray() override fun channelActive(ctx: ChannelHandlerContext?) { var msg: ByteBuf println("channelActive") for (i in 1..100) { //发送服务端 //① 固定长度 ② 特殊符号 // ctx!!.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.toByteArray())) // ③ 行 解析 ctx!!.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ)); } } override fun channelRead(ctx: ChannelHandlerContext?, msg: Any?) { counter++ //读取服务端消息 println("This is " + "$counter" + "times receive server:$msg") } override fun channelReadComplete(ctx: ChannelHandlerContext?) { println("channelReadComplete") //读取结束 ctx!!.flush() } override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) { cause!!.printStackTrace() ctx!!.close() } } //启动 startBtn.setOnClickListener { var port =8888 EchoClient().connet(port,"192.168.1.28") } 结尾 具体注释都在代码里 ①②③ 分别注释即可。 参考《Netty权威指南》 服务端采用JAVA 客户端 采用kotlin 后续增加 进阶篇


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3