Kafka采集数据保存到MongoDB中 |
您所在的位置:网站首页 › 卡夫卡数据库官网 › Kafka采集数据保存到MongoDB中 |
![]() 作者:厦门大学计算机系林子雨副教授 说明:本博客是与林子雨编著《数据采集与预处理》教材配套的教学资料。 版权声明:本博客内容,未经同意,禁止转载。 操作系统:Windows10 Kafka:kafka_2.12-2.4.0 MongoDB6.0:请参考教程《在Windows10系统中安装和使用MongDB6.0》 一、任务描述在“C:\Python38\mycode”目录下有一个stat.csv文件,里面包含如下两行内容: 1,2023-1-21,Ziyu,Lin 2,2023-1-22,Shufan,Lin现在需要编写程序,使用Kafka采集stat.csv中的数据,并对数据进行解析,然后保存到MongoDB数据库中。 二、实现代码在“C:\Python38\mycode”目录下新建一个代码文件producer.py,其内容如下: from kafka import KafkaProducer print("this is producer") producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10)) csvFilePath = 'stat.csv' data = [] with open(csvFilePath, "rb") as csvfile: data = csvfile.readlines() for rec in data: # Topic为'csvdata' 消息内容为读取的CSV文件的一行 producer.send('csvDataTopic', rec)在“C:\Python38\mycode”目录下新建一个代码文件consumer.py,其内容如下: from kafka import KafkaConsumer from pymongo import MongoClient print("this is consumer") consumer = KafkaConsumer('csvDataTopic',bootstrap_servers=['localhost:9092']) client = MongoClient(port=27017) db = client.db result = {} csv_data = [] header_arr = ['id','timestamp','first_name','last_name'] for message in consumer: # 收到的订阅消息处理 print(str(message.value)) csv_data = str(message.value).split(',') data = {header_arr[i] : str(csv_data[i]) for i in range(len(header_arr))} result = db.csvstats.insert_one(data) 三、执行过程在Windows系统中打开第1个cmd窗口,执行如下命令启动Zookeeper服务: > cd c:\kafka_2.12-2.4.0 > .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties打开第2个cmd窗口,然后执行下面命令启动Kafka服务: > cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-server-start.bat .\config\server.properties打开第3个cmd窗口,执行如下命令创建一个名为test的Topic: > cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic csvDataTopic执行如下命令查看csvDataTopic是否创建成功: > .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181打开第4个cmd窗口,执行如下命令启动消费者: > cd C:\Python38\mycode > python ./consumer.py打开IDLE,在IDLE中打开文件producer.py并执行,从而让生产者产生数据。 打开第5个cmd窗口,执行如下命令进入MongoDB Shell执行环境: > mongosh然后,在MongoDB Shell环境中执行如下命令查看数据: >show dbs; >use db; >show collections; >db.csvstats.find();查询结果图如1所示。
|
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |