Hadoop学习

您所在的位置:网站首页 hadoop中的mapreduce主要思想是那两个 Hadoop学习

Hadoop学习

2024-07-15 07:27| 来源: 网络整理| 查看: 265

  上一篇参考Hadoop学习——MapReduce的简单介绍及执行步骤

MapReduce的组件

  组件是实现MapReduce的真正干活的东西,即我们的业务逻辑,就是要写到这里边来的。MapReduce共有4个组件

一、Mapper组件 介绍

  可以读取文件,默认是一行一行读取,把输入 key和value通过map()传给程序员,输出key和value由业务来决定。MR框架会按照Mapper的输出key做排序,输出key如果要自定义排序,可以实现WritableComparable接口

补充

  MapTask的数量 = 切片数量,即有几个切片,代码执行的时候,就会有几个mapTask。上一篇说的是等于切块数量,实际上等于切片更加贴切,切片本质上是一个对象,封装了文件块的描述信息,其中是不包含真正的数据的,切块是真正的数据。

二、Reducer组件 介绍

  接收Mapper组件的输出key和value,然后按相同key做聚合。

补充

  ReduceTask任务数量通过代码来指定。默认为1。

三、Partitioner组件 介绍

  分区组件,分区概念等同于ReduceTask,即有几个ReduceTask,就有几个分区。默认的分区器是HashPartitioner,作用是按照Mapper输出key的hash分区,可以确保相同的key落到同一个分区,此外可以自定义分区,即写一个类继承Partitioner,最后在Driver指定分区方法即可。

补充

  类:HashPartitioner是默认的排序组件,底层用的是简单的hash算法,这种分区发可能会产生数据倾斜。

四、Combiner组件 介绍

  合并组件,作用是让合并工作在MapTask提前发生,可以减少reduceTask的合并负载,然后再发给Reduce端进一步执行。

补充

  开发一个Combine组件即写一个类,同样继承Reducer。然后在Driver中通过job.setCombinerClass()来指定。   combine组件,如果不设定,默认是没有combine过程的。   使用combine机制,不能改变最后的结果,即写法跟后边的reducer内容是一样的。

JAVA API写法

  因为要利用到API,所以需要先引入包,引包的话,尽量与hadoop的版本一致,首先你要创建一个java project项目,并下载一个hadoop项目到本地,需要用到它里边的jar包。如果想知道引哪些包,可以参考这篇Hadoop Intellij IDEA本地开发环境搭建

1、简单的例子(利用mapper组件)

  首先将待处理文件上传到分布式文件系统。比如,如下文件word.txt的内容,现在对于它简单的输出一下:

hello word hello word hellohadoop01 hellohadoop01

  将该文件上传到hdfs里的word文件夹下。如果hdfs环境还没有搭建,可以参考我的Hadoop学习——简单介绍及单点配置步骤。

  然后在开发工具里创建一个新的,编写一个mapper类,这个类里面实际上相当于创建一个MapTask,具体实现如下:

package mrDay1.mapreduce.word; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * * 新建一个WordCountMapper类,并继承Mapper * 1.job的MapTask如何处理文件块数据,是由Mapper组件来决定的,此类的代码需要程序员自行编写 * 2.开发一个Mapper组件的方式是让一个类继承Mapper * 3.第一个泛型类型对应的MapTask的输入key的类型(输入key:每行的行首偏移量,其类型是LongWritable) * 4.第二个泛型类型对应的MapTask的输入Value的类型(输入Value:每行的内容,其类型是Text) * 5.writable机制是hadoop的序列化机制 * 常用的类型:LongWritable、IntWritable、Text(String)、NullWritable * 6.在类里面重写map方法,用于将输入key和输入value传给程序员,有一行数据,该方法调用一次 * 7.WordCountMapper中的第三个泛型类型是MapTask的输出key类型 * 8.WordCountMapper中的第四个发型类型是MapTask的输出value类型 * */ public class WordCountMapper extends Mapper{ @Override //重写map()方法 protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { //将输入key和输入value直接输出,来验证一下输入key为行首偏移量,value为每行数据 context.write(key, value); } }

  想要执行该任务,现在还少一个驱动程序,接下来编写驱动程序的类,具体实现如下

package mrDay1.mapreduce.word; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //获取job对象 Job job = Job.getInstance(conf); //设置job方法入口的驱动类 job.setJarByClass(WordCountDriver.class); //设置Mapper组件类 job.setMapperClass(WordCountMapper.class); //设置mapper的输出key类型 job.setMapOutputKeyClass(LongWritable.class); //设置Mappper的输出value类型,注意Text的导包问题 job.setMapOutputValueClass(Text.class); //设置输入路径,下边的ip即是hadoop的安装主机名 //9000端口是表示hdfs的rpc通信端口 //如果指定的是目录,会执行当前目录下的所有非“_”开头的文件。 FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/word")); //设置输出结果路径,要求结果路径事先不能存在 FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/word/result")); //提交job,产生阻塞,直到job任务执行完成后才放开 job.waitForCompletion(true); } }

  然后将编写完成的项目打成jar包,并指定执行类。

  将jar包上传到Linux服务器,首先运行jps命令,确保ResourceManager、NodeManager这两个进程已经运行,如果这两个进程没有运行,当执行hadoop jar命令时会报错。

  如果运行成功,则可以直接输入如下命令执行jar包。

hadoop jar wordcount.jar

  其中wordcount.jar即是打好的jar包。当执行结束,可以通过命令查看HDFS里的word/result文件夹下,会有一个part-r-00000文件(如果reduce任务有多个,会出来多个文件),即结果,可以通过命令找到并查看该文件。

hadoop fs -cat /word/result/part-r-00000

  会得到如下内容,即是我们的计算结果:

0 hello word 0 hello word 11 hellohadoop01 11 hellohadoop01

  以上即是一个简单的含有Mapper组件的写法。

2、统计文件中的单词数量(利用mapper+reducer组件)

  统计一个“word.txt”文件中不同单词出现的次数。文件内容具体如下:

hello word hello hadoop01 i am a programer

  实现方法如下:

  首先编写Mapper类:

package mrDay1.mapreduce.word.example; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class EXWordCountMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String line = value.toString(); //text类型没有截取方法,则转为string String[] words = line.split(" "); for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } }

  如果单纯的只写mapper组件,结果会如下:

a 1 am 1 hadoop01 1 hello 1 hello 1 i 1 programer 1 word 1

  现在假如还要将上边的结果按照相同的key做聚合,那就需要reduce组件了

  Reduce的工作原理:将相同的key做聚合,将value形成迭代器。以下为其API实现:

首先开发ExWordCountReducer

package mrDay1.mapreduce.word.reduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 1.第一个泛型类型对应的是reducer的输出key类型 * 2.第二个泛型类型对应的是reducer的输出value类型 * 3.第三个泛型类型是reduce的输出key类型 * 4.第四个泛型类型是reduce的输出value类型 * 5.reduce组件不能单独存在,因为要接收Mapper组件的输出 * 6.Mapper组件可以单独存在,当只有Mapper时,最后的结果文件时MapTask的输出 * 7.当既有Mapper又有Reduce时,最后的结果文件时Reduce的输出而Mapper的输出作为中间结果。 * */ public class ExWordCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { String result = ""; for(IntWritable v : values) { result = result + "," + v.get(); } //做测试,看一下reducer组件传进来的key和Iterable context.write(key, new Text(result)); } }

然后修改driver,只需在driver中配置reduce即可:

package mrDay1.mapreduce.word; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import mrDay1.mapreduce.word.mapper.ExWordCountMapper; import mrDay1.mapreduce.word.reduce.ExWordCountReducer; public class ExWordCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //获取job对象 Job job = Job.getInstance(conf); //设置job方法入口的驱动类 job.setJarByClass(ExWordCountDriver.class); //设置Mapper组件类 job.setMapperClass(ExWordCountMapper.class); //设置mapper的输出key类型 job.setMapOutputKeyClass(LongWritable.class); //设置Mappper的输出value类型,注意Text的导包问题 job.setMapOutputValueClass(Text.class); //设置reduce组件类 job.setReducerClass(ExWordCountReducer.class); //设置reduce输出的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入路径 FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/word")); //设置输出结果路径,要求结果路径事先不能存在 FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/word/result")); //提交job job.waitForCompletion(true); } }

配置完成之后,打jar包并上传到分布式文件系统,执行即可,执行结果如下图。

a ,1 am ,1 hadoop01 ,1 hello ,1,1 i ,1 programer ,1 word ,1

注意:

 ① 要保证MapperTask类中的第三、四泛型类型与ReduceTask类中的第一、二泛型类型相同,因为MapperTask中输出作为ReduceTask中的输入使用,如果不相同则会错误。  ② 要保证MapperTask中的输出key和输出value的类型与driver中设置的输出的key和value相同,否则也会报错。  ReduceTask默认是一个1个分区,在生成结果文件的时候,只会生成一个,默认是以0开始的,比如part-r-00000,如果想要设置成3个分区,只需要在Driver里,加一行配置job.setNumReduceTasks(int tasks); 即可,最终的结果文件,也会有3个,依然是从0开始。比如part-r-00000、part-r-00001、part-r-00002

  关于Partitioner组件和 Combiner组件 写到了下一篇里。

  下一篇,Hadoop学习——MapReduce的组件及简单API(二)



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3