Web即时通信技术

您所在的位置:网站首页 浏览器监听推送 Web即时通信技术

Web即时通信技术

2023-07-23 20:24| 来源: 网络整理| 查看: 265

SSE是一种可以主动从服务端推送消息的技术。SSE的本质其实就是一个HTTP的长连接,只不过它给客户端发送的不是一次性的数据包,而是一个stream流,格式为text/event-stream。所以客户端不会关闭连接,会一直等着服务器发过来的新的数据流。

原理 SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。SSE 默认支持断线重连,WebSocket 需要自己实现。SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。SSE 支持自定义发送的消息类型。

 应用场景

股票行情、新闻推送的这种只需要服务器发送消息给客户端场景中。

客户端实现

在浏览器端创建一个EventSource实例,向服务器发起连接

open:连接一旦建立,就会触发open事件,可以在onopen属性定义回调函数

message:客户端收到服务器发来的数据,就会触发message事件,可以在onmessage属性定义回调函数。

error:如果发生通信错误(如连接中断,服务器返回数据失败),就会触发error事件,可以在onerror属性定义回调函数。

close:用于关闭 SSE 连接。source.close();

自定义事件:EventSource规范允许服务器端执行自定义事件,客户端监听该事件即可,需要使用addEventListener

订阅消息 .left-container { float: left; width: 350px; min-height: 300px; border-right: 3px solid #4b4b4b; } .left-container li{ text-overflow: ellipsis; white-space: nowrap; overflow: hidden; } .right-container{ padding-left: 30px; float: left; width: 350px; } 订阅主题 订阅 收到消息如下: 消息主题 消息内容 发布 发布消息和内容如下: function subscribe() { let topic = document.getElementById('topic').value; let url = location.origin + '/subscribe?topic=' + topic; send(url, null, process); } //发送订阅消息 function send(url, data, callback) { /* let xhr = new XMLHttpRequest(); xhr.onreadystatechange = function(){ if (xhr.readyState == 3 || xhr.readyState == 4){ if (callback) { callback(xhr.responseText); } } }; xhr.open('get', url, true); xhr.send(data); */ if (callback) { let eventSource = new EventSource(url); eventSource.onmessage = function (e) { //let temperature = JSON.parse(e.data); process(e.data); }; eventSource.onopen = function (e) { process('Connection opened'); }; eventSource.onerror = function (e) { process('Connection closed'); }; } else { let xhr = new XMLHttpRequest(); xhr.onreadystatechange = function(){ }; xhr.open('get', url, true); xhr.send(data); } } let len = 0; //处理订阅消息 function process(messsage) { let li = document.createElement('li'); li.innerHTML = messsage; //len = messsage.length; let ul = document.getElementById('message'); ul.appendChild(li); } //发布消息 function publish() { let topic = document.getElementById('pub_topic').value; let content = document.getElementById('pub_content').value; let url = location.origin + '/publish?topic=' + topic + '&content=' + content; send(url, null, null); let li = document.createElement('li'); li.innerHTML = `发布主题:${topic}; 发布内容:${content}`; let ul = document.getElementById('pub_message'); ul.appendChild(li); }  服务器端实现

事件流的对应MIME格式为text/event-stream。

服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本

服务端返回数据需要特殊的格式,分为四种消息类型,且消息的每个字段使用"\n"来做分割,

Event: 事件类型,支持自定义事件

Data: 发送的数据内容,如果数据很长,可以分成多行,用\n结尾,最后一行用\n\n结尾。

ID: 每一条事件流的ID,相当于每一条数据的编号,

Retry:指定浏览器重新发起连接的时间间隔。在自动重连过程中,之前收到的最后一个ID会被发送到服务端。

//Controller package com.example.ssedemo.controller; import com.example.ssedemo.utils.ReqContextUtils; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @RestController public class SubscribeController { @RequestMapping(value = "/subscribe", method = RequestMethod.GET, produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) public SseEmitter subscribe(HttpServletRequest req, HttpServletResponse res, @RequestParam("topic") String topic) { return ReqContextUtils.addSubscrib(topic, req, res); } @RequestMapping("/publish") public void publish(@RequestParam("topic") String topic, @RequestParam("content") String content) { ReqContextUtils.publishMessage(topic, content); } } //处理逻辑 package com.example.ssedemo.utils; import com.google.gson.JsonObject; import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; public class ReqContextUtils { //超时时间 private static int DEFAULT_TIME_OUT = 60*60*1000; //订阅列表,存储所有主题的订阅请求,每个topic对应一个ArrayList,ArrayList里该topic的所有订阅请求 private static HashMap subscribeArray = new LinkedHashMap(); //添加订阅消息 public static SseEmitter addSubscrib(String topic, HttpServletRequest request, HttpServletResponse response) { if (null == topic || "".equals(topic)) { return null; } SseEmitter emitter = new SseEmitter(); ArrayList emitterList = subscribeArray.get(topic); if (null == emitterList) { emitterList = new ArrayList(); subscribeArray.put(topic, emitterList); } emitterList.add(emitter); return emitter; } //获取订阅列表 public static ArrayList getSubscribList(String topic) { return subscribeArray.get(topic); } //推送消息 public static void publishMessage(String topic, String content) { ArrayList emitterList = subscribeArray.get(topic); if (null != emitterList) { for(SseEmitter emitter :emitterList) { try { //emitter.send(content); emitter.send(SseEmitter.event().name("message").data(content)); } catch (Exception e) { e.printStackTrace(); } } } } } 浏览器兼容性

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3