Datawhale

您所在的位置:网站首页 怎么把一列数据分开求和呢 Datawhale

Datawhale

2023-03-31 00:27| 来源: 网络整理| 查看: 265

10. 1 面试题10.1.1 hive外部表和内部表的区别内部表的加载数据和创建表的过程是分开的,在加载数据时,实际数据会被移动到数仓目录中,之后对数据的访问是在数仓目录实现。而外部表加载数据和创建表是同一个过程,对数据的访问是读取HDFS中的数据;内部表删除时,因为数据移动到了数仓目录中,因此删除表时,表中数据和元数据会被同时删除。外部表因为数据还在HDFS中,删除表时并不影响数据。创建表时不做任何指定,默认创建的就是内部表。想要创建外部表,则需要使用External进行修饰10.1.2 简述对Hive桶的理解?

对于每一个表或者分区, Hive可以进一步组织成桶,也就是说分桶是更为细粒度的数据范围划分。Hive会计算桶列的哈希值再以桶的个数取模来计算某条记录属于那个桶。把表(或者分区)组织成桶(Bucket)有两个理由:

获得更高的查询处理效率。桶为表加上了额外的结构,Hive在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用Map端连接(Map-side join)高效的实现。使取样(sampling)更高效。在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。10.1.3 HBase和Hive的区别?

HBase是一个面向列式存储、分布式、可伸缩的数据库,它可以提供数据的实时访问功能,而Hive只能处理静态数据,主要是BI报表数据。就设计初衷而言,在Hadoop上设计Hive,是为了减少复杂MapReduce应用程序的编写工作,在Hadoop上设计HBase是为了实现对数据的实时访问。所以,HBase与Hive的功能是互补的,它实现了Hive不能提供的功能。

10.1.4 简述Spark宽窄依赖

宽依赖和窄依赖的区别是RDD之间是否存在shuffle操作。

窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,即一个父RDD对应一个子RDD或多个父RDD对应一个子RDD

map,filter,union属于窄依赖窄依赖对于流水化作业有优化效果每一个RDD算子都是一个fork/join操作,join会写入磁盘,流水线作业优化后fork,中间不join写入磁盘

宽依赖指子RDD的每个分区都依赖于父RDD的多个分区

groupby和join属于宽依赖DAGScheduler从当前算子往前推,遇到宽依赖,就生成一个stage10.1.5 Hadoop和Spark的相同点和不同点10.1.6 Spark为什么比MapReduce块?

1)DAG相比Hadoop的MapReduce在大多数情况下可以减少磁盘I/O次数,而不是shuffle次数。因为MapReduce计算模型只能包含一个Map和一个Reduce,所以reduce完后必须进行落盘,而DAG可以连续shuffle的,也就是说一个DAG可以完成好几个MapReduce,所以DAG只需要在最后一次reduce落盘,这就比mapreduce少了,总shuffle次数越多,减少的落盘次数越多。

2)Spark shuffle的优化

MapReduce在Shuffle时默认进行排序。Spark在Shuffle时则只有部分场景才需要排序(bypass机制不需要排序)。排序是非常耗时的,这样就可以加快shuffle速度。

3)Spark支持将需要反复用到的数据缓存到内存中,下次再使用此RDD时,不用再次计算,而是直接从内存中获取,因此可以减少数据加载耗时,所以更适合需要迭代计算的机器学习算法。

10.1.7 说说你对Hadoop生态的认识

1.Hadoop是Apache旗下的一套开源软件平台,是用来分析和处理大数据的软件平台。

2.Hadoop提供的功能:利用服务器集群,根据用户的自定义业务逻辑, 对海量数据进行分布式处理。3.Hadoop的核心组件:由底层往上分别是 HDFS、Yarn、MapReduce。

随着处理任务不同,各种组件相继出现,丰富Hadoop生态圈,目前生态圈结构大致如图所示:

10.2 实战

从新闻文章中发现热门话题和趋势话题是舆论监督的一项重要任务。在这个项目中,你的任务是使用新闻数据集进行文本数据分析,使用 Python 中 pySpark 的 RDD 和 DataFrame 的 API。问题是计算新闻文章数据集中每年各词的权重,然后选择每年 top-k个最重要的词。

project_df_template.pyfrom pyspark.sql.session import SparkSession from pyspark.sql.functions import * import sys from math import log10 def takeTopK(pairList, k): #填入获得前k个排名的函数 pairList.sort(key=lambda x:x[1],reverse=True) return pairList[:k] class Project2: def run(self, inputPath, outputPath, stopwords, k): spark = SparkSession.builder.master("local").appName("project2_df").getOrCreate() #填入以下两个函数的参数 fileDF = spark.read.text(inputPath) swlist = spark.sparkContext.broadcast(stopwords) headlineDF = fileDF.select(split(fileDF['value'], ',').getItem(0).substr(0, 4).alias('year'), split(fileDF['value'], ',').getItem(1).alias('headline')) headlineDF = headlineDF.withColumn('word', split('headline', ' ')) headlineDF2 = headlineDF.select('year', array_distinct(headlineDF.word).alias('word')) headlineDF3 = headlineDF2.withColumn('word', explode('word')) yearwordsDF = headlineDF3[~headlineDF3.word.isin(swlist.value)] #写出TF计算的函数 TF =yearwordsDF.groupBY(['year','word']).count().alias('TF') totalYear = yearwordsDF.select('year').distinct().count() #写出DF计算的函数 DF = yearwordsDF.groupBY('word').countDistinct('year').alias('DF') TFIDF = TF.join(DF, TF.word == DF.word).select(TF.year, TF.word, TF.TF, DF.DF) weightDF = TFIDF.withColumn('weight', round(TFIDF.TF * log10(totalYear / TFIDF.DF), 6)).select('year', 'word', 'weight') groupDF = weightDF.groupBy('year').agg(collect_list(struct('word', 'weight')).alias('pair')) topkUDF = udf(lambda z, k: takeTopK(z, k)) res = groupDF.withColumn('pair', topkUDF('pair', lit(int(k)))).orderBy('year') res2 = res.coalesce(1).orderBy('year').withColumn('result', concat_ws('\t', 'year', 'pair')).select('result') res2.write.text(outputPath) spark.stop() if __name__ == "__main__": if len(sys.argv) != 5: print("Wrong arguments") sys.exit(-1) Project2().run(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])project_rdd_template.pyfrom pyspark import SparkContext, SparkConf from operator import add from math import log10 import heapq import sys def takeTopK(pairList, k): #填入获得前k个排名的函数 return heaps.nlargest(k,pairList) class Project2: def run(self, inputPath, outputPath, stopwords, k): conf = SparkConf().setAppName("project2_rdd") sc = SparkContext(conf=conf) # 填入以下两个函数的参数 filerdd = sc.textFile(inputPath) swlist = sc.broadcast(stopwords) #写出以下标题、年度词、TF、总年计算的函数 headlines = filerdd.map(lambda x:x.split(“,”)) yearwords = TF = totalYear = # DF = yearwords.map(lambda x:(x[0][1], x[0][0])).countByKey() DF = yearwords.map(lambda x: (x[0][1], x[0][0])).groupByKey().map(lambda x: (x[0], len(x[1]))) TFIDF = TF.join(DF).map(lambda x: (x[1][0][0], (round(x[1][0][1] * log10(totalYear / x[1][1]), 6), x[0]))) res = TFIDF.groupByKey().map(lambda x: (x[0], list(x[1]))).mapValues(lambda x: takeTopK(x, int(k))) res2 = res.coalesce(1).sortByKey().map( lambda x: f'{x[0]}\t{";".join([item[1] + "," + str(item[0]) for item in x[1]])}') res2.saveAsTextFile(outputPath) sc.stop() if __name__ == "__main__": if len(sys.argv) != 5: print("Wrong arguments") sys.exit(-1) Project2().run(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3