spring websocket连接不上的问题排查

您所在的位置:网站首页 web服务器连接不上怎么回事 spring websocket连接不上的问题排查

spring websocket连接不上的问题排查

2024-07-17 06:56| 来源: 网络整理| 查看: 265

前言: 很久以前就在自己的服务上部署了websocket,现在有一个新接收的项目也要加上websocket,直接把代码复制过去,发现连不上。 处理过程

1.创建一个websocketserver,并加上@serverEndPoint注解。 package com.yuelun.berlin.modules.base; /** • @Author: toryxu • @Date: 2019/5/29 0029 14:31 • @Version 1.0 */ import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.PreDestroy; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.EOFException; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @Slf4j @Component @EqualsAndHashCode @ServerEndpoint("/api_base/websocket/{sid}") public class WebSocketServer { /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 */ private static int onlineCount = 0; /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收sid */ private String sid = ""; /** * 群发自定义消息 */ public static void sendInfoToAll(String message) throws IOException { log.info("ws推送 to all 客户端个数--->{} ,---> {},", webSocketSet.size(), message); for (WebSocketServer item : webSocketSet) { log.info("ws推送 sessionId {}", item.sid); item.sendMessage(message); } } /** * 指定用户发送消息 */ public static void sendInfo(String sessionId, String message) { log.info("ws推送 sessionId {}, message {}", sessionId, message); if (StringUtils.isEmpty(sessionId)) { log.error("ws推送失败 sessionId not specified"); return; } for (WebSocketServer item : webSocketSet) { try { if (item.sid.equals(sessionId)) { item.sendMessage(message); break; } } catch (Exception e) { log.error(e.getMessage(), e); continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("sid") String sid) { this.session = session; //加入set中 webSocketSet.add(this); //在线数加1 addOnlineCount(); log.info("ws有新连接开始监听 sid{}, count {}", sid, getOnlineCount()); this.sid = sid; try { sendMessage("连接成功"); } catch (IOException e) { log.error(e.getMessage(), e); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); subOnlineCount(); //在线数减1 log.info("ws有一连接关闭!当前在线 " + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("收到来自窗口" + sid + "的信息:" + message); //此处添加判断是否是处理了告警,若处理了告警之后需要广播所有客户端进行页面刷新 //群发消息 for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { log.error(e.getMessage(), e); } } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { if (error instanceof EOFException) { // ws 已断开,客户端有做重连机制 可能原因,nginx配置读超时 log.error("ws已断开: " + session.getId(), error.toString()); } else { log.error("sessionId: " + session.getId(), error); } } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 销毁资源 */ @PreDestroy public void destroy() { try { this.session.close(); } catch (IOException e) { log.error(e.getMessage(), e); } } } 2.注入serverEndPointExport package com.yuelun.berlin.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @Author: toryxu * @Date: 2019/5/29 0029 14:29 * 开启WebSocket支持 * @Version 1.0 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。 3.因为平台配置了shiro,所以需要添加websocket endpoint的白名单。 filterMap.put("/api_base/websocket/*", “anon”); 4.启动,发现启动报错

java.lang.IllegalStateException: Failed to register @ServerEndpoint class: class com.yuelun.berlin.modules.base.controller.WebSocketServer$$EnhancerBySpringCGLIB$$6d78c605 at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoint(ServerEndpointExporter.java:158) at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoints(ServerEndpointExporter.java:133) at org.springframework.web.socket.server.standard.ServerEndpointExporter.afterSingletonsInstantiated(ServerEndpointExporter.java:111) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:866) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) at com.yuelun.berlin.Application.main(Application.java:23) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)

报错信息很明显,无法把@ServerEndPoint注册到WebsocketServer,因为这个类已经被代理了。为什么被代理?检查发现代码里有一个日志切面sysLogAspect类,把websocketServer也切进去了。 5. 解决 修改切面类的配置,或者调整websocketServer的路径,解决,连接成功。 6.细究 切面代理的原理大家都清楚,接下来研究一下serverEndPoint的注入原理。 上文说到,serverEndpointExport这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。 先一下这个类的注释:

* Detects beans of type {@link javax.websocket.server.ServerEndpointConfig} and registers * with the standard Java WebSocket runtime. Also detects beans annotated with * {@link ServerEndpoint} and registers them as well. Although not required, it is likely * annotated endpoints should have their {@code configurator} property set to * {@link SpringConfigurator}. * * When this class is used, by declaring it in Spring configuration, it should be * possible to turn off a Servlet container's scan for WebSocket endpoints. This can be * done with the help of the {@code } element in {@code web.xml}.

需要注意的是,servlet容器也是可以扫描websocket endpoints的(tomcat、jetty),当然因为目前是springboot服务,所以得加上配置。

/** * Actually register the endpoints. Called by {@link #afterSingletonsInstantiated()}. */ protected void registerEndpoints() { Set endpointClasses = new LinkedHashSet(); if (this.annotatedEndpointClasses != null) { endpointClasses.addAll(this.annotatedEndpointClasses); } ApplicationContext context = getApplicationContext(); if (context != null) { String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class); for (String beanName : endpointBeanNames) { endpointClasses.add(context.getType(beanName)); } } for (Class endpointClass : endpointClasses) { registerEndpoint(endpointClass); } if (context != null) { Map endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class); for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) { registerEndpoint(endpointConfig); } } }

接下来看实际注册的方法,可以通过注释看到,是在本类实例化之后被调用。 很容易懂,就是把有@ServerEndpoint注解的类以及beanType为ServerEndPointConfig的bean给找出来,注册。关键在于注册这一步。

private void registerEndpoint(Class endpointClass) { ServerContainer serverContainer = getServerContainer(); Assert.state(serverContainer != null, "No ServerContainer set. Most likely the server's own WebSocket ServletContainerInitializer " + "has not run yet. Was the Spring ApplicationContext refreshed through a " + "org.springframework.web.context.ContextLoaderListener, " + "i.e. after the ServletContext has been fully initialized?"); try { if (logger.isDebugEnabled()) { logger.debug("Registering @ServerEndpoint class: " + endpointClass); } serverContainer.addEndpoint(endpointClass); } catch (DeploymentException ex) { throw new IllegalStateException("Failed to register @ServerEndpoint class: " + endpointClass, ex); } }

ServerContainer是啥?

/** * Provides the ability to deploy endpoints programmatically. */ public interface ServerContainer extends WebSocketContainer { public abstract void addEndpoint(Class clazz) throws DeploymentException; public abstract void addEndpoint(ServerEndpointConfig sec) throws DeploymentException; }

看其实现方法WsServerContainer.addEndPoint

/** * Provides the equivalent of {@link #addEndpoint(ServerEndpointConfig)} * for publishing plain old java objects (POJOs) that have been annotated as * WebSocket endpoints. * * @param pojo The annotated POJO */ @Override public void addEndpoint(Class pojo) throws DeploymentException { if (deploymentFailed) { throw new DeploymentException(sm.getString("serverContainer.failedDeployment", servletContext.getContextPath(), servletContext.getVirtualServerName())); } ServerEndpointConfig sec; try { ServerEndpoint annotation = pojo.getAnnotation(ServerEndpoint.class); if (annotation == null) { throw new DeploymentException( sm.getString("serverContainer.missingAnnotation", pojo.getName())); } String path = annotation.value(); // Validate encoders validateEncoders(annotation.encoders()); // ServerEndpointConfig Class configuratorClazz = annotation.configurator(); Configurator configurator = null; if (!configuratorClazz.equals(Configurator.class)) { try { configurator = annotation.configurator().getConstructor().newInstance(); } catch (ReflectiveOperationException e) { throw new DeploymentException(sm.getString( "serverContainer.configuratorFail", annotation.configurator().getName(), pojo.getClass().getName()), e); } } sec = ServerEndpointConfig.Builder.create(pojo, path). decoders(Arrays.asList(annotation.decoders())). encoders(Arrays.asList(annotation.encoders())). subprotocols(Arrays.asList(annotation.subprotocols())). configurator(configurator). build(); } catch (DeploymentException de) { failDeployment(); throw de; } addEndpoint(sec); }

通过打断点可以看到这个serverEndpointConfig里面是有endpoint的类对象,以及注解里配置的连接地址。 再看下面的addEndPoint(sec)

/** * Published the provided endpoint implementation at the specified path with * the specified configuration. {@link #WsServerContainer(ServletContext)} * must be called before calling this method. * * @param sec The configuration to use when creating endpoint instances * @throws DeploymentException if the endpoint cannot be published as * requested */ @Override public void addEndpoint(ServerEndpointConfig sec) throws DeploymentException { if (enforceNoAddAfterHandshake && !addAllowed) { throw new DeploymentException( sm.getString("serverContainer.addNotAllowed")); } if (servletContext == null) { throw new DeploymentException( sm.getString("serverContainer.servletContextMissing")); } if (deploymentFailed) { throw new DeploymentException(sm.getString("serverContainer.failedDeployment", servletContext.getContextPath(), servletContext.getVirtualServerName())); } try { String path = sec.getPath(); // Add method mapping to user properties PojoMethodMapping methodMapping = new PojoMethodMapping(sec.getEndpointClass(), sec.getDecoders(), path); if (methodMapping.getOnClose() != null || methodMapping.getOnOpen() != null || methodMapping.getOnError() != null || methodMapping.hasMessageHandlers()) { sec.getUserProperties().put(org.apache.tomcat.websocket.pojo.Constants.POJO_METHOD_MAPPING_KEY, methodMapping); } UriTemplate uriTemplate = new UriTemplate(path); if (uriTemplate.hasParameters()) { Integer key = Integer.valueOf(uriTemplate.getSegmentCount()); SortedSet templateMatches = configTemplateMatchMap.get(key); if (templateMatches == null) { // Ensure that if concurrent threads execute this block they // both end up using the same TreeSet instance templateMatches = new TreeSet( TemplatePathMatchComparator.getInstance()); configTemplateMatchMap.putIfAbsent(key, templateMatches); templateMatches = configTemplateMatchMap.get(key); } if (!templateMatches.add(new TemplatePathMatch(sec, uriTemplate))) { // Duplicate uriTemplate; throw new DeploymentException( sm.getString("serverContainer.duplicatePaths", path, sec.getEndpointClass(), sec.getEndpointClass())); } } else { // Exact match ServerEndpointConfig old = configExactMatchMap.put(path, sec); if (old != null) { // Duplicate path mappings throw new DeploymentException( sm.getString("serverContainer.duplicatePaths", path, old.getEndpointClass(), sec.getEndpointClass())); } } endpointsRegistered = true; } catch (DeploymentException de) { failDeployment(); throw de; } }

里面的方法大致就是将path和websocket的endpoint做一个映射。访问该地址。 注意:这里的PojoMethodMapping,将endPoint类里的onOpen等实现方法注入:

public PojoMethodMapping(Class clazzPojo, List decoderClazzes, String wsPath) throws DeploymentException { this.wsPath = wsPath; List decoders = Util.getDecoders(decoderClazzes); Method open = null; Method close = null; Method error = null; Method[] clazzPojoMethods = null; Class currentClazz = clazzPojo; while (!currentClazz.equals(Object.class)) { Method[] currentClazzMethods = currentClazz.getDeclaredMethods(); if (currentClazz == clazzPojo) { clazzPojoMethods = currentClazzMethods; } for (Method method : currentClazzMethods) { if (method.isSynthetic()) { // Skip all synthetic methods. // They may have copies of annotations from methods we are // interested in and they will use the wrong parameter type // (they always use Object) so we can't used them here. continue; } if (method.getAnnotation(OnOpen.class) != null) { checkPublic(method); if (open == null) { open = method; } else { if (currentClazz == clazzPojo || !isMethodOverride(open, method)) { // Duplicate annotation throw new DeploymentException(sm.getString( "pojoMethodMapping.duplicateAnnotation", OnOpen.class, currentClazz)); } } } else if (method.getAnnotation(OnClose.class) != null) { checkPublic(method); if (close == null) { close = method; } else { if (currentClazz == clazzPojo || !isMethodOverride(close, method)) { // Duplicate annotation throw new DeploymentException(sm.getString( "pojoMethodMapping.duplicateAnnotation", OnClose.class, currentClazz)); } } } else if (method.getAnnotation(OnError.class) != null) { checkPublic(method); if (error == null) { error = method; } else { if (currentClazz == clazzPojo || !isMethodOverride(error, method)) { // Duplicate annotation throw new DeploymentException(sm.getString( "pojoMethodMapping.duplicateAnnotation", OnError.class, currentClazz)); } } } else if (method.getAnnotation(OnMessage.class) != null) { checkPublic(method); MessageHandlerInfo messageHandler = new MessageHandlerInfo(method, decoders); boolean found = false; for (MessageHandlerInfo otherMessageHandler : onMessage) { if (messageHandler.targetsSameWebSocketMessageType(otherMessageHandler)) { found = true; if (currentClazz == clazzPojo || !isMethodOverride(messageHandler.m, otherMessageHandler.m)) { // Duplicate annotation throw new DeploymentException(sm.getString( "pojoMethodMapping.duplicateAnnotation", OnMessage.class, currentClazz)); } } } if (!found) { onMessage.add(messageHandler); } } else { // Method not annotated } } currentClazz = currentClazz.getSuperclass(); } ..... }

看看类被日志切面类代理后,是在什么时候报的错: 可以看到,这个pojo上面被日志切面给代理类,找不到serverEndpoint这个注解了。 然后来说说什么是websocket的endpoint:

The Web Socket Endpoint represents an object that can handle websocket conversations. Developers may extend this class in order to implement a programmatic websocket endpoint. The Endpoint class holds lifecycle methods that may be overridden to intercept websocket open, error and close events. By implementing the onOpen method, the programmatic endpoint gains access to the Session object, to which the developer may add MessageHandler implementations in order to intercept incoming websocket messages. Each instance of a websocket endpoint is guaranteed not to be called by more than one thread at a time per active connection.



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3