Hadoop MapReduce基本概念与详细流程 |
您所在的位置:网站首页 › hdfs基本概念 › Hadoop MapReduce基本概念与详细流程 |
Hadoop MapReduce是Hadoop 中一个批量计算的框架,在整个mapreduce作业的过程中,包括从数据的输入,数据的处理,数据的数据输入这些部分,而其中数据的处理部分就要map,reduce,combiner等操作组成。这篇文章,分享了MapReduce基本概念与详细流程,enjoy~~ 一、概念MapReduce是一种基于java的分布式计算的处理技术和程序模型。 MapReduce算法包含两个重要任务,即Map和Reduce。Map采用一组数据并将其转换为另一组数据,其中各个元素被分解为元组(键/值对)。其次,reduce任务,它将map的输出作为输入,并将这些数据元组合并成一组较小的元组。作为MapReduce名称的顺序,reduce任务总是在map作业之后执行。 MapReduce的主要优点是易于在多个计算节点上扩展数据处理。在MapReduce模型下,数据处理原语称为映射器和缩减器。将数据处理应用程序分解为映射器和简化器有时并不重要。但是,一旦我们以MapReduce形式编写应用程序,扩展应用程序以在集群中运行数百,数千甚至数万台机器只是一种配置更改。这种简单的可扩展性是吸引许多程序员使用MapReduce模型的原因。 二、MapReduce的基本流程MapReduce主要是先读取文件数据,然后进行Map处理,接着Reduce处理,最后把处理结果写到文件中。 三、MapReduce的主要流程 1,Record reader记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录本身。它将数据以键值对的形式传输给mapper。通常键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围之内. 2,Map在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,因为定义对于分布式任务的完成具有重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展示大量细节来解释特定键值如何选择. 3,Shuffle and Sortruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一起。 4,Reducereduce采用分组数据作为输入。该功能传递键和此键相关值的迭代器。可以采用多种方式来汇总、过滤或者合并数据。当reduce功能完成,就会发送0个或多个键值对。 5,输出格式输出格式会转换最终的键值对并写入文件。默认情况下键和值以tab分割,各记录以换行符分割。因此可以自定义更多输出格式,最终数据会写入HDFS。 四、MapReduce读取数据通过InputFormat决定读取的数据的类型,然后拆分成一个个InputSplit,每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map。 1,InputFormat决定读取数据的格式,可以是文件或数据库等。 功能 (1)验证作业输入的正确性,如格式等 (2) 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的Map任务 (3)提供RecordReader实现,读取InputSplit中的"K-V对"供Mapper使用 方法 List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题 RecordReader createRecordReader(): 创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。 类结构 TextInputFormat: 输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容 KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为Key,在之后的为Value。分隔符变量通过key.value.separator.in.input.line变量设置,默认为(\t)字符。 NLineInputFormat: 与TextInputFormat一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1 SequenceFileInputFormat: 一个用来读取字符流数据的InputFormat,为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另一个MapReduce任务的输入之间的数据传输过程。 2,InputSplit代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法。 Split内有Location信息,利于数据局部化一个InputSplit给一个单独的Map处理。 public abstract class InputSplit { /** * 获取Split的大小,支持根据size对InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 获取存储该分片的数据所在的节点位置. */ public abstract String[] getLocations() throws IOException, InterruptedException; } 3,RecordReader将InputSplit拆分成一个个对给Map处理,也是实际的文件读取分隔对象 4,问题大量小文件如何处理 CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的) 怎么计算split的 通常一个split就是一个block(FileInputFormat仅仅拆分比block大的文件),这样做的好处是使得Map可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度 通过mapred.min.split.size, mapred.max.split.size, block.size来控制拆分的大小 如果mapred.min.split.size大于block size,则会将两个block合成到一个split,这样有部分block数据需要通过网络读取 如果mapred.max.split.size小于block size,则会将一个block拆成多个split,增加了Map任务数(Map对split进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源) 先获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认splitSize 就等于blockSize的默认值(64m) public List getSplits(JobContext job) throws IOException { // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List splits = new ArrayList(); List files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job.getConfiguration()); // 获取该文件所有的block信息列表[hostname, offset, length] BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割 if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 计算分片大小 // 即 Math.max(minSize, Math.min(maxSize, blockSize)); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; // 循环分片。 // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } // 处理余下的数据 if (bytesRemaining != 0) { splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else { // 不可split,整块返回 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { // 对于长度为0的文件,创建空Hosts列表,返回 splits.add(makeSplit(path, 0, length, new String[0])); } } // 设置输入文件数量 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; }分片间的数据如何处理 split是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split 解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录 public class LineRecordReader extends RecordReader { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // initialize函数即对LineRecordReader的一个初始化 // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打开文件,并定位到分片读取的起始位置 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { // 文件是压缩文件的话,直接打开文件 in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { // 只要不是第一个split,则忽略本split的第一行数据 if (start != 0) { skipFirstLine = true; --start; // 定位到偏移位置,下次读取就会从偏移位置开始 fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // 忽略第一行数据,重新定位start start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos);// key即为偏移量 if (value == null) { value = new Text(); } int newSize = 0; while (pos break; } pos += newSize; // 读取的数据长度小于最大行长度,也说明已读取完毕 if (newSize key = null; value = null; return false; } else { return true; } } }以上的内容来源网络,仅供学习交流,如有侵犯,联系删除哦! |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |