【netty】java如何作为websocket客户端 对服务端发起请求

您所在的位置:网站首页 websocket连接400 【netty】java如何作为websocket客户端 对服务端发起请求

【netty】java如何作为websocket客户端 对服务端发起请求

#【netty】java如何作为websocket客户端 对服务端发起请求| 来源: 网络整理| 查看: 265

文章目录 前言代码

是的 本文介绍java如何作为客户端 发起websocket请求 博主不做标题党 不会服务端客户端分不清就写个标题 乱写文章

前言

为什么会使用java作为websocket客户端? 虽说websocket协议 本意是web与服务端之间的通讯协议,那假设有一天 我们的供应商 或者是甲方大爷 只提供了websocket接口呢? 如果直接让前端去对接,再把数据传给后端,那从前端对接到后端入库的步骤,万一出现数据丢失呢? 总之把数据处理放在后端,是相对可靠的, 我们可以借助netty来实现websocket客户端功能

长链接有两点值得注意的,一是心跳机制 二是重连机制 如果不发送心跳包,可能过会儿连接就断开了; 重新机制就比较好理解了,不管是服务端还是客户端的断开,作为客户端都需要能够重连

代码

博主对长链接并不是特别熟练,但是什么代码是能正式用的,什么代码上不了生产只是个写着玩的demo 还是一眼能分辨出来的,代码主要参考 [email protected]:yimiancheng/netty-study.git ,写的很不错,代码优化空间很少了, 线程池博主是推荐用new ThreadPoolExecutor()创建,避免OOM的问题,除此之外应该是准生产环境级别代码了。

maven依赖

io.netty netty-all 4.1.35.Final

websocket客户端handle类 主要处理接收的消息、事件等, 事件触发会进入userEventTriggered方法 建立连接会进入channelActive方法 接收消息会进入channelRead0方法 出现异常会进入exceptionCaught方法 断开连接会进入channelInactive方法(本文中未重写该方法, 在channelInactive方法中重连也是没问题的)

import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.SocketClient; import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.websocket.SendMsg; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * WebSocketClientFrameHandler * * @date 2019/8/20 16:42 */ public class WebSocketClientFrameHandler extends SimpleChannelInboundHandler { private static final Logger LOG = LoggerFactory.getLogger(WebSocketClientFrameHandler.class); private SocketClient socketClient; private ChannelPromise channelPromise; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { LOG.info("客户端接收到事件 " + (evt.getClass()) + " | " + evt.toString()); if(WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE.equals(evt)) { LOG.info(ctx.channel().id().asShortText() + " 握手完成!"); socketClient.CHANNEL_IS_READY.set(true); channelPromise.setSuccess(); // SendMsg.send(ctx.channel(),"客户端握手完成消息 -》服务器时间: " + System.currentTimeMillis()); } else if(evt instanceof IdleStateEvent){ //ctx.channel().writeAndFlush(new PingWebSocketFrame()); IdleStateEvent evtIdle = (IdleStateEvent) evt; switch(evtIdle.state()) { case WRITER_IDLE: // SendMsg.send(ctx.channel(),"客户端 ping 消息 -》服务器时间: " + System.currentTimeMillis()); ctx.channel().writeAndFlush(new PingWebSocketFrame()); case READER_IDLE: // SendMsg.send(ctx.channel(),"客户端 ping 消息 -》服务器时间: " + System.currentTimeMillis()); ctx.channel().writeAndFlush(new PingWebSocketFrame()); default: break; } } super.userEventTriggered(ctx, evt); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); LOG.info("打开连接 handlerAdded SUCCESS. | name = " +channel.id().asShortText()); super.handlerAdded(ctx); channelPromise = ctx.newPromise(); } /** * Channel 已经被注册到了EventLoop */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); LOG.info("注册成功 channelRegistered SUCCESS. | name = " +channel.id().asShortText()); super.channelRegistered(ctx); } /** * Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); LOG.info("活动状态 channelActive SUCCESS. | name = " +channel.id().asShortText()); super.channelActive(ctx); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception { if(webSocketFrame instanceof TextWebSocketFrame) { // String message = textWebSocketFrame.content().toString(Charset.forName("utf-8")); String message = ((TextWebSocketFrame) webSocketFrame).text(); LOG.info("客户端接收到消息:" + message); } else { LOG.info("接收到消息类型" + (webSocketFrame.getClass().getName())); } SendMsg.sendPong(channelHandlerContext.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LOG.error("消息处理失败: " + cause.getMessage(), cause); ctx.close(); } /** * Channel 已经被创建,但还未注册到EventLoop * 连接断开 */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); LOG.info("连接断开 channelUnregistered SUCCESS. | name = " +channel.id().asShortText()); super.channelUnregistered(ctx); channelPromise = null; } public SocketClient getSocketClient() { return socketClient; } public void setSocketClient(SocketClient socketClient) { this.socketClient = socketClient; } public ChannelPromise getChannelPromise() { return channelPromise; } }

心跳任务类

import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.SocketClient; import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.websocket.SendMsg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.TimerTask; /** * HeartBeatTimerTask * * @date 2019/9/2 16:24 */ public class HeartBeatTimerTask extends TimerTask { private static final Logger LOG = LoggerFactory.getLogger(ReconnectTimerTask.class); private SocketClient socketClient; public HeartBeatTimerTask(SocketClient socketClient) { this.socketClient = socketClient; } @Override public void run() { if(socketClient != null && socketClient.isValid()) { SendMsg.send(socketClient.getChannel(), "客户端心跳消息 => " + System.currentTimeMillis()); } } }

重连任务类

import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.SocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.TimerTask; /** * ReconnectTimerTask * * @date 2019/9/2 16:03 */ public class ReconnectTimerTask extends TimerTask { private static final Logger LOG = LoggerFactory.getLogger(ReconnectTimerTask.class); private SocketClient socketClient; public ReconnectTimerTask(SocketClient socketClient) { this.socketClient = socketClient; } @Override public void run() { if(socketClient != null && !socketClient.isValid()) { LOG.info("=== 客户端重连 " + System.currentTimeMillis()); socketClient.connect(); } } }

自定义线程工厂类,主要是给线程重命名 便于维护、调试

import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class NamedThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final AtomicInteger threadNumber = new AtomicInteger(1); final ThreadGroup group; final String prefix; final boolean isDaemon; final int priority; public NamedThreadFactory() { this("pool"); } public NamedThreadFactory(String prefix) { this(prefix, false, Thread.NORM_PRIORITY); } public NamedThreadFactory(String prefix, boolean isDaemon, int priority) { SecurityManager sm = System.getSecurityManager(); this.group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.prefix = prefix + "-" + poolNumber.getAndIncrement() + "-thread-"; this.isDaemon = isDaemon; this.priority = priority; } public Thread newThread(Runnable r) { Thread thread = new Thread(group, r, prefix + threadNumber.getAndIncrement(), 0); thread.setDaemon(isDaemon); thread.setPriority(priority); return thread; } }

消息监听类,主要用于发送消息(包括心跳包)是否成功

import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class CustomerChannelFutureListener implements ChannelFutureListener { private static final Logger LOG = LoggerFactory.getLogger(CustomerChannelFutureListener.class); public void operationComplete(ChannelFuture channelFuture) throws Exception { // LOG.info(JSON.toJSONString(channelFuture)); if(channelFuture.isDone() && channelFuture.isSuccess()){ // LOG.info("send success."); } else { channelFuture.channel().close(); LOG.info("send error. cause = " + channelFuture.cause()); channelFuture.cause().printStackTrace(); } } }

简单封装的消息发送类

import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * SendMsg * * @date 2019/8/30 18:02 */ public class SendMsg { private static final Logger LOG = LoggerFactory.getLogger(SendMsg.class); public static ConcurrentHashMap ALL_CHANNEL = new ConcurrentHashMap(); public static void startSendMsg() { Thread thread = new Thread(new Runnable() { public void run() { while(true) { sendMsgTest(); } } }); thread.start(); } public static void sendMsgTest() { try { Map map = Collections.unmodifiableMap(ALL_CHANNEL); LOG.info("map size = " + map.size()); if(MapUtils.isEmpty(map)) { Thread.sleep(2000); return; } for(Map.Entry entry : map.entrySet()) { LOG.info("------------- key = " + entry.getKey()); send(entry.getValue(), "服务端发送消息 " + entry.getKey() + " | " + System.currentTimeMillis()); } Thread.sleep(10000); } catch(Exception ex) { ex.printStackTrace(); } } public static void put(Channel channel) { ALL_CHANNEL.put(channel.id().asShortText(), channel); } public static void remove(Channel channel) { ALL_CHANNEL.remove(channel.id().asShortText()); } public static void send(Channel channel, Object msg) { final String textMsg = JSON.toJSONString(msg); if(channel != null && channel.isActive()) { TextWebSocketFrame frame = new TextWebSocketFrame(textMsg); channel.writeAndFlush(frame) .addListener(new CustomerChannelFutureListener()); } else { LOG.error("消息发送失败! textMsg = " + textMsg); } } public static void sendPing(Channel channel) { if(channel != null && channel.isActive()) { channel.writeAndFlush(new PingWebSocketFrame()) .addListener(new CustomerChannelFutureListener()); } else { LOG.error("消息发送失败! ping"); } } public static void sendPong(Channel channel) { if(channel != null && channel.isActive()) { channel.writeAndFlush(new PongWebSocketFrame()) .addListener(new CustomerChannelFutureListener()); } else { LOG.error("消息发送失败! pong"); } } }

channel初始化

import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.handler.WebSocketClientFrameHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.timeout.IdleStateHandler; /** * ClientChannelInitializer * * @date 2019/8/31 16:05 */ public class ClientChannelInitializer extends ChannelInitializer { private WebSocketClientHandshaker webSocketClientHandshaker; private WebSocketClientFrameHandler webSocketFrameHandler; public ClientChannelInitializer(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientFrameHandler webSocketFrameHandler) { this.webSocketClientHandshaker = webSocketClientHandshaker; this.webSocketFrameHandler = webSocketFrameHandler; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new HttpClientCodec());//Http协议编码解码器 pipeline.addLast(new HttpObjectAggregator(65536));//聚合 HttpRequest pipeline.addLast(new IdleStateHandler(5, 10, 0)); //会处理ping pong close消息 pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker,true)); pipeline.addLast(webSocketFrameHandler); } }

客户端主类 : 添加工作组 , 启动websocket client , 建立连接等

import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.handler.WebSocketClientFrameHandler; import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.task.HeartBeatTimerTask; import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.task.ReconnectTimerTask; import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.thread.NamedThreadFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * SocketClient * * @date 2019/8/20 17:53 */ public class SocketClient { private static final Logger LOG = LoggerFactory.getLogger(SocketClient.class); public static String CLIENT_VERSION = "client_version"; private static final int DEFAULT_PORT = 80; /** * 长链重连间隔时间,单位s */ public static long RECONNECT_INTERVAL = 10; /** * 长链心跳时间,单位s */ public static long FETCH_PERIOD = 30; public static String host = "127.0.0.1"; public static int port = 8585; private Channel channel = null; private NioEventLoopGroup nioEventLoopGroup; public AtomicBoolean CHANNEL_IS_READY = new AtomicBoolean(false); private ScheduledExecutorService RECONNECT_TIMER; private ScheduledExecutorService HEARTBEAT_TIMER; static final String URL = System.getProperty("url", "ws://127.0.0.1:8585/boot/imserver/1111"); URI uri = new URI(URL); static { // RECONNECT_TIMER = Executors.newSingleThreadScheduledExecutor(); } public SocketClient(URI uri) throws URISyntaxException { this.uri = uri; } private void start() { Bootstrap boot = new Bootstrap(); nioEventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); try { HttpHeaders httpHeaders = new DefaultHttpHeaders(); httpHeaders.add(CLIENT_VERSION, 1); WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders); boot.group(nioEventLoopGroup) .option(ChannelOption.TCP_NODELAY, true) .channel(NioSocketChannel.class); WebSocketClientFrameHandler webSocketFrameHandler = new WebSocketClientFrameHandler(); webSocketFrameHandler.setSocketClient(this); ClientChannelInitializer clientChannelInitializer = new ClientChannelInitializer(webSocketClientHandshaker, webSocketFrameHandler); boot.handler(new LoggingHandler(LogLevel.INFO)); boot.handler(clientChannelInitializer); port = (uri.getPort() == -1) ? DEFAULT_PORT : uri.getPort(); host = uri.getHost(); channel = boot.connect(host, port).sync().channel(); LOG.info("SocketClient has started. CHANNEL_IS_READY = " + CHANNEL_IS_READY.get()); webSocketFrameHandler.getChannelPromise().sync(); LOG.info("SocketClient has started full. CHANNEL_IS_READY = " + CHANNEL_IS_READY.get()); } catch(Exception ex) { ex.printStackTrace(); LOG.error("connect error. uri " + uri.toString()); } finally { // nioEventLoopGroup.shutdownGracefully(); } } /** * 客户端连接服务端 */ public void connect() { stop(); start(); startReconnect(); doHeartBeat(); } /** * 开启线程-断开重连 */ public void startReconnect() { RECONNECT_TIMER = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("reconnect-schedule-pool", Boolean.TRUE, Thread.NORM_PRIORITY)); // https://www.jianshu.com/p/502f9952c09b RECONNECT_TIMER.scheduleAtFixedRate(new ReconnectTimerTask(this), RECONNECT_INTERVAL * 1000L, RECONNECT_INTERVAL * 1000L, TimeUnit.MILLISECONDS); } /** * 心跳 */ private void doHeartBeat() { HEARTBEAT_TIMER = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("heartbeat-schedule-pool", Boolean.TRUE, Thread.NORM_PRIORITY)); // https://www.jianshu.com/p/502f9952c09b HEARTBEAT_TIMER.scheduleAtFixedRate(new HeartBeatTimerTask(this), FETCH_PERIOD * 1000L, FETCH_PERIOD * 1000L, TimeUnit.MILLISECONDS); } /** * 客户端停止 */ public void stop() { try { if(nioEventLoopGroup != null) { nioEventLoopGroup.shutdownGracefully(); } if(channel != null) { channel.close(); } if(RECONNECT_TIMER != null) { RECONNECT_TIMER.shutdown(); RECONNECT_TIMER = null; } if(HEARTBEAT_TIMER != null) { HEARTBEAT_TIMER.shutdown(); HEARTBEAT_TIMER = null; } } catch(Exception ex) { //do nothing. } } public boolean isValid() { if (channel != null && channel.isActive()) { return true; } else { return false; } } public Channel getChannel() { return channel; } public static void main(String[] args) { try { new SocketClient(new URI(URL)).connect(); } catch (URISyntaxException e) { e.printStackTrace(); } } }

重连成功截图: 在这里插入图片描述



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3