Using WebSocket in Spring Boot (Part 2): Sending WebSocket Messages to a Specific User and Handling an Offline Recipient



2024-07-14 22:22 | Source: collected from the web


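In the previous article (https://www.zifangsky.cn/1355.html) I covered several ways to use WebSocket in a Spring project. That article only showed the server broadcasting messages to every client. Sometimes, though, the server needs to send a message to the client of one specific user, for example to deliver a web notification, stream the log of a user's task in real time, or support one-to-one chat between two users.

There are generally three ways for the server to send a message to a specific user's client:

Approach 1: Implement WebSocket with Java's @ServerEndpoint annotation or with Spring's low-level API. When a connection is established, read the logged-in username from the HttpSession and store the pair "username + WebSocket connection" in a ConcurrentHashMap. To send a message to a specific user, look up the recipient's open WebSocket connection by username and send the message over it.

Approach 2: On the page, dynamically add the current user's ID or username to the path the client subscribes to. Sending a message to a specific user then amounts to broadcasting to the clients subscribed to that user-specific path.

Approach 3: This is similar to Approach 1. Implement WebSocket with Spring's high-level API, then define a custom HandshakeHandler and override its determineUser method so that the logged-in username becomes the identity (Principal) of the WebSocket connection. After that, messagingTemplate.convertAndSendToUser can be used to send a message to a specific user.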
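Approach 1 above can be sketched roughly as follows. This is a minimal illustration, not the article's code: `Connection` is a hypothetical stand-in for whatever connection type the chosen API provides (for example `javax.websocket.Session`), and `ConnectionRegistry` and its method names are assumptions made for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an open WebSocket connection. */
interface Connection {
    void sendText(String text);
}

/** Maps a logged-in username to that user's open connection (Approach 1). */
class ConnectionRegistry {
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();

    /** Call when a connection opens, with the username read from the HttpSession. */
    void register(String username, Connection connection) {
        connections.put(username, connection);
    }

    /** Call when a connection closes; removes the entry only if it still maps to this connection. */
    void unregister(String username, Connection connection) {
        connections.remove(username, connection);
    }

    /** Sends text to the user if connected; returns false when no connection is registered. */
    boolean sendTo(String username, String text) {
        Connection connection = connections.get(username);
        if (connection == null) {
            return false;
        }
        connection.sendText(text);
        return true;
    }
}
```

The boolean returned by `sendTo` is one possible way to let the caller decide what to do when the recipient is offline, which is the case the rest of this article handles with Redis.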

Note: the complete source code for this article is available at https://github.com/zifangsky/WebSocketDemo

Sending messages with SimpMessagingTemplate

The org.springframework.messaging.simp.SimpMessagingTemplate class can be used anywhere on the server to send messages to clients. Once STOMP support is configured in Spring, a SimpMessagingTemplate bean is registered in the application context automatically, so it only needs to be injected with @Autowired wherever it is used.

SimpMessagingTemplate has two important methods:

public void convertAndSend(D destination, Object payload): sends payload to every client subscribed to destination

public void convertAndSendToUser(String user, String destination, Object payload): sends payload to the user named user on the destination that user is subscribed to
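To make the difference between the two methods concrete, the sketch below shows how a user destination is derived, as far as I understand Spring's default behaviour: convertAndSendToUser prepends the user destination prefix (by default `/user/`) and the user name to the destination, then sends to the resulting path. The class `UserDestinations` is purely illustrative and is not part of Spring.

```java
/** Illustrative only: mirrors how a user destination is assumed to be built by default. */
class UserDestinations {
    /** Default user destination prefix in Spring's messaging support. */
    static final String DEFAULT_USER_PREFIX = "/user/";

    /** Builds the destination used to reach one user, e.g. /user/alice/topic/reply. */
    static String forUser(String user, String destination) {
        // A slash in the user name would break the path, so it is escaped.
        String escapedUser = user.replace("/", "%2F");
        // The destination is normalised to start with a slash.
        String normalized = destination.startsWith("/") ? destination : "/" + destination;
        return DEFAULT_USER_PREFIX + escapedUser + normalized;
    }

    public static void main(String[] args) {
        System.out.println(forUser("alice", "/topic/reply"));
    }
}
```

This is why the client in the later example subscribes to `/user/topic/reply` rather than `/topic/reply`: the `/user` prefix tells the server to resolve the subscription to the connected user's own queue.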

A simple example:

    package cn.zifangsky.stompwebsocket.controller;

    import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
    import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
    import cn.zifangsky.stompwebsocket.service.RedisService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.messaging.simp.user.SimpUserRegistry;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;

    import javax.annotation.Resource;

    /**
     * Demonstrates basic usage of {@link org.springframework.messaging.simp.SimpMessagingTemplate}
     * @author zifangsky
     * @date 2018/10/10
     * @since 1.0.0
     */
    @Controller
    @RequestMapping("/wsTemplate")
    public class MessageTemplateController {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Autowired
        private SimpMessagingTemplate messagingTemplate;

        @Autowired
        private SimpUserRegistry userRegistry;

        @Resource(name = "redisServiceImpl")
        private RedisService redisService;

        /**
         * Simple test of SimpMessagingTemplate: broadcast a greeting to /topic/greeting
         */
        @PostMapping("/greeting")
        @ResponseBody
        public String greeting(@RequestBody Greeting greeting) {
            this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));

            return "ok";
        }
    }

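The destination used here is the one that the last example in the previous article subscribes to. After a client page has established its connection, calling the method above with Postman returns ok, and the page receives the broadcast message as well.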

Sending a WebSocket message to a specific user and handling an offline recipient

Sending a message to a specific user:

If the recipient is online, the message is sent directly. Otherwise it is stored in Redis, and the recipient pulls the unread messages after coming online.

(1) A custom HandshakeInterceptor that prevents users who are not logged in from connecting to the WebSocket endpoint:

    package cn.zifangsky.stompwebsocket.interceptor.websocket;

    import cn.zifangsky.stompwebsocket.common.Constants;
    import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
    import cn.zifangsky.stompwebsocket.model.User;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.HandshakeInterceptor;

    import javax.servlet.http.HttpSession;
    import java.text.MessageFormat;
    import java.util.Map;

    /**
     * Custom {@link org.springframework.web.socket.server.HandshakeInterceptor}
     * that only allows logged-in users to connect to WebSocket
     *
     * @author zifangsky
     * @date 2018/10/11
     * @since 1.0.0
     */
    @Component
    public class AuthHandshakeInterceptor implements HandshakeInterceptor {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                                       WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
            HttpSession session = SpringContextUtils.getSession();
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            if (loginUser != null) {
                logger.debug(MessageFormat.format("User {0} requested a WebSocket connection", loginUser.getUsername()));
                return true;
            } else {
                // Returning false aborts the handshake
                logger.error("Not logged in; WebSocket connection refused");
                return false;
            }
        }

        @Override
        public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                                   WebSocketHandler webSocketHandler, Exception e) {
        }
    }

(2) A custom HandshakeHandler that supplies a custom Principal when the WebSocket connection is established:

    package cn.zifangsky.stompwebsocket.interceptor.websocket;

    import cn.zifangsky.stompwebsocket.common.Constants;
    import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
    import cn.zifangsky.stompwebsocket.model.User;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

    import javax.servlet.http.HttpSession;
    import java.security.Principal;
    import java.text.MessageFormat;
    import java.util.Map;

    /**
     * Custom {@link org.springframework.web.socket.server.support.DefaultHandshakeHandler}
     * that creates a custom {@link java.security.Principal}
     *
     * @author zifangsky
     * @date 2018/10/11
     * @since 1.0.0
     */
    @Component
    public class MyHandshakeHandler extends DefaultHandshakeHandler {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                                          Map<String, Object> attributes) {
            HttpSession session = SpringContextUtils.getSession();
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            if (loginUser != null) {
                logger.debug(MessageFormat.format("Creating Principal for WebSocket connection, user: {0}", loginUser.getUsername()));
                // The username becomes the name that convertAndSendToUser addresses
                return new MyPrincipal(loginUser.getUsername());
            } else {
                logger.error("Not logged in; WebSocket connection refused");
                return null;
            }
        }
    }

Correspondingly, MyPrincipal implements the java.security.Principal interface:

    package cn.zifangsky.stompwebsocket.interceptor.websocket;

    import java.security.Principal;

    /**
     * Custom {@link java.security.Principal}
     *
     * @author zifangsky
     * @date 2018/10/11
     * @since 1.0.0
     */
    public class MyPrincipal implements Principal {
        private String loginName;

        public MyPrincipal(String loginName) {
            this.loginName = loginName;
        }

        @Override
        public String getName() {
            return loginName;
        }
    }

(3) A custom ChannelInterceptor that logs when a user disconnects:

    package cn.zifangsky.stompwebsocket.interceptor.websocket;

    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.simp.stomp.StompCommand;
    import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
    import org.springframework.messaging.support.ChannelInterceptor;
    import org.springframework.stereotype.Component;

    import java.security.Principal;
    import java.text.MessageFormat;

    /**
     * Custom {@link org.springframework.messaging.support.ChannelInterceptor}
     * that handles disconnects
     *
     * @author zifangsky
     * @date 2018/10/10
     * @since 1.0.0
     */
    @Component
    public class MyChannelInterceptor implements ChannelInterceptor {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
            StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
            StompCommand command = accessor.getCommand();

            // The user has disconnected
            if (StompCommand.DISCONNECT.equals(command)) {
                String user;
                Principal principal = accessor.getUser();
                if (principal != null && StringUtils.isNoneBlank(principal.getName())) {
                    user = principal.getName();
                } else {
                    // Fall back to the session ID when no Principal is attached
                    user = accessor.getSessionId();
                }

                logger.debug(MessageFormat.format("WebSocket connection of user {0} has been closed", user));
            }
        }
    }

(4) The complete WebSocket configuration:

    package cn.zifangsky.stompwebsocket.config;

    import cn.zifangsky.stompwebsocket.interceptor.websocket.AuthHandshakeInterceptor;
    import cn.zifangsky.stompwebsocket.interceptor.websocket.MyChannelInterceptor;
    import cn.zifangsky.stompwebsocket.interceptor.websocket.MyHandshakeHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.simp.config.ChannelRegistration;
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

    /**
     * WebSocket configuration
     *
     * @author zifangsky
     * @date 2018/9/30
     * @since 1.0.0
     */
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
        @Autowired
        private AuthHandshakeInterceptor authHandshakeInterceptor;

        @Autowired
        private MyHandshakeHandler myHandshakeHandler;

        @Autowired
        private MyChannelInterceptor myChannelInterceptor;

        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/stomp-websocket").withSockJS();

            registry.addEndpoint("/chat-websocket")
                    .addInterceptors(authHandshakeInterceptor)
                    .setHandshakeHandler(myHandshakeHandler)
                    .withSockJS();
        }

        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            // Clients send messages to /message/xxx
            registry.setApplicationDestinationPrefixes("/message");

            // Prefix for server broadcasts; clients subscribe to /topic/yyy
            registry.enableSimpleBroker("/topic");
        }

        @Override
        public void configureClientInboundChannel(ChannelRegistration registration) {
            registration.interceptors(myChannelInterceptor);
        }
    }

(5) Message handling in the controller:

    package cn.zifangsky.stompwebsocket.controller;

    import cn.zifangsky.stompwebsocket.common.Constants;
    import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
    import cn.zifangsky.stompwebsocket.enums.ExpireEnum;
    import cn.zifangsky.stompwebsocket.model.User;
    import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
    import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
    import cn.zifangsky.stompwebsocket.service.RedisService;
    import cn.zifangsky.stompwebsocket.utils.JsonUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.messaging.simp.user.SimpUser;
    import org.springframework.messaging.simp.user.SimpUserRegistry;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;

    import javax.annotation.Resource;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpSession;
    import java.text.MessageFormat;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /**
     * Demonstrates basic usage of {@link org.springframework.messaging.simp.SimpMessagingTemplate}
     * @author zifangsky
     * @date 2018/10/10
     * @since 1.0.0
     */
    @Controller
    @RequestMapping("/wsTemplate")
    public class MessageTemplateController {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Autowired
        private SimpMessagingTemplate messagingTemplate;

        @Autowired
        private SimpUserRegistry userRegistry;

        @Resource(name = "redisServiceImpl")
        private RedisService redisService;

        /**
         * Simple test of SimpMessagingTemplate
         */
        @PostMapping("/greeting")
        @ResponseBody
        public String greeting(@RequestBody Greeting greeting) {
            this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));

            return "ok";
        }

        /**
         * Sends a WebSocket message to a specific user
         */
        @PostMapping("/sendToUser")
        @ResponseBody
        public String chat(HttpServletRequest request) {
            // Message recipient
            String receiver = request.getParameter("receiver");
            // Message content
            String msg = request.getParameter("msg");
            HttpSession session = SpringContextUtils.getSession();
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
            this.sendToUser(loginUser.getUsername(), receiver, "/topic/reply", JsonUtils.toJson(resultData));

            return "ok";
        }

        /**
         * Sends a message to a specific user and handles the case where the recipient is offline
         * @param sender message sender
         * @param receiver message recipient
         * @param destination destination
         * @param payload message body
         */
        private void sendToUser(String sender, String receiver, String destination, String payload) {
            SimpUser simpUser = userRegistry.getUser(receiver);

            // If the recipient is connected, send the message
            if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
                this.messagingTemplate.convertAndSendToUser(receiver, destination, payload);
            }
            // Otherwise store the message in Redis; the user pulls unread messages after coming online
            else {
                // Name of the Redis list that stores the messages
                String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;

                logger.info(MessageFormat.format("Recipient {0} has no WebSocket connection yet; message [{2}] from {1} will be stored in Redis list [{3}]", receiver, sender, payload, listKey));

                // Store the message in Redis
                redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
            }
        }

        /**
         * Pulls the unread WebSocket messages for the given destination
         * @param destination the subscribed destination
         * @return java.util.Map
         */
        @PostMapping("/pullUnreadMessage")
        @ResponseBody
        public Map<String, Object> pullUnreadMessage(String destination) {
            Map<String, Object> result = new HashMap<>();
            try {
                HttpSession session = SpringContextUtils.getSession();
                // Currently logged-in user
                User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

                // Name of the Redis list that stores the messages
                String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
                // Pull all unread messages from Redis
                List<?> messageList = redisService.rangeList(listKey, 0, -1);

                result.put("code", "200");
                if (messageList != null && messageList.size() > 0) {
                    // Delete this unread-message list from Redis
                    redisService.delete(listKey);
                    // Add the data to the response so the page can display it
                    result.put("result", messageList);
                }
            } catch (Exception e) {
                result.put("code", "500");
                result.put("msg", e.getMessage());
            }

            return result;
        }
    }
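The online/offline branching in sendToUser and the pull in pullUnreadMessage can be illustrated without Spring or Redis. The sketch below is an in-memory approximation under stated assumptions: `UnreadMessageStore`, `OfflineAwareSender`, and the `unread:` key prefix are hypothetical names invented for this example (the real value of Constants.REDIS_UNREAD_MSG_PREFIX is not shown in the article), and a plain map stands in for the Redis lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

/** In-memory stand-in for the Redis lists of unread messages. */
class UnreadMessageStore {
    /** Hypothetical prefix; the article's actual constant value is not shown. */
    static final String KEY_PREFIX = "unread:";

    private final Map<String, List<String>> lists = new HashMap<>();

    /** Same key layout as the controller: prefix + receiver + ":" + destination. */
    static String listKey(String receiver, String destination) {
        return KEY_PREFIX + receiver + ":" + destination;
    }

    /** Appends to the tail, like a right push. */
    synchronized void append(String receiver, String destination, String payload) {
        lists.computeIfAbsent(listKey(receiver, destination), k -> new ArrayList<>()).add(payload);
    }

    /** Returns all pending messages in order and removes the list in one step. */
    synchronized List<String> drain(String receiver, String destination) {
        List<String> pending = lists.remove(listKey(receiver, destination));
        return pending == null ? new ArrayList<>() : pending;
    }
}

/** Sends directly when the receiver is online, otherwise stores the message. */
class OfflineAwareSender {
    private final Set<String> onlineUsers;
    private final BiConsumer<String, String> liveSender; // (receiver, payload)
    private final UnreadMessageStore store;

    OfflineAwareSender(Set<String> onlineUsers, BiConsumer<String, String> liveSender, UnreadMessageStore store) {
        this.onlineUsers = onlineUsers;
        this.liveSender = liveSender;
        this.store = store;
    }

    /** Returns true if delivered live, false if queued for later pulling. */
    boolean send(String receiver, String destination, String payload) {
        if (onlineUsers.contains(receiver)) {
            liveSender.accept(receiver, payload);
            return true;
        }
        store.append(receiver, destination, payload);
        return false;
    }
}
```

One design point the sketch makes visible: `drain` reads and removes the list in a single synchronized step, whereas the controller reads with rangeList and then calls delete separately, so a message appended between those two calls could be deleted without ever being returned. Whether that window matters depends on the application.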

Note: the Redis operations used here are implemented as follows:

    @Override
    public boolean delete(String key) {
        // redisTemplate.delete returns a Boolean wrapper; avoid unboxing a possible null
        return Boolean.TRUE.equals(redisTemplate.delete(key));
    }

    @Override
    public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) {
        // Bind operations to the key
        BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
        // Insert the data
        boundValueOperations.leftPushAll(values);
        // Set the expiry time
        boundValueOperations.expire(expireEnum.getTime(), expireEnum.getTimeUnit());
    }

    @Override
    public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) {
        // Bind operations to the key
        BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
        // Insert the data
        boundValueOperations.rightPushAll(values);
        // Set the expiry time
        boundValueOperations.expire(expireEnum.getTime(), expireEnum.getTimeUnit());
    }

    @Override
    public List<Object> rangeList(String listKey, long start, long end) {
        // Bind operations to the key
        BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
        // Query the data
        return boundValueOperations.range(start, end);
    }

(6) The example page, titled "Chat With STOMP Message". Its styles:

    #connect-container {
        margin: 0 auto;
        width: 400px;
    }

    #connect-container div {
        padding: 5px;
        margin: 0 7px 10px 0;
    }

    .message input {
        padding: 5px;
        margin: 0 7px 10px 0;
    }

    .layui-btn {
        display: inline-block;
    }

Its script:

    var stompClient = null;

    $(function () {
        var target = $("#target");
        if (window.location.protocol === 'http:') {
            target.val('http://' + window.location.host + target.val());
        } else {
            target.val('https://' + window.location.host + target.val());
        }
    });

    function setConnected(connected) {
        var connect = $("#connect");
        var disconnect = $("#disconnect");
        var echo = $("#echo");

        if (connected) {
            connect.addClass("layui-btn-disabled");
            disconnect.removeClass("layui-btn-disabled");
            echo.removeClass("layui-btn-disabled");
        } else {
            connect.removeClass("layui-btn-disabled");
            disconnect.addClass("layui-btn-disabled");
            echo.addClass("layui-btn-disabled");
        }

        connect.attr("disabled", connected);
        disconnect.attr("disabled", !connected);
        echo.attr("disabled", !connected);
    }

    // Connect
    function connect() {
        var target = $("#target").val();

        var ws = new SockJS(target);
        stompClient = Stomp.over(ws);

        stompClient.connect({}, function () {
            setConnected(true);
            log('Info: STOMP connection opened.');

            // After connecting, pull unread messages
            pullUnreadMessage("/topic/reply");

            // Subscribe to this user's /topic/reply destination on the server
            stompClient.subscribe("/user/topic/reply", function (response) {
                log(JSON.parse(response.body).content);
            })
        }, function () {
            // Handle disconnection
            setConnected(false);
            log('Info: STOMP connection closed.');
        });
    }

    // Disconnect
    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
            stompClient = null;
        }
        setConnected(false);
        log('Info: STOMP connection closed.');
    }

    // Send a message to a specific user
    function sendMessage() {
        if (stompClient != null) {
            var receiver = $("#receiver").val();
            var msg = $("#message").val();
            log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg': msg}));

            $.ajax({
                url: "/wsTemplate/sendToUser",
                type: "POST",
                dataType: "json",
                async: true,
                data: {
                    "receiver": receiver,
                    "msg": msg
                },
                success: function (data) {
                }
            });
        } else {
            layer.msg('STOMP connection not established, please connect.', {
                offset: 'auto',
                icon: 2
            });
        }
    }

    // Pull unread messages from the server
    function pullUnreadMessage(destination) {
        $.ajax({
            url: "/wsTemplate/pullUnreadMessage",
            type: "POST",
            dataType: "json",
            async: true,
            data: {
                "destination": destination
            },
            success: function (data) {
                if (data.result != null) {
                    $.each(data.result, function (i, item) {
                        log(JSON.parse(item).content);
                    })
                } else if (data.code != null && data.code == "500") {
                    layer.msg(data.msg, {
                        offset: 'auto',
                        icon: 2
                    });
                }
            }
        });
    }

    // Log output
    function log(message) {
        console.debug(message);
    }

The page body shows a notice for browsers with JavaScript disabled ("Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable Javascript and reload this page!"), the heading "Chat With STOMP Message", and three buttons: Connect, Disconnect, and Send Message. The script looks up its elements by the ids target, connect, disconnect, echo, receiver, and message.

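After starting the project, log in with a different account in each of two browsers, then have the two users send messages to each other. The result is as follows:

[Figure: page one]

[Figure: page two]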


