Kafka采集数据保存到MongoDB中

您所在的位置:网站首页 卡夫卡数据库官网 Kafka采集数据保存到MongoDB中

Kafka采集数据保存到MongoDB中

2024-07-16 03:24| 来源: 网络整理| 查看: 265

大数据学习路线图

作者:厦门大学计算机系林子雨副教授 说明:本博客是与林子雨编著《数据采集与预处理》教材配套的教学资料。 版权声明:本博客内容,未经同意,禁止转载。 操作系统:Windows10 Kafka:kafka_2.12-2.4.0 MongoDB6.0:请参考教程《在Windows10系统中安装和使用MongDB6.0》

一、任务描述

在“C:\Python38\mycode”目录下有一个stat.csv文件,里面包含如下两行内容:

1,2023-1-21,Ziyu,Lin 2,2023-1-22,Shufan,Lin

现在需要编写程序,使用Kafka采集stat.csv中的数据,并对数据进行解析,然后保存到MongoDB数据库中。

二、实现代码

在“C:\Python38\mycode”目录下新建一个代码文件producer.py,其内容如下:

from kafka import KafkaProducer print("this is producer") producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10)) csvFilePath = 'stat.csv' data = [] with open(csvFilePath, "rb") as csvfile: data = csvfile.readlines() for rec in data: # Topic为'csvdata' 消息内容为读取的CSV文件的一行 producer.send('csvDataTopic', rec)

在“C:\Python38\mycode”目录下新建一个代码文件consumer.py,其内容如下:

from kafka import KafkaConsumer from pymongo import MongoClient print("this is consumer") consumer = KafkaConsumer('csvDataTopic',bootstrap_servers=['localhost:9092']) client = MongoClient(port=27017) db = client.db result = {} csv_data = [] header_arr = ['id','timestamp','first_name','last_name'] for message in consumer: # 收到的订阅消息处理 print(str(message.value)) csv_data = str(message.value).split(',') data = {header_arr[i] : str(csv_data[i]) for i in range(len(header_arr))} result = db.csvstats.insert_one(data) 三、执行过程

在Windows系统中打开第1个cmd窗口,执行如下命令启动Zookeeper服务:

> cd c:\kafka_2.12-2.4.0 > .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

打开第2个cmd窗口,然后执行下面命令启动Kafka服务:

> cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-server-start.bat .\config\server.properties

打开第3个cmd窗口,执行如下命令创建一个名为test的Topic:

> cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic csvDataTopic

执行如下命令查看csvDataTopic是否创建成功:

> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

打开第4个cmd窗口,执行如下命令启动消费者:

> cd C:\Python38\mycode > python ./consumer.py

打开IDLE,在IDLE中打开文件producer.py并执行,从而让生产者产生数据。 打开第5个cmd窗口,执行如下命令进入MongoDB Shell执行环境:

> mongosh

然后,在MongoDB Shell环境中执行如下命令查看数据:

>show dbs; >use db; >show collections; >db.csvstats.find();

查询结果图如1所示。 图1 MongoDB数据库查询结果



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3