Hive MapReduce: what determines the number of map and reduce tasks?


Contents
1. What determines the number of MapTasks
2. How to adjust the number of MapTasks
   2.1 Increase the map count: lower maxSize (only effective below blockSize, e.g. 100 bytes)
   2.2 Decrease the map count: raise minSize (only effective above blockSize, e.g. 250 MB)
   2.3 Usually left alone in production, but the principle is worth knowing
3. What determines the number of ReduceTasks
   3.1 Log lines shown when running the hive shell
   3.2 Official documentation of the three parameters
   3.3 Source-code walkthrough: how Hive dynamically computes the reduce count (method one)
4. How to adjust the number of ReduceTasks
   4.1 hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max
   4.2 mapreduce.job.reduces
   4.3 Cases where setting the reduce count has no effect
       4.3.1 order by
       4.3.2 Cartesian product
       4.3.3 Very small map-side output

1. What determines the number of MapTasks

The number of MapTasks is determined by: (1) the number of input files, (2) the size of the files, and (3) the blockSize.

In short, the number of files in the input directory determines how many maps are launched: the framework runs one map per input split, and in general each input file produces at least one split. This is the root of the well-known small-files problem, which can exhaust the resources of a Hadoop cluster; for example, when Flink writes to a Hive table in real time, failing to merge small files can easily cause cluster resource problems. If an input file is too large and exceeds the HDFS block size (128 MB), then two or more maps are launched for that single file.

The mathematical meaning of "the number of MapTasks is determined by the files":

A map in MapReduce never processes data that spans more than one file, which means minMapNum >= inputFileNum. The final map count is therefore: mapNum = max(computeMapNum, inputFileNum)
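As a rough illustration, this rule can be written as the sketch below. This is only a sketch of the formula above, not Hadoop source code; the class, method and variable names are made up for the example.

// Illustrative sketch only: estimate the map count from the total input size,
// the split size and the number of input files, following mapNum = max(computeMapNum, inputFileNum).
public class MapCountEstimate {
    static long estimateMapNum(long totalInputBytes, long splitSize, long inputFileNum) {
        // computeMapNum: how many splits the total data volume alone would require
        long computeMapNum = (long) Math.ceil((double) totalInputBytes / splitSize);
        // a map never crosses a file boundary, so there is at least one map per file
        return Math.max(computeMapNum, inputFileNum);
    }

    public static void main(String[] args) {
        // e.g. 10 files totalling 300 MB with 128 MB splits -> max(3, 10) = 10 maps
        System.out.println(estimateMapNum(300L * 1024 * 1024, 128L * 1024 * 1024, 10));
    }
}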

The sections below describe how file size and blockSize affect the number of MapTasks.

2. How to adjust the number of MapTasks

How to change the map count without changing blockSize.

The formula for computing splitSize:

// All of the adjustments below follow this formula (for details see my other post:
// https://blog.csdn.net/lihuazaizheli/article/details/107370695)
splitSize = min(maxSize, max(minSize, blockSize))
When minSize < blockSize < maxSize: splitSize = blockSize
When blockSize < minSize < maxSize: splitSize = minSize -- used to decrease the map count by raising minSize (above blockSize)
When minSize < maxSize < blockSize: splitSize = maxSize -- used to increase the map count by lowering maxSize (below blockSize)
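For reference, a minimal sketch of this rule is shown below, modelled on FileInputFormat's computeSplitSize; the demo class and the sample values are made up for illustration.

// Minimal sketch of the split-size rule, modelled on FileInputFormat.computeSplitSize.
// It is equivalent to min(maxSize, max(minSize, blockSize)) whenever minSize <= maxSize.
public class SplitSizeDemo {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;                                          // 128 MB
        System.out.println(computeSplitSize(blockSize, 1L, 256_000_000L));            // defaults -> 128 MB (blockSize wins)
        System.out.println(computeSplitSize(blockSize, 1L, 100L));                    // maxSize = 100 bytes -> 100 (more maps)
        System.out.println(computeSplitSize(blockSize, 256_000_000L, 256_000_000L));  // minSize = 256 MB -> 256 MB (fewer maps)
    }
}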

If local test mode is enabled, it must be turned off with set hive.exec.mode.local.auto=false before the log showing the actual map and reduce counts can be seen.

Important conclusions:
(1) [Important] Without any tuning, the default map count is tied to block_size: default_num = total input size / block_size. The discussion above keeps blockSize fixed while changing the map count; in production the map count is often changed by changing block_size itself.
(2) The parameter mapred.map.tasks can express the desired map count, but it only takes effect when it is larger than default_num.
(3) These settings are generally used together with Hive; the more common configuration inside Hive is listed below (section 2.1 covers the FileInputFormat-style input settings).
a. Parameters related to minSize:

hive (default)> set mapred.min.split.size;
mapred.min.split.size=1
hive (default)> set mapred.min.split.size.per.node;
mapred.min.split.size.per.node=1
hive (default)> set mapred.min.split.size.per.rack;
mapred.min.split.size.per.rack=1

b. Parameters related to maxSize:

hive (default)> set mapred.max.split.size;
mapred.max.split.size=256000000

c. Input format parameter (CombineHiveInputFormat merges small splits):

hive (default)> set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

# Official documentation
hive.input.format
Default Value: org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
Added In: Hive 0.5.0
The default input format. Set this to HiveInputFormat if you encounter problems with CombineHiveInputFormat. Also see: Configuration Properties#hive.tez.input.format

2.1 Increase the map count: lower maxSize (only effective below blockSize, e.g. 100 bytes)

Note: the examples below use FileInputFormat-style input.

Method 1:

(1) The default value of maxSize is 256 MB
hive (default)> set mapreduce.input.fileinputformat.split.maxsize;
mapreduce.input.fileinputformat.split.maxsize=256000000
(2) Lower this value to 100 bytes, so that splitSize = 100
set mapreduce.input.fileinputformat.split.maxsize=100;
select count(1) from empt;
The log then shows: number of mappers: 2; number of reducers: 1

Method 2:

Set mapred.map.tasks to a larger value (larger than default_num = total input size / splitSize).

2.2 Decrease the map count: raise minSize (only effective above blockSize, e.g. 250 MB)

(1) The default value of minSize is 1
hive (default)> set mapreduce.input.fileinputformat.split.minsize;
mapreduce.input.fileinputformat.split.minsize=1
(2) Raise minSize to 256 MB, so that splitSize = 256 MB
set mapreduce.input.fileinputformat.split.minsize=256000000;
select count(1) from empt;

2.3 Usually left alone in production, but the principle is worth knowing

Example: a file with 2 columns and 128 MB of data gets only 1 map under the default 128 MB blockSize; in that case the map count can be increased to process the data faster.

3. What determines the number of ReduceTasks

The reduce count determines the number of output files.

(1) Method 1 (analysed in the source-code walkthrough below):
hive.exec.reducers.bytes.per.reducer -- parameter 1 (default: 256 MB)
hive.exec.reducers.max -- parameter 2 (default: 1009)
reduce count = min(parameter 2, total input size / parameter 1)
Parameter 2 is normally left unchanged, so on an ordinary cluster Hive automatically derives the reduce count from the data volume as: total input size / hive.exec.reducers.bytes.per.reducer

(2) Method 2:
set mapreduce.job.reduces=1;

3.1 Log lines shown when running the hive shell

Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>

These three parameters determine the reduce count; they are explained in detail below.

3.2 Official documentation of the three parameters

(1)hive.exec.reducers.bytes.per.reducer

Default Value: 1,000,000,000 prior to Hive 0.14.0; 256 MB (256,000,000) in Hive 0.14.0 and later
Added In: Hive 0.2.0; default changed in 0.14.0 with HIVE-7158 (and HIVE-7917)
Size per reducer. The default in Hive 0.14.0 and earlier is 1 GB, that is, if the input size is 10 GB then 10 reducers will be used.
# Key sentence:
In Hive 0.14.0 and later the default is 256 MB, that is, if the input size is 1 GB then 4 reducers will be used.

# In a CDH 5.16.2 Hive environment the value is 128 MB:
hive (default)> set hive.exec.reducers.bytes.per.reducer;
hive.exec.reducers.bytes.per.reducer=134217728
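The "1 GB of input gives 4 reducers" statement follows directly from the min/ceil rule quoted in section 3. The sketch below is a simplified preview of the estimateReducers logic analysed in section 3.3; it is not the Hive source, and the class and method names are made up.

// Simplified sketch of: reducers = min(hive.exec.reducers.max, ceil(totalInputSize / bytes.per.reducer))
public class ReducerCountPreview {
    static int estimate(long totalInputBytes, long bytesPerReducer, int maxReducers) {
        int reducers = (int) Math.ceil((double) totalInputBytes / bytesPerReducer);
        reducers = Math.max(1, reducers);        // at least one reducer
        return Math.min(maxReducers, reducers);  // capped by hive.exec.reducers.max
    }

    public static void main(String[] args) {
        // 1 GB of input with the 256 MB default and max 1009 -> 4 reducers
        System.out.println(estimate(1_000_000_000L, 256_000_000L, 1009));
    }
}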

(2)hive.exec.reducers.max

Default Value: 999 prior to Hive 0.14.0; 1009 in Hive 0.14.0 and later
Added In: Hive 0.2.0; default changed in 0.14.0 with HIVE-7158 (and HIVE-7917)
Maximum number of reducers that will be used. If the one specified in the configuration property mapred.reduce.tasks is negative, Hive will use this as the maximum number of reducers when automatically determining the number of reducers.

# In a CDH 5.16.2 Hive environment:
hive (default)> set hive.exec.reducers.max;
hive.exec.reducers.max=500

(3)mapreduce.job.reduces

Default Value: -1 (disabled)
Added In: Hive 1.1.0 with HIVE-7567
Sets the number of reduce tasks for each Spark shuffle stage (e.g. the number of partitions when performing a Spark shuffle). This is set to -1 by default (disabled); instead the number of reduce tasks is dynamically calculated based on Hive data statistics. Setting this to a constant value sets the same number of partitions for all Spark shuffle stages.
# Key sentence: the default of -1 means disabled, and the number of reduce tasks is dynamically calculated based on Hive data statistics.

# In a CDH 5.16.2 Hive environment:
hive (default)> set mapreduce.job.reduces;
mapreduce.job.reduces=-1

3.3 Source-code walkthrough: how Hive dynamically computes the reduce count (method one)

The relevant code is in the MapRedTask class in the org.apache.hadoop.hive.ql.exec.mr package.

// Call chain
MapRedTask
  |---- setNumberOfReducers
          |---- estimateNumberOfReducers
                  |---- estimateReducers

(1) The core method setNumberOfReducers

/**
 * Set the number of reducers for the mapred work.
 */
private void setNumberOfReducers() throws IOException {
  ReduceWork rWork = work.getReduceWork();
  // this is a temporary hack to fix things that are not fixed in the compiler
  // Read the reduce count that was set externally: rWork.getNumReduceTasks()
  Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();

  if (rWork == null) {
    console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
  } else {
    if (numReducersFromWork >= 0) {
      // If the reduce count was set manually (>= 0), print it to the console
      console.printInfo("Number of reduce tasks determined at compile time: "
          + rWork.getNumReduceTasks());
    } else if (job.getNumReduceTasks() > 0) {
      // If the reduce count was set manually in the job configuration, copy it into the work
      int reducers = job.getNumReduceTasks();
      rWork.setNumReduceTasks(reducers);
      console.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
          + reducers);
    } else {
      // If the reduce count was not set manually, estimate it
      if (inputSummary == null) {
        inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
      }
      // [Key call] estimateNumberOfReducers
      int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
          work.isFinalMapRed());
      rWork.setNumReduceTasks(reducers);
      console.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
          + reducers);
    }

    // The log lines seen in the hive shell are printed here
    console.printInfo("In order to change the average load for a reducer (in bytes):");
    console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
    console.printInfo("In order to limit the maximum number of reducers:");
    console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
    console.printInfo("In order to set a constant number of reducers:");
    console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
  }
}

(2) If the reduce count is not set manually, how does Hive compute it dynamically?

int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
    work.isFinalMapRed());

/**
 * Estimate the number of reducers needed for this job, based on job input,
 * and configuration parameters.
 *
 * The output of this method should only be used if the output of this
 * MapRedTask is not being used to populate a bucketed table and the user
 * has not specified the number of reducers to use.
 *
 * @return the number of reducers.
 */
public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
    MapWork work, boolean finalMapRed) throws IOException {
  // bytesPerReducer defaults to 256 MB: BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", 256000000L)
  long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
  // maxReducers defaults to 1009: MAXREDUCERS("hive.exec.reducers.max", 1009)
  int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);

  // Compute totalInputFileSize
  double samplePercentage = getHighestSamplePercentage(work);
  long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);

  // if all inputs are sampled, we should shrink the size of reducers accordingly.
  if (totalInputFileSize != inputSummary.getLength()) {
    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
        + " estimated totalInputFileSize=" + totalInputFileSize);
  } else {
    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
        + " totalInputFileSize=" + totalInputFileSize);
  }

  // If this map reduce job writes final data to a table and bucketing is being inferred,
  // and the user has configured Hive to do this, make sure the number of reducers is a
  // power of two
  boolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO)
      && finalMapRed && !work.getBucketedColsByDirectory().isEmpty();

  // [The real reducer-count calculation] a tip for reading source code: the method in the
  // return statement is the core method
  return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo);
}

(3) The method that computes the reduce count: estimateReducers

public static int estimateReducers(long totalInputFileSize, long bytesPerReducer,
    int maxReducers, boolean powersOfTwo) {
  // Assume totalInputFileSize is 1000 MB
  // bytes = Math.max(1000 MB, 256 MB) = 1000 MB
  double bytes = Math.max(totalInputFileSize, bytesPerReducer);
  // reducers = (int) Math.ceil(1000 MB / 256 MB) = 4. This also means that if
  // totalInputFileSize is smaller than 256 MB, reducers = 1; otherwise it is
  // computed by this formula.
  int reducers = (int) Math.ceil(bytes / bytesPerReducer);
  // Math.max(1, 4) = 4, so reducers stays 4
  reducers = Math.max(1, reducers);
  // Math.min(1009, 4) = 4, so reducers stays 4
  reducers = Math.min(maxReducers, reducers);

  int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
  int reducersPowerTwo = (int)Math.pow(2, reducersLog);

  if (powersOfTwo) {
    // If the original number of reducers was a power of two, use that
    if (reducersPowerTwo / 2 == reducers) {
      // nothing to do
    } else if (reducersPowerTwo > maxReducers) {
      // If the next power of two greater than the original number of reducers is greater
      // than the max number of reducers, use the preceding power of two, which is strictly
      // less than the original number of reducers and hence the max
      reducers = reducersPowerTwo / 2;
    } else {
      // Otherwise use the smallest power of two greater than the original number of reducers
      reducers = reducersPowerTwo;
    }
  }
  return reducers;
}

4. How to adjust the number of ReduceTasks

There are two ways to adjust Hive's reduce count:

4.1 hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max

a. Explanation: these two parameters are generally not adjusted in production. If we do not specify the reduce count ourselves, Hive uses them to dynamically compute the reduce count.
b. Generally not tuned in production.

4.2 mapreduce.job.reduces

a. Explanation: in production the reduce count is usually not tuned much either, but sometimes too many reducers produce too many small files on HDFS. In that case mapreduce.job.reduces can be lowered to reduce the number of output files on HDFS.
b. This is the parameter to use when manually adjusting the reduce count in production.
c. Example: the log shows that the number of reducers is 5.

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select * from empt sort by length(ename);
Query ID = root_20200725161515_730c6c65-9945-4cec-bbaa-e284bcdbb3ce
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 5
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1594965583131_0089, Tracking URL = http://hadoop:7776/proxy/application_1594965583131_0089/
Kill Command = /hadoop/hadoop/bin/hadoop job -kill job_1594965583131_0089
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 5

4.3 Cases where setting the reduce count has no effect

4.3.1 order by

When the SQL uses order by, the sort is global and can only run in a single reducer, so no matter how the reduce count is adjusted it has no effect.

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select * from empt order by length(ename);
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1

4.3.2 Cartesian product

In this example the Cartesian product does not go through a reducer at all, so no setting has any effect. The underlying reason is that the data volume is small and Hive 0.10 and later enable map join by default.

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select a.*,b.* from empt a join dept b;
Warning: Map Join MAPJOIN[7][bigTable=a] in task 'Stage-3:MAPRED' is a cross product
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0

Now disable the map-join parameter in hive-site.xml:
hive.auto.convert.join = false   (this property enables the MapJoin feature)
Run the query again. The log shows that a Cartesian product is, in effect, also a global aggregation that only one reducer can handle, so even with 5 reducers configured it has no effect.

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select a.*,b.* from empt a join dept b;
Warning: Map Join MAPJOIN[16][bigTable=?] in task 'Stage-3:MAPRED' is a cross product
Warning: Shuffle Join JOIN[4][tables = [a, b]] in Stage 'Stage-1:MAPRED' is a cross product
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1

4.3.3 Very small map-side output

The estimateReducers method shown above contains these three lines:

int reducers = (int) Math.ceil(bytes / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);

If the map-side output (say only 1 MB) is smaller than hive.exec.reducers.bytes.per.reducer (default: 256 MB), and maxReducers keeps its default of 1009, the calculation is:

int reducers = (int) Math.ceil(1 MB / 256 MB) = 1;
reducers = Math.max(1, 1) = 1;
reducers = Math.min(1009, 1) = 1;

So in this case even set mapreduce.job.reduces=10 has no effect; in the end there is still only 1 reducer.


