VUE2 前端应用kafka

您所在的位置:网站首页 卡夫卡数据库怎么用 VUE2 前端应用kafka

VUE2 前端应用kafka

2024-07-16 01:01| 来源: 网络整理| 查看: 265

本次应用理论阐述

前端获取kafka数据需要启用node.js服务端。服务端使用kafka-node插件,启动后获取kafka数据。vue-socket.io用于web端与启动的node服务做socket链接,从而将node服务端获取的数据发送到web页面。 启用node服务,选用的是express。有一点需要注意,socket.io的版本,正常安装的话应该是4点多的版本,但是在node与web做连接的时候,可能会出现连接不通的情况。但是换到2.0.4的版本就可以正常连接了。个人感觉是相关依赖版本之间的问题,但是没有深入的研究,有兴趣的可以调整寻找一下。

vue项目添加相关依赖 "express": "^4.18.2", "kafka-node": "^5.0.0", "socket.io": "^2.0.4", "socket.io-client": "^4.7.2", "vue-socket.io": "^3.0.10", src同级新建node需要执行的文件夹

在这里插入图片描述

首先说node服务端代码–server文件夹下创建js文件,名字正常即可(我使用的server.js)

node端代码如下:

let express = require('express'); let app = express(); // app.all('*', function (req, res) {//本地如果出现跨域的话可以把注释这一块放开,正常不需要,看情况而定 // //设置可以接收请求的域名 // res.header("Access-Control-Allow-Origin", "*"); // res.header("Access-Control-Allow-Headers", "X-Requested-With"); // res.header("Access-Control-Allow-Methods", "PUT,POST,GET,DELETE,OPTIONS"); // res.header("Content-Type", "application/json;charset=utf-8"); // next(); // }) let server = require('http').createServer(app); let io = require('socket.io')(server, { cors: true }); let kafka = require('kafka-node'); let payloads = [{//payloads 是kafka要获取数据的主题,可以传多个,此处为数组 topic: "",//需要连接kafka的主题,主题是必填的 groupId: '',//需要连接kafka的组名,组的话正常都会有,也可以看做必填 partitions: 0,//需要连接kafka的分区,一般使用0就行。 }]; let options = { host: "",//需要连接的kafka的iP+端口 sessionTimeout: 15000, groupId: '',//需要连接kafka的组名 autoCommit: true, fromOffset: true, partitions: 0, }; let client = new kafka.KafkaClient({ kafkaHost: ""//需要连接的kafka的iP+端口 }); const kafkaConsumer = (payloads) => { let consumer = new kafka.Consumer(client, payloads, options); consumer.on("message", function (message) { io.sockets.emit('server_counter', message); }); io.on("connection", (socket) => { //从页面发送过来的需要订阅的kafka主题数据 socket.on("addPlayData", (data) => { let topicArr = []; if (data.dpNo) { topicArr = [{ topic: data.dpNo, offset: 0 }] } else if (data.dpNos) { let dpNoArr = data.dpNos.split(','); dpNoArr.forEach(value => { let obj = { topic: value, // offset: 0 } topicArr.push(obj) }) } consumer.addTopics(topicArr, function (err, added) { }) }) //订阅的kafka如果有重复订阅的可能,需要及时删除订阅的kafka主题内容,防止重复订阅 socket.on("removePlayData", (data) => { let topicArr = []; if (data.dpNo) { topicArr.push(data.dpNo) } else if (data.dpNos) { topicArr = data.dpNos.split(','); } consumer.removeTopics(topicArr, function (err, added) { }) }) }) } kafkaConsumer(payloads); server.listen(3000);//一般node启动服务端口为3000,如果需要修改端口的话,可以另外设置,这里就不做赘述了 web页面代码 1、main.js文件代码–引入vue-socket.io用来连接node服务端 import VueSocketIO from 'vue-socket.io' Vue.use(new VueSocketIO({ debug: false,//debug可以关闭,正常连接后关闭即可 connection: 'http://localhost:3000',//此处为node服务端ip+端口,本地开发可以直接用localhost options: {//此处配置,是用来设置在需要位置触发socket接受传输数据 autoConnect: false }, })) 2、在methods同级增加socket代码。

在这里插入图片描述 详细代码内容如下:

sockets: { server_counter(socketsData) {//server_counter为node服务端通过socket传递的主题名称server_counter console.log("====", socketsData);//socketsData内容即为从kafka获取的数据内容 }, }, this.$socket.emit("addPlayData", data);//向socket发送要订阅的kafka主题数据 this.$socket.connect();//web端触发socket传输数据,可以放到适当位置。 this.$socket.close();//web端停止接受socket传输数据,可以放到适当位置应用,如果需要一直连接可以不用。 this.$socket.emit("removePlayData", this.nowClickData);//向socket发送要移除订阅的kafka主题数据

以上为本地开发应用。

上线部署服务器的话,server.js可以vue打包后单独放到服务器存放项目文件的文件夹中。相对的server.js文件的依赖需要在服务器单独npm一下。到此node服务端代码和web页面端代码已经结束。应用时先启用node服务端:在存放server文件夹出进行cmd,然后执行node server.js即可(server.js为个人在server文件夹下创建的js文件),web端就可以获取到kafka推送的数据。 "express": "^4.18.2", "kafka-node": "^5.0.0", "socket.io": "^2.0.4", "socket.io-client": "^4.7.2", 我们是把kafka数据,做成echarts动态折线图,在测试过程中,我们kafka推送的数据频率比较高,导致web端生成echarts图形以及页面卡顿现象。偶尔也会出现kafka数据堆积导致web端收不到数据的情况,建议在应用过程中控制kafka发送数据频率。

后续在使用过程中有其他问题会持续更新,以上为个人使用总结,如有错误欢迎指出,会及时更新



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3