python编写kafka生产者实时生产数据

您所在的位置:网站首页 kafka生产者实例 python编写kafka生产者实时生产数据

python编写kafka生产者实时生产数据

2022-05-17 08:38| 来源: 网络整理| 查看: 265

写在前头:

更多大数据相关精彩内容请进我的知识星球,每周定期更新

?

?正文 :

?创建topic实例(zk或者bootstrap,注意bootstrap只能在kafka2.2及以上版本才能用,可以用./kafka-topics.sh --help看是否有该参数)

zk方式:

./kafka-topics.sh --zookeeper tdh09:2181,tdh10:2181,tdh11:2181 --create --topic passenger-flow2 --partitions 1 --replication-factor 2 --config message.timestamp.type=LogAppendTime

bootstrap方式:

./kafka-topics.sh --bootstrap-server tdh09:9092, tdh10:9092,tdh11:9092 --create --topic passenger-flow-boot --partitions 1 --replication-factor 2 --config message.timestamp.type=LogAppendTime

验证是否创建成功:

消费者

./kafka-console-consumer.sh --topic passenger-flow2 --bootstrap-server tdh09:9092, tdh10:9092,tdh11:9092

生产者

./kafka-console-producer.sh --broker-list tdh09:9092,tdh10:9092,tdh11:9092 --topic passenger-flow2

kafka生产者程序所需包如下:

kafka-python

代码如下:

#-*-coding:utf-8-*- import datetime import random import json import time from kafka import KafkaProducer from kafka.errors import KafkaError producer=KafkaProducer(bootstrap_servers=['tdh09:9092','tdh10:9092','tdh11:9092'],acks=1, value_serializer=lambdam:json.dumps(m).encode('GBK')) topic='passenger-flow2' def simulate_passenger(): #模拟客流数据,关键字段有车站id,车站名,发生时间,设备id,客流量,进站还是出站 try: n=0 whileTrue: dic={} dic['stationId']=n dic['stationName']='车站1' dic['time']=datetime.datetime.now().strftime("%Y%m%d%H:%M:%S") dic['equipmentId']=1 dic['passengerNum']=random.randint(0,100) dic['inOrOut']=0 producer.send(topic=topic,value=dic) time.sleep(2) n=n+1 except KafkaError as e: print(e) finally: producer.close() if__name__=='__main__': simulate_passenger()



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3