Gateway整合spring boot websocket、原理分析

您所在的位置:网站首页 websocket网关集群 Gateway整合spring boot websocket、原理分析

Gateway整合spring boot websocket、原理分析

2023-12-22 02:55| 来源: 网络整理| 查看: 265

之前写过gateway整合websocket,使用stomp Gateway整合websocket stomp_u014203449的博客-CSDN博客_gateway整合websocket。

如果使用springboot websocket,也是可以的,使用简单方便。

顺便看看gateway和boot 关于websocket的核心类

gateway

以下是代码,gateway配置。

注意路由规则中,websocekt转发的协议要写成 ws: 。lb是指从注册中心根据服务名获取服务

routes: - id: web order: 5 uri: lb://web #lb代表从注册中心获取服务,将path的请求路由到uri predicates: - Path=/web/** filters: - StripPrefix=1 #除去第一个/前缀,比如请求/wisdomclass-demo/demo,会去除前缀/wisdomclass-demo,请求到路由服务的 /demo接口 - name: Hystrix #Hystrix Filter的名称,一个在网关层面的熔断过滤器,如果后端服务不可用,可作服务降级,返回友好提示 args: #Hystrix配置参数 name: fallbackcmd #HystrixCommond 的名字 fallbackUri: forward:/fallback #自定义的接口 - id: web uri: lb:ws://web #wesocket协议 order: 2 predicates: - Path=/web/websocket/apaas/** filters: - StripPrefix=1

但我测试以下这个handler不加也可以,在boot中有对ws的处理 

@Component public class WebsocketHandler implements GlobalFilter, Ordered { private final Logger logger = LoggerFactory.getLogger(WebsocketHandler.class); private final static String DEFAULT_FILTER_PATH = "/websocket/**"; /** * * @param exchange ServerWebExchange是一个HTTP请求-响应交互的契约。提供对HTTP请求和响应的访问, * 并公开额外的 服务器 端处理相关属性和特性,如请求属性 * @param chain * @return */ @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { String upgrade = exchange.getRequest().getHeaders().getUpgrade(); URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); AntPathMatcher pathMatcher = new AntPathMatcher(); if (!"ws".equals(scheme) && !"wss".equals(scheme)) { return chain.filter(exchange); } else if (pathMatcher.match(DEFAULT_FILTER_PATH,requestUrl.getPath())) { logger.info("now request websocekt ={}",requestUrl.getPath()); String wsScheme = convertWsToHttp(scheme); URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl); } return chain.filter(exchange); } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE - 2; } static String convertWsToHttp(String scheme) { scheme = scheme.toLowerCase(); return "ws".equals(scheme) ? "http" : "wss".equals(scheme) ? "https" : scheme; } } boot项目

其他boot websocket配置简单

org.springframework.boot spring-boot-starter-websocket import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { /** * ServerEndpointExporter 作用 * * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint * * @return */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

websocket接口 

import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * @author :PangTiemin * @date :Created in 2020/12/14 20:46 * @description: * @modified By: */ @ServerEndpoint("/websocket/apaas/{sid}") @Component public class WebSocketServer { //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static AtomicInteger onlineNum = new AtomicInteger(); //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。 private static ConcurrentHashMap sessionPools = new ConcurrentHashMap(); //发送消息 public void sendMessage(Session session, String message) throws IOException { if(session != null){ synchronized (session) { // System.out.println("发送数据:" + message); session.getBasicRemote().sendText(message); } } } //给指定用户发送信息 public void sendInfo(String userName, String message){ Session session = sessionPools.get(userName); try { sendMessage(session, message); }catch (Exception e){ e.printStackTrace(); } } //建立连接成功调用 @OnOpen public void onOpen(Session session, @PathParam(value = "sid") String userName){ sessionPools.put(userName, session); addOnlineCount(); System.out.println(userName + "加入webSocket2!当前人数为" + onlineNum); try { sendMessage(session, "欢迎" + userName + "加入连接!"); } catch (IOException e) { e.printStackTrace(); } } //关闭连接时调用 @OnClose public void onClose(@PathParam(value = "sid") String userName){ sessionPools.remove(userName); subOnlineCount(); System.out.println(userName + "断开webSocket连接!当前人数为" + onlineNum); } //收到客户端信息 @OnMessage public void onMessage(String message) throws IOException{ message = "webSocket2 客户端:" + message + ",已收到"; System.out.println(message); for (Session session: sessionPools.values()) { try { sendMessage(session, message); } catch(Exception e){ e.printStackTrace(); continue; } } } //错误时调用 @OnError public void onError(Session session, Throwable throwable){ System.out.println("发生错误"); throwable.printStackTrace(); } public static void addOnlineCount(){ onlineNum.incrementAndGet(); } public static void subOnlineCount() { onlineNum.decrementAndGet(); } }

页面

My WebSocket Send Close var websocket = null; //判断当前浏览器是否支持WebSocket, 主要此处要更换为自己的地址 if ('WebSocket' in window) { websocket = new WebSocket("ws://localhost:9000/web/websocket/apaas/hi?Authorization="+"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsid2ViIl0sInBob25lTnVtYmVyIjoiMTU2ODYwOTczNjYiLCJ1c2VyX25hbWUiOiIxNTY4NjA5NzM2NiIsInNjb3BlIjpbImFsbCJdLCJzeXN0ZW1UeXBlIjoiMSIsImV4cCI6MTYwNjk4NTc2OCwidXNlcklkIjoiOSIsImp0aSI6Ijk3NTNlNTk2LTNmNTUtNDE3ZS1iZmNmLTVkZmY5ZGFmZWU3MiIsImNsaWVudF9pZCI6ImNsaWVudCJ9.5QpMmSVYqvALrHpk3zT_zJWxbJKtkIJs3eUa2WWUuWI"); } else { alert('Not support websocket') } //连接发生错误的回调方法 websocket.onerror = function() { setMessageInnerHTML("error"); }; //连接成功建立的回调方法 websocket.onopen = function(event) { setMessageInnerHTML("open"); } //接收到消息的回调方法 websocket.onmessage = function(event) { setMessageInnerHTML(event.data); } //连接关闭的回调方法 websocket.onclose = function(event) { setMessageInnerHTML("close"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 // window.onbeforeunload = function() { // websocket.close(); // } //将消息显示在网页上 function setMessageInnerHTML(innerHTML) { document.getElementById('message').innerHTML += innerHTML + ''; } //关闭连接 function closeWebSocket() { websocket.close(); } //发送消息 function send() { var message = document.getElementById('text').value; websocket.send(message); } 携带认证信息

关键问题不好传递请求头,将token传递,网上有在protocol子协议的地方写入token,这种方式实际上是传递了一个Sec-websocket-protocol的请求头,服务端需要修改获取token的逻辑,而且这个请求头要求相应也有同样名称的响应头。我没采用这个方式,直接将token写在了url的queryparam上,再修改服务端获取token的逻辑。(注意websocket不支持cookie传递)

websocket = new WebSocket(url,[protocol])

gateway处理websocket

WebsocketRoutingFilter是一个GlobalFilter,对Sec-websocket-protocol的处理,然后交给 webSocketService继续处理

HandshakeWebSocketService进行一些校验,然后握手

boot处理websocket

AbstractProtocol是个核心类,他处理所有协议请求过程,debug websocekt 可以发现,他循环找不同的processor处理,状态state值随之变化。

初始state是closed,先用Http11Processor处理 state为upgrading升级中,在这步会调用ws open方法(需要我们写的ws open业务方法);再UpgradeProcessorInternal处理为 upgraded升级成功。

先截图 Http11Processor类service方法的一段。   this.getAdapter().service(this.request, this.response) 会执行 Filter链

如果执行正常,会返回 UPGRADING的状态,验证了 websocket握手时通过http,然后再升级。

AbstractProtocol,upgrading后,再UpgradeProcessorInternal处理为 upgraded升级成功。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3