开源大数据案例(第1章 通话记录数据分析)思路,操作,及执行ct |
您所在的位置:网站首页 › 查询手机话单 › 开源大数据案例(第1章 通话记录数据分析)思路,操作,及执行ct |
目录 思路 代码 1.创建通用基础模块 2.数据生产模块 3.上传至Linux生成数据 思路 操作 思路 代码 执行 通话记录数据分析 项目背景通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论。当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。 项目架构 整体架构数据分析流程 数据展示流程 消费模型 项目实现 系统环境: 表1 系统 版本 windows 10 专业版 linux CentOS 6.8 开发工具: 表2 工具 版本 idea 2017.2.5旗舰版 maven 3.3.9 JDK 1.8+ 提示:idea2017.2.5必须使用maven3.3.9,不要使用maven3.5,有部分兼容性问题 集群环境: 表3 框架 版本 hadoop 2.7.2 zookeeper 3.4.10 hbase 1.3.1 flume 1.7.0 kafka 2.11-0.11.0.0 硬件环境: 表4 hadoop102 hadoop103 hadoop104 内存 4G 2G 2G CPU 2核 1核 1核 硬盘 50G 50G 50G 数据生产此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。 数据结构我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个flag作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。 表5 列名 解释 举例 call1 第一个手机号码 15369468720 call1_name 第一个手机号码人姓名(非必须) 李雁 call2 第二个手机号码 19920860202 call2_name 第二个手机号码人姓名(非必须) 卫艺 date_time 建立通话的时间 20171017081520 date_time_ts 建立通话的时间(时间戳形式) duration 通话持续时间(秒) 0600 编写代码 思路1.创建Java集合类存放模拟的电话号码和联系人; 2.随机选取两个手机号码当作“主叫”与“被叫”(注意判断两个手机号不能重复),产出call1与call2字段数据; 3.创建随机生成通话建立时间的方法,可指定随机范围,最后生成通话建立时间,产出date_time字段数据; 4.随机一个通话时长,单位:秒,产出duration字段数据; 5.将产出的一条数据拼接封装到一个字符串中; 6.使用IO操作将产出的一条通话数据写入到本地文件中; 代码首先新建一个maven项目 qingtaishuju-project-ct 作为整个工程的父项目,因为所有的业务不可能通过一个项目来实现,会有很多模块,比如有生产、消费、统计、展示等等。 1.创建通用基础模块新建ct-common,创建好之后导入项目所需要的依赖,如下所示。
org.apache.hbase hbase-server 1.3.1
org.apache.hbase hbase-client 1.3.1
依赖导入成功之后,接下来创建相应的包、接口和类,搭建基础的环境,模块结构如下
将对应的包复制到对应的 2.数据生产模块1. 通用模块创建好之后,新建数据生产模块:ct-producer 模块包结构如下:
2. 打开Bootstrap类,修改代码如下
3. 将cantact.log复制到Datas目录下(需要新建Datas目录)
4. 运行Bootstrap的main方法 5. 此时在Datas目录下自动生成call.log文件,并持续写入数据(点击可停止运行) 3.上传至Linux生成数据 1. 将主函数的输入输出路径写成动态的
2.打jar包(注意将程序打包成运行包,不是依赖包)
3.将数据包和jar包上传至Linux
运行jar包
数据采集/消费(存储) 欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架flume和kafka的定位是什么。我们在此需要将实时数据通过flume采集到kafka然后供给给hbase消费。 flume:cloudera公司研发 适合下游数据消费者不多的情况; 适合数据安全性要求不高的操作; 适合与Hadoop生态圈对接的操作。 kafka:linkedin公司研发 适合数据下游消费众多的情况; 适合数据安全性要求较高的操作(支持replication); 因此我们常用的一种模型是: 线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS 数据采集 思路1. 配置kafka,启动zookeeper和kafka集群; 2. 创建kafka主题; 3. 启动kafka控制台消费者(此消费者只用于测试使用); 4. 配置flume,监控日志文件; 5. 启动flume监控任务; 6. 观察测试。 操作启动zookeeper,kafka集群 zkServer.sh start kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 创建kafka主题 kafka-topics.sh --zookeeper master:2181 --topic calllog --create --replication-factor 1 --partitions 3 启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties bin/zookeeper-server-start.sh -daemon config/zookeeper.properties #建议使用这种方式,不需要启动多个窗口 启动kafka bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh -daemon config/server.properties #建议使用这种方式,不需要启动多个窗口
检查一下是否创建主题成功: kafka-topics.sh --zookeeper master:2181 --list
启动kafka控制台消费者,等待flume信息的输入 kafka-console-consumer.sh --bootstrap-server master:9092 -topic calllog --from-beginning 配置flume-env.sh export JAVA_HOME=/home/software/jdk1.8 export HADOOP_HOME=/home/software/hadoop 解决版本冲突 flume-ng
配置flume(flume-kafka.conf) # define a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /opt/open/call.log # sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave01:9092,slave02:9092 a1.sinks.k1.kafka.topic = calllog a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 启动flume flume-ng agent --conf $FLUME_HOME/conf --name a1 --conf-file $FLUME_HOME/conf/flume-kafka.conf -Dflume.root.logger=INFO,console 在flume所在的文件夹即安装目录./bin/flume-ng agent --conf ./conf --name a1 --conf-file ./conf/flume-kafka.conf -Dflume.root.logger=INFO,console
观察kafka控制台消费者是否成功显示产生的数据 数据消费 如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中。 思路 编写kafka消费者,读取kafka集群中缓存的消息,并打印到控制台以观察是否成功;既然能够读取到kafka中的数据了,就可以将读取出来的数据写入到HBase中,所以编写调用HBaseAPI相关方法,将从Kafka中读取出来的数据写入到HBase;以上两步已经足够完成消费数据,存储数据的任务,但是涉及到解耦,所以过程中需要将一些属性文件外部化,HBase通用性方法封装到某一个类中。 代码创建新的module项目:ct_consumer 模块目录如下 执行 1.虚拟机执行jar包生产数据 java -jar ct-producer.jar contact.log call.log 2.flume采集 bin/flume-ng agent -c conf/ -n a1 -f /opt/open-bigdata-project/flume-2-kafka.conf 3.启动Hadoop(否则无法启动hbase)
4.启动虚拟机HBASE ./start-hbase.sh
4.idea消费者消费 消费完之后查看HBASE数据库 数据查询方式一 使用scan查看HBase中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时长的数据。进行该单元测试前,需要先运行数据采集任务,确保HBase中已有数据存在。 新建工具过滤器工具类:HBaseFilterUtil 新建单元测试类:HBaseScanTest1 运行单元测试 数据消费测试项目成功后,则将项目打包后在linux中运行测试。 1) 打包HBase消费者代码 数据分析我们的数据已经完整的采集到了HBase集群中,这次我们需要对采集到的数据进行分析,统计出我们想要的结果。注意,在分析的过程中,我们不一定会采取一个业务指标对应一个mapreduce-job的方式,如果情景允许,我们会采取一个mapreduce分析多个业务指标的方式来进行任务。具体何时采用哪种方式,我们后续会详细探讨。 分析模块流程如图所示:
业务指标: a) 用户每天主叫通话个数统计,通话时间统计。 b) 用户每月通话记录统计,通话时间统计。 c) 用户之间亲密关系统计。(通话次数与通话时间体现用户亲密关系) 需求分析根据需求目标,设计出下述(3.2.2)表结构。我们需要按照时间范围(年月日),结合MapReduce统计出所属时间范围内所有手机号码的通话次数总和以及通话时长总和。 思路: a) 维度,即某个角度,某个视角,按照时间维度来统计通话,比如我想统计2017年所有月份所有日子的通话记录,那这个维度我们大概可以表述为2017年*月*日 b) 通过Mapper将数据按照不同维度聚合给Reducer c) 通过Reducer拿到按照各个维度聚合过来的数据,进行汇总,输出 d) 根据业务需求,将Reducer的输出通过Outputformat把数据 数据输入:HBase 数据输出:Mysql HBase中数据源结构: 标签 举例&说明 rowkey hashregion_call1_datetime_call2_flag_duration 01_15837312345_20170527081033_13766889900_1_0180 family f1列族:存放主叫信息 f2列族:存放被叫信息 call1 第一个手机号码 call2 第二个手机号码 date_time 通话建立的时间,例如:20171017081520 date_time_ts date_time对应的时间戳形式 duration 通话时长(单位:秒) flag 标记call1是主叫还是被叫(call1的身份与call2的身份互斥) a) 已知目标,那么需要结合目标思考已有数据是否能够支撑目标实现; b) 根据目标数据结构,构建Mysql表结构,建表; c) 思考代码需要涉及到哪些功能模块,建立不同功能模块对应的包结构。 d) 描述数据,一定是基于某个维度(视角)的,所以构建维度类。比如按照“年”与“手机号码”的组合作为key聚合所有的数据,便可以统计这个手机号码,这一年的相关结果。 e) 自定义OutputFormat用于对接Mysql,使数据输出。 f) 创建相关工具类。 MySQL数据表结构设计我们将分析的结果数据保存到Mysql中,以方便Web端进行查询展示。 1) :db_telecom.tb_contacts 用于存放用户手机号码与联系人姓名。 列 备注 类型 id 自增主键 int(11) NOT NULL telephone 手机号码 varchar(255) NOT NULL name 联系人姓名 varchar(255) NOT NULL 2) :db_telecom.tb_call 用于存放某个时间维度下通话次数与通话时长的总和。 列 备注 类型 id_date_contact 复合主键(联系人维度id,时间维度id) varchar(255) NOT NULL id_date_dimension 时间维度id int(11) NOT NULL id_contact 查询人的电话号码 int(11) NOT NULL call_sum 通话次数总和 int(11) NOT NULL DEFAULT 0 call_duration_sum 通话时长总和 int(11) NOT NULL DEFAULT 0 3) :db_telecom.tb_dimension_date 用于存放时间维度的相关数据 列 备注 类型 id 自增主键 int(11) NOT NULL year 年,当前通话信息所在年 int(11) NOT NULL month 月,当前通话信息所在月,如果按照年来统计信息,则month为-1。 int(11) NOT NULL day 日,当前通话信息所在日,如果是按照月来统计信息,则day为-1。 int(11) NOT NULL 4):db_telecom.tb_intimacy 用于存放所有用户用户关系的结果数据。 列 备注 类型 id 自增主键 int(11) NOT NULL intimacy_rank 好友亲密度排名 int(11) NOT NULL id_contact1 联系人1,当前所查询人 int(11) NOT NULL id_contact2 联系人2,与联系人为好友 int(11) NOT NULL call_count 两联系人通话次数 int(11) NOT NULL DEFAULT 0 call_duration_count 两联系人通话持续时间 int(11) NOT NULL DEFAULT 0 环境准备1) 新建module:ct_analysis 2) 创建包结构,如下图
3) 类表 类名 备注 CountDurationMapper 数据分析的Mapper类,继承自TableMapper CountDurationReducer 数据分析的Reducer类,继承自Reduccer CountDurationRunner 数据分析的驱动类,组装Job MySQLOutputFormat 自定义Outputformat,对接Mysql BaseDimension 维度(key)基类 BaseValue 值(value)基类 ComDimension 时间维度+联系人维度的组合维度 ContactDimension 联系人维度 DateDimension 时间维度 CountDurationValue 通话次数与通话时长的封装 JDBCUtil 连接Mysql的工具类 JDBCCacheBean 单例JDBCConnection IConverter 转化接口,用于根据传入的维度对象,得到该维度对象对应的数据库主键id DimensionConverter IConverter实现类,负责实际的维度转id功能 LRUCache 用于缓存已知的维度id,减少对mysql的操作次数,提高效率 Constants 常量类 需求实现将代码导入对应位置 运行测试将mysql驱动包放入到hadoop根目录的$HADOOP_HOME/share/hadoop/common/目录下将打包好的程序上传到master中3) 提交任务 cd 程序包目录 $ HADOOP_HOME/yarn jar ct_analysis-1.0-SNAPSHOT.jar 观察Mysql中的结果。 数据展示 环境准备新建module或项目:ct_web项目结构如下:
pom.xml配置文件: 2) 创建包结构 bean contants controller dao entries 3) 类表 类名 备注 CallLog 用于封装数据分析结果的JavaBean Contact 用于封装联系人的JavaBean Contants 常量类 CallLogHandler 用于处理请求的Controller CallLogDAO 查询某人某个维度通话记录的DAO ContactDAO 查询联系人的DAO QueryInfo 用于封装向服务器发来的请求参数 4) web目录结构,web部分的根目录:webapp 文件夹名 备注 css 存放css静态资源的文件夹 html 存放html静态资源的文件夹 images 存放图片静态资源文件夹 js 存放js静态资源的文件夹 jsp 存放jsp页面的文件夹 WEB-INF 存放web相关配置的文件夹 5) resources目录下创建spring相关配置文件 6) WEB-INF目录下创建web相关配置 7) 拷贝js框架到js目录下 编写代码思路: a) 首先测试数据通顺以及完整性,写一个联系人的测试用例。 b) 测试通过后,通过输入手机号码以及时间参数,查询指定维度的数据,并以图表展示。 代码:将代码复制到对应包中 最终预览查询人通话时长与通话次数统计大概如下所示: 折线图如图所示:
柱状图如图所示:
统一展示如图所示: 项目总结
重新总结梳理整个项目流程和方法论。 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |