使用 WebSocket 和 Node.JS Stream 构建 HTTP 隧道 – Blog of Embbnux

您所在的位置:网站首页 websocket的应用 使用 WebSocket 和 Node.JS Stream 构建 HTTP 隧道 – Blog of Embbnux

使用 WebSocket 和 Node.JS Stream 构建 HTTP 隧道 – Blog of Embbnux

2023-03-02 09:11| 来源: 网络整理| 查看: 265

在开发与第三方服务集成的应用程序或机器人时,通常需要将本地开发服务器暴露在 Internet 上以接收 Webhook 消息。要实现这一点,需要为本地服务器创建 HTTP 隧道。本文演示了如何使用 WebSocket 和 Node.js 流构建HTTP隧道工具并传输大数据。

为什么要部署自己的HTTP隧道服务

许多在线服务提供 HTTP 隧道,例如 ngrok,它提供付费的固定公共域来连接本地服务器。它还提供免费的套餐,但仅提供随机域,每次客户端重新启动时都会更改,使得在第三方服务中保存域名不方便。

要获得固定域,您可以在自己的服务器上部署HTTP隧道。ngrok还提供了开源版本用于服务器端部署,但它是一个旧版本1.x,存在一些严重的可靠性问题,不建议用于生产。

此外,使用自己的服务器,您可以确保数据安全。

Lite HTTP Tunnel项目简介

Lite HTTP Tunnel 是一项最近开发的HTTP隧道服务,可以自行托管。您可以使用 Github 存储库中的Deploy按钮部署它并免费获取固定域。

它基于 Express.js 和 Socket.io 构建,仅使用几行代码。它使用 WebSocket 将 HTTP / HTTPS 请求从公共服务器流式传输到本地服务器。

实现 步骤1:在服务器和客户端之间建立 WebSocket 连接

要在服务器端支持 WebSocket 连接,我们使用 socket.io:

const http = require('http'); const express = require('express'); const { Server } = require('socket.io'); const app = express(); const httpServer = http.createServer(app); const io = new Server(httpServer); let connectedSocket = null; io.on('connection', (socket) => { console.log('client connected'); connectedSocket = socket; const onMessage = (message) => { if (message === 'ping') { socket.send('pong'); } } const onDisconnect = (reason) => { console.log('client disconnected: ', reason); connectedSocket = null; socket.off('message', onMessage); socket.off('error', onError); }; const onError = (e) => { connectedSocket = null; socket.off('message', onMessage); socket.off('disconnect', onDisconnect); }; socket.on('message', onMessage); socket.once('disconnect', onDisconnect); socket.once('error', onError); }); httpServer.listen(process.env.PORT);

要在客户端连接 WebSocket:

const { io } = require('socket.io-client'); let socket = null; function initClient(options) { socket = io(options.server, { transports: ["websocket"], auth: { token: options.jwtToken, }, }); socket.on('connect', () => { if (socket.connected) { console.log('client connect to server successfully'); } }); socket.on('connect_error', (e) => { console.log('connect error', e && e.message); }); socket.on('disconnect', () => { console.log('client disconnected'); }); } 第二步:使用 JWT Token 保护 WebSocket 连接

在服务器端,我们使用 socket.io 中间件拒绝无效连接:

const jwt = require('jsonwebtoken'); io.use((socket, next) => { if (connectedSocket) { return next(new Error('Connected error')); } if (!socket.handshake.auth || !socket.handshake.auth.token){ next(new Error('Authentication error')); } jwt.verify(socket.handshake.auth.token, process.env.SECRET_KEY, function(err, decoded) { if (err) { return next(new Error('Authentication error')); } if (decoded.token !== process.env.VERIFY_TOKEN) { return next(new Error('Authentication error')); } next(); }); }); 第 3 步:将请求从服务器传输到客户端

要将请求数据从服务器发送到客户端,我们使用可写流。以下代码实现了 SocketRequest 类,该类扩展了Node.js内置 stream 模块中的 Writable 类。

const { Writable } = require('stream'); class SocketRequest extends Writable { constructor({ socket, requestId, request }) { super(); this._socket = socket; this._requestId = requestId; this._socket.emit('request', requestId, request); } _write(chunk, encoding, callback) { this._socket.emit('request-pipe', this._requestId, chunk); this._socket.conn.once('drain', () => { callback(); }); } _writev(chunks, callback) { this._socket.emit('request-pipes', this._requestId, chunks); this._socket.conn.once('drain', () => { callback(); }); } _final(callback) { this._socket.emit('request-pipe-end', this._requestId); this._socket.conn.once('drain', () => { callback(); }); } _destroy(e, callback) { if (e) { this._socket.emit('request-pipe-error', this._requestId, e && e.message); this._socket.conn.once('drain', () => { callback(); }); return; } callback(); } } app.use('/', (req, res) => { if (!connectedSocket) { res.status(404); res.send('Not Found'); return; } const requestId = uuidV4(); const socketRequest = new SocketRequest({ socket: connectedSocket, requestId, request: { method: req.method, headers: { ...req.headers }, path: req.url, }, }); const onReqError = (e) => { socketRequest.destroy(new Error(e || 'Aborted')); } req.once('aborted', onReqError); req.once('error', onReqError); req.pipe(socketRequest); req.once('finish', () => { req.off('aborted', onReqError); req.off('error', onReqError); }); // ... });

要在客户端接收请求数据,我们使用可读流。以下代码实现了 SocketRequest 类,该类扩展了Node.js内置 stream 模块中的 Readable 类。

onst stream = require('stream'); class SocketRequest extends stream.Readable { constructor({ socket, requestId }) { super(); this._socket = socket; this._requestId = requestId; const onRequestPipe = (requestId, data) => { if (this._requestId === requestId) { this.push(data); } }; const onRequestPipes = (requestId, data) => { if (this._requestId === requestId) { data.forEach((chunk) => { this.push(chunk); }); } }; const onRequestPipeError = (requestId, error) => { if (this._requestId === requestId) { this._socket.off('request-pipe', onRequestPipe); this._socket.off('request-pipes', onRequestPipes); this._socket.off('request-pipe-error', onRequestPipeError); this._socket.off('request-pipe-end', onRequestPipeEnd); this.destroy(new Error(error)); } }; const onRequestPipeEnd = (requestId, data) => { if (this._requestId === requestId) { this._socket.off('request-pipe', onRequestPipe); this._socket.off('request-pipes', onRequestPipes); this._socket.off('request-pipe-error', onRequestPipeError); this._socket.off('request-pipe-end', onRequestPipeEnd); if (data) { this.push(data); } this.push(null); } }; this._socket.on('request-pipe', onRequestPipe); this._socket.on('request-pipes', onRequestPipes); this._socket.on('request-pipe-error', onRequestPipeError); this._socket.on('request-pipe-end', onRequestPipeEnd); } _read() {} } socket.on('request', (requestId, request) => { console.log(`${requesthod}: `, request.path); request.port = options.port; request.hostname = options.host; const socketRequest = new SocketRequest({ requestId, socket: socket, }); const localReq = http.request(request); socketRequest.pipe(localReq); const onSocketRequestError = (e) => { socketRequest.off('end', onSocketRequestEnd); localReq.destroy(e); }; const onSocketRequestEnd = () => { socketRequest.off('error', onSocketRequestError); }; socketRequest.once('error', onSocketRequestError); socketRequest.once('end', onSocketRequestEnd); // ... }); 第四步:从客户端传输响应到服务器

为将响应数据发送到隧道服务器,我们将使用流模块创建可写 stream。

const stream = require('stream'); class SocketResponse extends stream.Writable { constructor({ socket, responseId }) { super(); this._socket = socket; this._responseId = responseId; } _write(chunk, encoding, callback) { this._socket.emit('response-pipe', this._responseId, chunk); this._socket.io.engine.once('drain', () => { callback(); }); } _writev(chunks, callback) { this._socket.emit('response-pipes', this._responseId, chunks); this._socket.io.engine.once('drain', () => { callback(); }); } _final(callback) { this._socket.emit('response-pipe-end', this._responseId); this._socket.io.engine.once('drain', () => { callback(); }); } _destroy(e, callback) { if (e) { this._socket.emit('response-pipe-error', this._responseId, e && e.message); this._socket.io.engine.once('drain', () => { callback(); }); return; } callback(); } writeHead(statusCode, statusMessage, headers) { this._socket.emit('response', this._responseId, { statusCode, statusMessage, headers, }); } } socket.on('request', (requestId, request) => { // ...stream request and send request to local server... const onLocalResponse = (localRes) => { localReq.off('error', onLocalError); const socketResponse = new SocketResponse({ responseId: requestId, socket: socket, }); socketResponse.writeHead( localRes.statusCode, localRes.statusMessage, localRes.headers ); localRes.pipe(socketResponse); }; const onLocalError = (error) => { console.log(error); localReq.off('response', onLocalResponse); socket.emit('request-error', requestId, error && error.message); socketRequest.destroy(error); }; localReq.once('error', onLocalError); localReq.once('response', onLocalResponse); });

为了在隧道服务器中获取响应数据,我们将创建一个可读流。

class SocketResponse extends Readable { constructor({ socket, responseId }) { super(); this._socket = socket; this._responseId = responseId; const onResponse = (responseId, data) => { if (this._responseId === responseId) { this._socket.off('response', onResponse); this._socket.off('request-error', onRequestError); this.emit('response', data.statusCode, data.statusMessage, data.headers); } } const onResponsePipe = (responseId, data) => { if (this._responseId === responseId) { this.push(data); } }; const onResponsePipes = (responseId, data) => { if (this._responseId === responseId) { data.forEach((chunk) => { this.push(chunk); }); } }; const onResponsePipeError = (responseId, error) => { if (this._responseId !== responseId) { return; } this._socket.off('response-pipe', onResponsePipe); this._socket.off('response-pipes', onResponsePipes); this._socket.off('response-pipe-error', onResponsePipeError); this._socket.off('response-pipe-end', onResponsePipeEnd); this.destroy(new Error(error)); }; const onResponsePipeEnd = (responseId, data) => { if (this._responseId !== responseId) { return; } if (data) { this.push(data); } this._socket.off('response-pipe', onResponsePipe); this._socket.off('response-pipes', onResponsePipes); this._socket.off('response-pipe-error', onResponsePipeError); this._socket.off('response-pipe-end', onResponsePipeEnd); this.push(null); }; const onRequestError = (requestId, error) => { if (requestId === this._responseId) { this._socket.off('request-error', onRequestError); this._socket.off('response', onResponse); this._socket.off('response-pipe', onResponsePipe); this._socket.off('response-pipes', onResponsePipes); this._socket.off('response-pipe-error', onResponsePipeError); this._socket.off('response-pipe-end', onResponsePipeEnd); this.emit('requestError', error); } }; this._socket.on('response', onResponse); this._socket.on('response-pipe', onResponsePipe); this._socket.on('response-pipes', onResponsePipes); this._socket.on('response-pipe-error', onResponsePipeError); this._socket.on('response-pipe-end', onResponsePipeEnd); this._socket.on('request-error', onRequestError); } _read(size) {} } app.use('/', (req, res) => { // ... stream request to tunnel client const onResponse = (statusCode, statusMessage, headers) => { socketRequest.off('requestError', onRequestError) res.writeHead(statusCode, statusMessage, headers); }; socketResponse.once('requestError', onRequestError) socketResponse.once('response', onResponse); socketResponse.pipe(res); const onSocketError = () => { res.end(500); }; socketResponse.once('error', onSocketError); connectedSocket.once('close', onSocketError) res.once('close', () => { connectedSocket.off('close', onSocketError); socketResponse.off('error', onSocketError); }); });

在完成以上所有步骤后,我们已经支持将 HTTP 请求流式传输到本地计算机,并从本地服务器发送响应到原始请求。这是一种轻量级的解决方案,但稳定性高且易于部署在任何 “Node.js” 环境中。

第六步:部署HTTP隧道服务

我们可以将HTTP隧道服务部署到云提供商(如 Heroku)。项目 Lite HTTP Tunnel 在Github存储库中包含一个 Heroku/Render 按钮,可让您快速将服务部署到 Heroku/Render。

更多信息

在本文中,我们学习了如何基于 WebSocket 和 Node.js 流构建HTTP隧道工具。使用此工具,我们可以将本地开发服务器暴露到 Internet,并从第三方服务接收 Webhook 消息。我们还了解了如何使用 Node.js 流在客户端和服务器之间传输大量数据。

英文文章:https://dev.to/embbnux/building-a-http-tunnel-with-websocket-and-nodejs-4bp5



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3