netty无缝切换rabbitmq、activemq、roc

您所在的位置:网站首页 rabbitmq心跳检测 netty无缝切换rabbitmq、activemq、roc

netty无缝切换rabbitmq、activemq、roc

#netty无缝切换rabbitmq、activemq、roc| 来源: 网络整理| 查看: 265

netty无缝切换rabbitmq、activemq、roc 发布时间:2020-07-27 08:43:20 来源:网络 阅读:409 作者:sq5d41a7a774d48 栏目:建站服务器

netty无缝切换rabbitmq、activemq、roc

netty无缝切换rabbitmq、activemq、roc

netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否可以,以及处理登录认证的UserAuthHandler和消息处理MessageHandler

protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(defLoopGroup, //编码解码器 new HttpServerCodec(), //将多个消息转换成单一的消息对象 new HttpObjectAggregator(65536), //支持异步发送大的码流,一般用于发送文件流 new ChunkedWriteHandler(), //检测链路是否读空闲,配合心跳handler检测channel是否正常 new IdleStateHandler(60, 0, 0), //处理握手和认证 new UserAuthHandler(), //处理消息的发送 new MessageHandler() ); }

对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel

public static void addChannel(Channel channel) { String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel); System.out.println("addChannel:" + remoteAddr); if (!channel.isActive()) { logger.error("channel is not active, address: {}", remoteAddr); } UserInfo userInfo = new UserInfo(); userInfo.setAddr(remoteAddr); userInfo.setChannel(channel); userInfo.setTime(System.currentTimeMillis()); userInfos.put(channel, userInfo); }

登录后,channel就变成有效的channel,无效的channel之后将会丢弃

public static boolean saveUser(Channel channel, String nick, String password) { UserInfo userInfo = userInfos.get(channel); if (userInfo == null) { return false; } if (!channel.isActive()) { logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick); return false; } if (nick == null || password == null) { return false; } LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper(); lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password); Account account = accountMapperStatic.selectOne(lambdaQueryWrapper); if (account == null) { return false; } // 增加一个认证用户 userCount.incrementAndGet(); userInfo.setNick(nick); userInfo.setAuth(true); userInfo.setId(account.getId()); userInfo.setUsername(account.getUsername()); userInfo.setGroupNumber(account.getGroupNumber()); userInfo.setTime(System.currentTimeMillis()); // 注册该用户推送消息的通道 offlineInfoTransmitStatic.registerPull(channel); return true; }

当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。

public static void removeChannel(Channel channel) { try { logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel)); //加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误 rwLock.writeLock().lock(); channel.close(); UserInfo userInfo = userInfos.get(channel); if (userInfo != null) { if (userInfo.isAuth()) { offlineInfoTransmitStatic.unregisterPull(channel); // 减去一个认证用户 userCount.decrementAndGet(); } userInfos.remove(channel); } } finally { rwLock.writeLock().unlock(); } }

为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。

public interface OfflineInfoTransmit { void pushP2P(Integer userId, String message); void pushGroup(String groupNumber, String message); void registerPull(Channel channel); void unregisterPull(Channel channel); }

其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:

单聊的模型参考线程池的模型,如果用户在线,直接通过channel发送给用户。如果用户离线,则发往中间件存储,下次用户上线时直接从中间件拉取消息。这样做对比所有消息的发送都通过中间件来转的好处是提升了性能 群聊则是完全通过中间件来转发消息,消息发送中间件,客户端从中间件接取消息。如果仍像单聊那样操作,在线用户直接通过channel发送,操作过于繁琐,要判断这个群组的哪些用户是否在线 如果用户在线就注册消费者,从中间件接取消息。否则,就断开消费者,消息保留在中间件中,以便客户端下次上线时拉取。这样就实现了离线消息的接收。 不管使用哪种中间件或使用不使用中间件,它的处理流程都遵循上面的3个要求,就能无缝切换上方的4种方法来存储和转发消息。需要哪种方法开启相应注解即可。

netty无缝切换rabbitmq、activemq、roc

代码地址:https://github.com/shuangyueliao/netty-chat

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:[email protected]进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

netty 聊天 ct 上一篇新闻:nginx 文件 上传 及 下载 完整版 下一篇新闻:面向服务开发的优点 香港云服务器 10000元红包免费领

红包可用于(云服务器、高防服务器、裸金属服务器、高防IP、云数据库、CDN加速)购买和续费

猜你喜欢 Linux操作系统的安装配置步骤 常用Linux镜像地址介绍 linux中怎么改文件的所有者 xshell连接linux的步骤 Linux系统中“/”和“~”的区别 linux中chmod命令的使用方法 Linux中shell的变量介绍 网站域名解析是什么 如何做才能避免在注册域名时出错 域名不续费会怎样


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3