python编写kafka生产者实时生产数据 |
您所在的位置:网站首页 › kafka生产者实例 › python编写kafka生产者实时生产数据 |
写在前头: 更多大数据相关精彩内容请进我的知识星球,每周定期更新 ? ?正文 : ?创建topic实例(zk或者bootstrap,注意bootstrap只能在kafka2.2及以上版本才能用,可以用./kafka-topics.sh --help看是否有该参数) zk方式: ./kafka-topics.sh --zookeeper tdh09:2181,tdh10:2181,tdh11:2181 --create --topic passenger-flow2 --partitions 1 --replication-factor 2 --config message.timestamp.type=LogAppendTime bootstrap方式: ./kafka-topics.sh --bootstrap-server tdh09:9092, tdh10:9092,tdh11:9092 --create --topic passenger-flow-boot --partitions 1 --replication-factor 2 --config message.timestamp.type=LogAppendTime 验证是否创建成功: 消费者 ./kafka-console-consumer.sh --topic passenger-flow2 --bootstrap-server tdh09:9092, tdh10:9092,tdh11:9092 生产者 ./kafka-console-producer.sh --broker-list tdh09:9092,tdh10:9092,tdh11:9092 --topic passenger-flow2 kafka生产者程序所需包如下: kafka-python 代码如下: #-*-coding:utf-8-*- import datetime import random import json import time from kafka import KafkaProducer from kafka.errors import KafkaError producer=KafkaProducer(bootstrap_servers=['tdh09:9092','tdh10:9092','tdh11:9092'],acks=1, value_serializer=lambdam:json.dumps(m).encode('GBK')) topic='passenger-flow2' def simulate_passenger(): #模拟客流数据,关键字段有车站id,车站名,发生时间,设备id,客流量,进站还是出站 try: n=0 whileTrue: dic={} dic['stationId']=n dic['stationName']='车站1' dic['time']=datetime.datetime.now().strftime("%Y%m%d%H:%M:%S") dic['equipmentId']=1 dic['passengerNum']=random.randint(0,100) dic['inOrOut']=0 producer.send(topic=topic,value=dic) time.sleep(2) n=n+1 except KafkaError as e: print(e) finally: producer.close() if__name__=='__main__': simulate_passenger() |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |