Spark+Kafka构建实时分析Dashboard

您所在的位置:网站首页 扬州韩劬 Spark+Kafka构建实时分析Dashboard

Spark+Kafka构建实时分析Dashboard

2023-08-15 14:06| 来源: 网络整理| 查看: 265

Spark+Kafka构建实时分析Dashboard

文章目录 Spark+Kafka构建实时分析Dashboard实验系统和软件要求1.安装开发环境(python依赖、Kafka等)安装Spark安装sbt安装maven 安装Kafka核心概念测试简单实例 安装Python PyCharm安装2.对文本文件形式的原始数据集进行预处理数据预处理 Python操作Kafka建立pySpark项目运行项目 测试程序 Spark Streaming实时处理数据(Scala版本)编程思想建立Spark项目运行项目 Flask-SocketIO实时推送数据浏览器获取数据并展示效果展示遇到问题参考博客

实验系统和软件要求 Ubuntu: 18.04 Spark: 2.4.8 sbt:1.3.8 Scala: 2.11.12 kafka: 0.10.1.0 Python: 3.6 Flask: 1.0.2 Flask-SocketIO: 4.3.1 kafka-python: 2.0.2 python-engineio 4.4.1 python-socketio 4.6.0 1.安装开发环境(python依赖、Kafka等) 安装Spark

在Linux系统中打开浏览器,访问Spark官方下载地址,按照如下图下载。 image-20230522143658357

Spark部署模式主要有四种:Local模式(单机模式)、Standalone模式(使用Spark自带的简单集群管理器)、YARN模式(使用YARN作为集群管理器)和Mesos模式(使用Mesos作为集群管理器)。 这里介绍Local模式(单机模式)的 Spark安装。我们选择Spark 3.3.2版本,并且假设当前使用用户名hadoop登录了Linux操作系统。

sudo tar -zxf ~/Downloads/spark-2.4.8-bin-without-hadoop.tgz -C /usr/local/ cd /usr/local sudo mv ./spark-2.4.8-bin-without-hadoop/ ./spark sudo chown -R hadoop:hadoop ./spark # 此处的 hadoop 为你的用户名

安装后,还需要修改Spark的配置文件spark-env.sh

cd /usr/local/spark cp ./conf/spark-env.sh.template ./conf/spark-env.sh

编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

有了上面的配置信息以后,Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据。如果没有配置上面信息,Spark就只能读写本地数据,无法读写HDFS数据。 配置完成后就可以直接使用,不需要像Hadoop运行启动命令。 通过运行Spark自带的示例,验证Spark是否安装成功。

cd /usr/local/spark bin/run-example SparkPi image-20230525120939408

执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中):

bin/run-example SparkPi 2>&1 | grep "Pi is" image-20230522143815086

下面正式使用命令进入spark-shell环境,可以通过下面命令启动spark-shell环境:

bin/spark-shell

该命令省略了参数,这时,系统默认是“bin/spark-shell --master local[*]”,也就是说,是采用本地模式运行,并且使用本地所有的CPU核心。

启动Spark

现在,你就可以在里面输入scala代码进行调试了。

比如,下面在命令提示符后面输入一个表达式“8 * 2 + 5”,然后回车,就会立即得到结果:

scala> 8*2+5 res0: Int = 21 image-20230528174535212

最后,可以使用命令“:quit”退出Spark Shell,如下所示:

scala>:quit 安装sbt

使用winscp传输文件

image-20230522161111199 命令将下载后的文件拷贝至安装目录中: sudo mkdir /usr/local/sbt    # 创建安装目录 cd ~/Downloads sudo tar -zxvf ./sbt-1.3.8.tgz -C /usr/local cd /usr/local/sbt sudo chown -R hadoop /usr/local/sbt    # 此处的hadoop为系统当前用户名 cp ./bin/sbt-launch.jar ./ #把bin目录下的sbt-launch.jar复制到sbt安装目录下

接着在安装目录中使用下面命令创建一个Shell脚本文件,用于启动sbt:

vim /usr/local/sbt/sbt

该脚本文件中的代码如下:

#!/bin/bash SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M" java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"

保存后,还需要为该Shell脚本文件增加可执行权限:

chmod u+x /usr/local/sbt/sbt

然后,可以使用如下命令查看sbt版本信息:

cd /usr/local/sbt ./sbt sbtVersion Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0 [warn] No sbt.version set in project/build.properties, base directory: /usr/local/sbt [info] Set current project to sbt (in build file:/usr/local/sbt/) [info] 1.3.8 安装maven

1.安装maven ubuntu中没有自带安装maven,需要手动安装maven.我选择下载到本地在传输到虚拟机。 选择安装在/usr/local/maven中:

sudo unzip ~/Downloads/apache-maven-3.9.2-bin.zip -d /usr/localcd /usr/local sudo mv apache-maven-3.9.2/ ./maven sudo chown -R hadoop ./maven

2.Java应用程序代码 在终端执行如下命令创建一个文件夹sparkapp2作为应用程序根目录

cd ~ #进入用户主文件夹 mkdir -p ./sparkapp2/src/main/java

在 ./sparkapp2/src/main/java 下建立一个名为 SimpleApp.java 的文件(vim ./sparkapp2/src/main/java/SimpleApp.java),添加代码如下:

//*** SimpleApp.java ***/ import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; public class SimpleApp { public static void main(String[] args) { String logFile = "file:///usr/local/spark/README.md"; // Should be some file on your system JavaSparkContext sc = new JavaSparkContext("local", "Simple App", "file:///usr/local/spark/", new String[]{"target/simple-project-1.0.jar"}); JavaRDD logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new Function() { public Boolean call(String s) { return s.contains("a"); } }).count(); long numBs = logData.filter(new Function() { public Boolean call(String s) { return s.contains("b"); } }).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); } }

该程序依赖Spark Java API,因此我们需要通过Maven进行编译打包。在./sparkapp2中新建文件pom.xml(vim ./sparkapp2/pom.xml),添加内容如下,声明该独立应用程序的信息以及与Spark的依赖关系:

edu.berkeley simple-project 4.0.0 Simple Project jar 1.0 Akka repository http://repo.akka.io/releases org.apache.spark spark-core_2.11 2.1.0 image-20230522145548274

3.使用maven打包java程序 为了保证maven能够正常运行,先执行如下命令检查整个应用程序的文件结构:

cd ~/sparkapp2 find

文件结构如下图: image-20230522145526078

接着,我们可以通过如下代码将这整个应用程序打包成Jar(注意:电脑需要保持连接网络的状态,而且首次运行mvn package命令时,系统会自动从网络下载相关的依赖包,同样消耗几分钟的时间,后面再次运行mvn package命令,速度就会快很多):

/usr/local/maven/bin/mvn package

如果运行上面命令后出现类似下面的信息,说明生成Jar包成功:

image-20230522145620069

4.通过spark-submit 运行程序

可以通过spark-submit提交应用程序,该命令的格式如下:

./bin/spark-submit --class //需要运行的程序的主类,应用程序的入口点 --master //Master URL,下面会有具体解释 --deploy-mode //部署模式 ... # other options //其他参数 //应用程序JAR包 [application-arguments] //传递给主类的主方法的参数

最后,针对上面编译打包得到的应用程序,可以通过将生成的jar包通过spark-submit提交到Spark中运行,如下命令:

/usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar#上面命令执行后会输出太多信息,可以不使用上面命令,而使用下面命令查看想要的结果 /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar 2>&1 | grep "Lines with a"

最后得到的结果如下:

image-20230522145857511 安装Kafka

winscp传输文件

image-20230522164441653

执行如下步骤:

cd ~/Downloads sudo tar -zxf kafka_2.11-0.10.1.0.tgz -C /usr/local cd /usr/local sudo mv kafka_2.11-0.10.1.0/ ./kafka sudo chown -R hadoop ./kafka 核心概念

下面介绍Kafka相关概念,以便运行下面实例的同时,更好地理解Kafka.

Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition.

Producer

负责发布消息到Kafka broker

Consumer

消息消费者,向Kafka broker读取消息的客户端。

Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

测试简单实例

接下来在Ubuntu系统环境下测试简单的实例。Mac系统请自己按照安装的位置,切换到相应的指令。按顺序执行如下命令:

# 进入kafka所在的目录 cd /usr/local/kafka bin/zookeeper-server-start.sh config/zookeeper.properties image-20230522174511434

命令执行后不会返回Shell命令输入状态,zookeeper就会按照默认的配置文件启动服务,请千万不要关闭当前终端.启动新的终端,输入如下命令:

cd /usr/local/kafka bin/kafka-server-start.sh config/server.properties image-20230522174524740

kafka服务端就启动了,请千万不要关闭当前终端。启动另外一个终端,输入如下命令

在创建主题时,

在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。

cd /usr/local/kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test image-20230522174456406

topic是发布消息发布的category,以单节点的配置创建了一个叫dblab的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 image-20230522184303069

可以在结果中查看到dblab这个topic存在。接下来用producer生产点数据:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

并尝试输入如下信息:

hello hadoop hello xmu hadoop world

然后再次开启新的终端或者直接按CTRL+C退出。然后使用consumer来接收数据,输入如下命令:

cd /usr/local/kafka bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning image-20230522184609405

便可以看到刚才产生的三条信息。说明kafka安装成功。

安装Python

Ubuntu 18.04.6 LTS自带Python3.6.9

Ubuntu系统中安装pip3,命令如下:

sudo apt-get install python3-pip image-20230523155748954

安装完pip3以后,可以使用如下Shell命令完成Flask和Flask-SocketIO这两个Python第三方库的安装以及与Kafka相关的Python库的安装:

pip3 install flask pip3 install flask-socketio pip3 install kafka-python image-20230523155833420 image-20230523155907445 image-20230523155952995

这些安装好的库在我们的程序文件的开头可以直接用来引用。比如下面的例子。

from flask import Flask from flask_socketio import SocketIO from kafka import KafkaConsumer

from import 跟直接import的区别举个例子来说明。 import socket的话,要用socket.AF_INET,因为AF_INET这个值在socket的名称空间下。 from socket import* 是把socket下的所有名字引入当前名称空间。

PyCharm安装

使用winscp传输PyCharm 安装包

image-20230523160329598

执行下面命令

cd ~ #进入当前hadoop用户的主目录 sudo tar -zxvf ~/Downloads/pycharm-community-2023.1.2.tar.gz -C /usr/local #把pycharm解压缩到/usr/local目录下 cd /usr/local sudo mv pycharm-community-2023.1.2 pycharm #重命名 sudo chown -R hadoop ./pycharm #把pycharm目录权限赋予给当前登录Ubuntu系统的hadoop用户

然后,执行如下命令启动PyCharm:

cd /usr/local/pycharm ./bin/pycharm.sh #启动PyCharm image-20230528173058226

执行上述命令之后,即可开启PyCharm。

2.对文本文件形式的原始数据集进行预处理

​ 本案例采用的数据集压缩包为data_format.zip,该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:

用户行为日志user_log.csv,日志中的字段定义如下:

user_id | 买家iditem_id | 商品idcat_id | 商品类别idmerchant_id | 卖家idbrand_id | 品牌idmonth | 交易时间:月day | 交易事件:日action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品age_range | 买家年龄分段:1表示年龄=50,0和NULL则表示未知gender | 性别:0表示女性,1表示男性,2和NULL表示未知province| 收获地址省份

数据具体格式如下:

user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province 328862,323294,833,2882,2661,08,29,0,0,1,内蒙古 328862,844400,1271,2882,2661,08,29,0,1,1,山西 328862,575153,1271,2882,2661,08,29,0,2,1,山西 328862,996875,1271,2882,2661,08,29,0,1,1,内蒙古 328862,1086186,1271,1253,1049,08,29,0,0,2,浙江 328862,623866,1271,2882,2661,08,29,0,0,2,黑龙江 328862,542871,1467,2882,2661,08,29,0,5,2,四川 328862,536347,1095,883,1647,08,29,0,7,1,吉林

这个案例实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。

数据预处理

​ 本案例使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,

接着可以写如下Python代码,文件名为producer.py:(具体的工程文件结构参照步骤一)

# coding: utf-8 import csv import time from kafka import KafkaProducer # 实例化一个KafkaProducer示例,用于向Kafka投递消息 producer = KafkaProducer(bootstrap_servers='localhost:9092') # 打开数据文件 csvfile = open("../data/user_log.csv","r") # 生成一个可用于读取csv文件的reader reader = csv.reader(csvfile) for line in reader: gender = line[9] # 性别在每行日志代码的第9个元素 if gender == 'gender': continue # 去除第一行表头 time.sleep(0.1) # 每隔0.1秒发送一行数据 # 发送数据,topic为'sex' producer.send('sex',line[9].encode('utf8')) image-20230528172947156

上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’。

Python操作Kafka

我们可以写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py

from kafka import KafkaConsumer consumer = KafkaConsumer('sex') for msg in consumer: print((msg.value).decode('utf8'))

image-20230528173429907在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka,命令如下:

cd /usr/local/kafka bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh config/server.properties

启动zookeeper

image-20230524211044498

启动Kafkaserver

image-20230524211137444

在Kafka开启之后,即可开启KafkaProducer和KafkaConsumer。

在Pycharm启动producer.py和consumer.py

image-20230528172947156 image-20230524211250817

运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:

image-20230524211314170 建立pySpark项目

首先在/usr/local/spark/mycode新建项目目录

cd /usr/local/spark/mycode mkdir kafka image-20230524231930754

然后在kafka这个目录下创建一个kafka_test.py文件。

from kafka import KafkaProducer from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from pyspark import SparkConf, SparkContext import json import sys def KafkaWordCount(zkQuorum, group, topics, numThreads): spark_conf = SparkConf().setAppName("KafkaWordCount") sc = SparkContext(conf=spark_conf) sc.setLogLevel("ERROR") ssc = StreamingContext(sc, 1) ssc.checkpoint(".") # 这里表示把检查点文件写入分布式文件系统HDFS,所以要启动Hadoop # ssc.checkpoint(".") topicAry = topics.split(",") # 将topic转换为hashmap形式,而python中字典就是一种hashmap topicMap = {} for topic in topicAry: topicMap[topic] = numThreads lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x : x[1]) words = lines.flatMap(lambda x : x.split(" ")) wordcount = words.map(lambda x : (x, 1)).reduceByKeyAndWindow((lambda x,y : x+y), (lambda x,y : x-y), 1, 1, 1) wordcount.foreachRDD(lambda x : sendmsg(x)) ssc.start() ssc.awaitTermination() # 格式转化,将[["1", 3], ["0", 4], ["2", 3]]变为[{'1': 3}, {'0': 4}, {'2': 3}],这样就不用修改第四个教程的代码了 def Get_dic(rdd_list): res = [] for elm in rdd_list: tmp = {elm[0]: elm[1]} res.append(tmp) return json.dumps(res) def sendmsg(rdd): if rdd.count != 0: msg = Get_dic(rdd.collect()) # 实例化一个KafkaProducer示例,用于向Kafka投递消息 producer = KafkaProducer(bootstrap_servers='localhost:9092') producer.send("result", msg.encode('utf8')) # 很重要,不然不会更新 producer.flush() if __name__ == '__main__': # 输入的四个参数分别代表着 # 1.zkQuorum为zookeeper地址 # 2.group为消费者所在的组 # 3.topics该消费者所消费的topics # 4.numThreads开启消费topic线程的个数 if (len(sys.argv) /** Set reasonable logging levels for streaming if the user has not configured log4j. */ def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { // We first log something to initialize Spark's default logging, then we override the // logging level. logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }

这个文件不做过多解释,因为这只是一个辅助文件,下面着重介绍工程主文件,文件名为KafkaTest.scala

package org.apache.spark.examples.streaming import java.util.HashMap import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.json4s._ import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.write import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.Interval import org.apache.spark.streaming.kafka._ object KafkaWordCount { implicit val formats = DefaultFormats//数据格式化时需要 def main(args: Array[String]): Unit={ if (args.length numThreads的哈稀表 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap // 创建连接Kafka的消费者链接 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" "))//将输入的每行用空格分割成一个个word // 对每一秒的输入数据进行reduce,然后将reduce后的数据发送给Kafka val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => { if(rdd.count !=0 ){ val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092") props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // 实例化一个Kafka生产者 val producer = new KafkaProducer[String, String](props) // rdd.colect即将rdd中数据转化为数组,然后write函数将rdd内容转化为json格式 val str = write(rdd.collect) // 封装成Kafka消息,topic为"result" val message = new ProducerRecord[String, String]("result", null, str) // 给Kafka发送消息 producer.send(message) } }) ssc.start() ssc.awaitTermination() } }

上述代码注释已经也很清楚了,下面在简要说明下:

首先按每秒的频率读取Kafka消息;然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;最后将上述结果封装成json发送给Kafka。

另外,需要注意,上面代码中有一行如下代码:

ssc.checkpoint(".")

这行代码表示把检查点文件写入分布式文件系统HDFS,所以一定要事先启动Hadoop。如果没有启动Hadoop,则后面运行时会出现“拒绝连接”的错误提示。如果你还没有启动Hadoop,则可以现在在Ubuntu终端中,使用如下Shell命令启动Hadoop:

cd /usr/local/hadoop #hadoop的安装目录 ./sbin/start-dfs.sh

另外,如果不想把检查点写入HDFS,而是直接把检查点写入本地磁盘文件(这样就不用启动Hadoop),则可以对ssc.checkpoint()方法中的文件路径进行指定,比如下面这个例子:

ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") 运行项目

编写好程序之后,下面介绍下如何打包运行程序。在/usr/local/spark/mycode/kafka目录下新建文件simple.sbt,输入如下内容:

name := "Simple Project" version := "1.0" scalaVersion := "2.11.12" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.8" libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.4.8" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.4.8" libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"

然后,即可编译打包程序,输入如下命令

/usr/local/sbt/sbt package image-20230525135125064

打包成功之后,接下来编写运行脚本,在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容:

/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar 127.0.0.1:2181 1 sex 1

其中最后四个为输入参数,含义如下

127.0.0.1:2181为Zookeeper地址1 为consumer group标签sex为消费者接收的topic1 为消费者线程数 cd /usr/local/spark/mycode/kafka sh startup.sh

运行脚本:

image-20230525185420715

在同时开启Spark Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口

运行结果:

image-20230525185139622 Flask-SocketIO实时推送数据

上篇文章说道Spark Streaming实时接收Kafka中topic为’sex’发送的日志数据,然后Spark Streaming进行实时处理,统计好每秒中男女生购物人数之后,将结果发送至Kafka,topic为’result’。在本章节,将介绍如何利用Flask-SocketIO将结果实时推送到浏览器。

利用Flask-SocketIO实时推送数据socket.io.js实时获取数据highlights.js展示数据

Spark Streaming实时接收Kafka中topic为’sex’发送的日志数据,然后Spark Streaming进行实时处理,统计好每秒中男女生购物人数之后,将结果发送至Kafka,topic为’result’。在本章节,将介绍如何利用Flask-SocketIO将结果实时推送到浏览器。 文件目录结构如下:

1.data目录存放的是用户日志数据;2.scripts目录存放的是Kafka生产者和消费者;3.static/js目录存放的是前端所需要的js框架;4.templates目录存放的是html页面;5.app.py为web服务器,接收Spark Streaming处理后的结果,并推送实时数据给浏览器;6.External Libraries是本项目所依赖的Python库,是PyCharm自动生成。 备注:为了方便大家完成上述实验,这里提供源代码文件的下载,请点击这里从百度云盘下载。大家下载完源代码以后,可以导入到自己的IntelliJIDEA开发工具中,查看源代码。备注:点击这里查看IntelliJIDEA工具的安装方法。

首先我们创建如图中的app.py文件,app.py的功能就是作为一个简易的服务器,处理连接请求,以及处理从kafka接收的数据,并实时推送到浏览器。app.py的代码如下:

import json from flask import Flask, render_template from flask_socketio import SocketIO from kafka import KafkaConsumer app = Flask(__name__) app.config['SECRET_KEY'] = 'secret!' socketio = SocketIO(app) thread = None # 实例化一个consumer,接收topic为result的消息 consumer = KafkaConsumer('result', bootstrap_servers='192.168.1.30:9092') # 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器 def background_thread(): girl = 0 boy = 0 for msg in consumer: data_json = msg.value.decode('utf8') data_list = json.loads(data_json) for data in data_list: if '0' in data.keys(): girl = data['0'] elif '1' in data.keys(): boy = data['1'] else: continue result = str(girl) + ',' + str(boy) print(result) socketio.emit('test_message', {'data': result}) # 客户端发送connect事件时的处理函数 @socketio.on('test_connect') def connect(message): print(message) global thread if thread is None: # 单独开启一个线程给客户端发送数据 thread = socketio.start_background_task(target=background_thread) socketio.emit('connected', {'data': 'Connected'}) # 通过访问http://127.0.0.1:5000/访问index.html @app.route("/") def handle_mes(): return render_template("index.html") # main函数 if __name__ == '__main__': socketio.run(app, debug=True)

Python

这段代码实现比较简单,最重要就是background_thread函数,该函数从Kafka接收消息,并进行处理,获得男女生每秒钟人数,然后将结果通过函数socketio.emit实时推送至浏览器。

浏览器获取数据并展示

index.html文件负责获取数据并展示效果,该文件中的代码内容如下(也可以点击这里下载index.html文件):

import json from flask import Flask, render_template from flask_socketio import SocketIO from kafka import KafkaConsumer #因为第一步骤安装好了flask,所以这里可以引用 app = Flask(__name__) app.config['SECRET_KEY'] = 'secret!' socketio = SocketIO(app) thread = None # 实例化一个consumer,接收topic为result的消息 consumer = KafkaConsumer('result') # 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器 def background_thread(): girl = 0 boy = 0 for msg in consumer: data_json = msg.value.decode('utf8') data_list = json.loads(data_json) for data in data_list: if '0' in data.keys(): girl = data['0'] elif '1' in data.keys(): boy = data['1'] else: continue result = str(girl) + ',' + str(boy) print(result) socketio.emit('test_message',{'data':result}) # 客户端发送connect事件时的处理函数 @socketio.on('test_connect') def connect(message): print(message) global thread if thread is None: # 单独开启一个线程给客户端发送数据 thread = socketio.start_background_task(target=background_thread) socketio.emit('connected', {'data': 'Connected'}) # 通过访问http://127.0.0.1:5000/访问index.html @app.route("/") def handle_mes(): return render_template("index.html") # main函数 if __name__ == '__main__': socketio.run(app,debug=True) 效果展示

经过以上步骤,一切准备就绪,我们就可以启动程序来看看最后的效果。启动步骤如下:

1.确保kafka开启。 2.开启producer.py模拟数据流。 3.启动Spark Streaming实时处理数据。提示你可以在实时处理数据启动之后,把consumer.py的topic改成result,运行comsumer.py就可以看到数据处理后的输出结果。 4.启动app.py。如果你是使用pycharm客户端,那右键就可以了。当然也可以使用终端命令:

python3 app.py

启动后的效果如下图:

pycharm运行图

image-20230527005430366 Screenshot from 2023-05-28 00-06-32 遇到问题

说明:在本次实验中需要特别注意版本号,在运行代码、脚本、打包时,需要注意配置文件对应的jar包是否为对应环境版本

无法启动kafka 原因:新版的kafka启动命令已经修改,在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。 无法使用sbt打包 原因:simple.sbt文件未修改为对应版本,sbt内容应于对应环境的spark、scala版本对应

参考博客

Spark+Kafka构建实时分析Dashboard案例



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3