MapReduce基础编程(自定义序列化、自定义分区、自定义排序、自定义分组)

您所在的位置:网站首页 自定义序列设置的步骤是 MapReduce基础编程(自定义序列化、自定义分区、自定义排序、自定义分组)

MapReduce基础编程(自定义序列化、自定义分区、自定义排序、自定义分组)

2024-07-06 08:07| 来源: 网络整理| 查看: 265

文章目录 1 MapReduce自定义对象序列化StateBean.javamapper.javareducer.javadriver.java 2 MapReduce自定义排序sortBean.javamapper.javareducer.javadriver.java 3 MapReduce自定义分区StatePartitioner.javamapper.javareducer.javadriver.java 4 MapReduce自定义分组groupBean.javamapper.javagroupingComparator.javareducer.java(自定义分组关键)driver.java 该篇内容以covid数据来进行实验

1 MapReduce自定义对象序列化

数据说明:date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)。

在这里插入图片描述

问题:统计美国2021-01-28,每个州state新增确诊案例数、新增死亡案例数

分析:

自定义对象CovidCountBean,用于封装每个县的确诊病例数和死亡病例数。注意自定义对象需要实现Hadoop的序列化机制。以州作为map阶段输出的key,以CovidCountBean作为value,这样属于同一个州的数据就会变成一组进行reduce处理,进行累加即可得出每个州累计确诊病例。 StateBean.java

因为mr中数据传递是键值对的形式,而当有多个值作为key或者value输出时,可以将其封装成一个对象Bean,这样就可以更加方便值的获取和读取。

在封装成对象时,除了要添加参数构造(空参、全参)、gettingSetting、toString以外,还需要实现hadoop序列化接口Writable。

在封装对象时,可以添加一个类似与全参构造的方法set,方便一次性传值,就不需要一次一次的set变量了。(以下案例就有涉及)

注意:如果该自定义对象作为key还需要重写comparable接口

package covid.Bean; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class StateBean implements Writable { //确诊病例数 private long Cases; //死亡病例数 private long deaths; public StateBean() { } public StateBean(long cases, long deaths) { Cases = cases; this.deaths = deaths; } public void set(long cases, long deaths) { Cases = cases; this.deaths = deaths; } public long getCases() { return Cases; } public void setCases(long cases) { Cases = cases; } public long getDeaths() { return deaths; } public void setDeaths(long deaths) { this.deaths = deaths; } @Override public String toString() { return Cases +"\t"+deaths; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(Cases); dataOutput.writeLong(deaths); } @Override public void readFields(DataInput dataInput) throws IOException { this.Cases = dataInput.readLong(); this.deaths = dataInput.readLong(); } } mapper.java 值得注意的是,数据第一行是不处理的,一次可以通过读取数据时默认的key.get()来判断,第一个值是0。要注意每次set值的时候数据类型,要保持一致。 package covid.Bean; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class mapper extends Mapper { Text outputKey = new Text(); StateBean outputvalue = new StateBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 跳过第一行数据 if (key.get() > 0){ String[] sl = value.toString().split(","); //将state数据作为key值,为了后续reduce分组求和 outputKey.set(sl[1]); //注意包装类中cases数据类型为Long,因此需要转换类型 outputvalue.setCases(Long.parseLong(sl[3])); outputvalue.setDeaths(Long.parseLong(sl[4])); context.write(outputKey,outputvalue); } } } reducer.java package covid.Bean; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class reducer extends Reducer { StateBean outputValue = new StateBean(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long TotalCases = 0; long TotalDeaths = 0; for (StateBean value:values){ TotalCases += value.getCases(); TotalDeaths += value.getDeaths(); } outputValue.set(TotalCases,TotalDeaths); context.write(key,outputValue); } } driver.java

模板式写法,多写即可

package covid.Bean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class driver { public static void main(String[] args) throws Exception { //配置文件对象 Configuration conf = new Configuration(); //创建job实例 Job job = Job.getInstance(conf, driver.class.getSimpleName()); job.setJarByClass(driver.class); //设置与mapper的连接 job.setMapperClass(mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(StateBean.class); //设置与reducer的连接 job.setReducerClass(reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(StateBean.class); //输入路径 FileInputFormat.setInputPaths(job,new Path("E:\\InAndOut\\hadoop\\Input\\USCovid\\us-states.csv")); //输出路径 Path out = new Path("E:\\InAndOut\\hadoop\\Output\\Bean"); FileOutputFormat.setOutputPath(job,out); //判断文件是否存在 FileSystem fs = FileSystem.get(conf); if (fs.exists(out)){ fs.delete(out,true); } //提交并等待完成 boolean b = job.waitForCompletion(true); System.exit(b?0:1); } }

总结:该案例逻辑简单,主要是要明白自定义对象序列化的使用。

2 MapReduce自定义排序

问题:将美国2021-01-28,每个州state的确诊案例数进行倒序排序。

分析:

如果你的需求中需要根据某个属性进行排序,不妨把这个属性作为key。因为MapReduce中key有默认排序行为的。如果你的需求是正序,并且数据类型是Hadoop封装好的类型。这种情况下不需要任何修改,直接使用Hadoop类型作为key即可。因为Hadoop封装好的类型已经实现了排序规则。如果你的需求是倒序,或者数据类型是自定义对象。需要重写排序规则。bean对象实现Comparable接口重写CompareTo方法。 sortBean.java

WritableComparable接口实现要在后面规定其泛型,例如

WritableComparable接口只是Writable和Comparable合并写法。

自定义排序规则:返回 0-等于、负数-小于、正数-大于。而倒序精髓:如果大于,那么强制返回负数。

this.Cases表示当前的数,而o.getCases()表示其他的数。

package covid.sort; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class sortBean implements WritableComparable { //确诊病例数 private long Cases; //死亡病例数 private long deaths; public sortBean() { } public sortBean(long cases, long deaths) { Cases = cases; this.deaths = deaths; } public void set(long cases, long deaths) { Cases = cases; this.deaths = deaths; } public long getCases() { return Cases; } public void setCases(long cases) { Cases = cases; } public long getDeaths() { return deaths; } public void setDeaths(long deaths) { this.deaths = deaths; } @Override public String toString() { return Cases + "\t" + deaths; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(Cases); dataOutput.writeLong(deaths); } @Override public void readFields(DataInput dataInput) throws IOException { this.Cases = dataInput.readLong(); this.deaths = dataInput.readLong(); } //自定义排序规则 @Override public int compareTo(sortBean o) { return this.Cases - o.getCases() > 0 ? -1 : (this.Cases - o.getCases() reducetask有多个(>2)–>默认只有1个,如何有多个?—>可以设置,job.setNumReduceTasks(N)—>当有多个reducetask 意味着数据分区---->默认分区规则是什么? hashPartitioner—>默认分区规则符合你的业务需求么?---->符合,直接使用—>不符合,自定义分区。

StatePartitioner.java 自定义分区,需要继承Partitioner并重写 getPartition方法。getPartition返回值为正数,分区号要求是连续的并从0开始的,例如分5个区,那分区的返回值只能是0、1、2、3、4这5个数字。 package covid.partition; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import java.util.HashMap; public class StatePartitioner extends Partitioner { //模拟美国各州数据字典 实际中可以从redis中快速查询 如果数据不大也可以使用数据集合保存 public static HashMap stateMap = new HashMap(); static{ stateMap.put("Alabama", 0); stateMap.put("Arkansas", 1); stateMap.put("California", 2); stateMap.put("Florida", 3); stateMap.put("Indiana", 4); } @Override public int getPartition(Text key, Text value, int i) { Integer code = stateMap.get(key.toString()); //判断是否是指定的那几个州,如果是返回对应的值,如果不是返回5 if (code!=null){ return code; } return 5; } } mapper.java

输入数据和案例一相同

package covid.partition; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class mapper extends Mapper { Text outkey = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] sl = value.toString().split(","); outkey.set(sl[1]); context.write(outkey,value); } } reducer.java package covid.partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class reducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value:values){ context.write(value,NullWritable.get()); } } } driver.java 通过 job.setNumReduceTasks(n);来设置reduceTask个数(输出文件的个数)关联自定义分区规则job.setPartitionerClass(StatePartitioner.class);重点:n要>=分区个数,如果大于分区个数,那么多下的文件不会有数据,但如果小于分区数,会出现编译错误。 package covid.partition; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class driver { public static void main(String[] args) throws Exception { //配置文件对象 Configuration conf = new Configuration(); //创建job实例 Job job = Job.getInstance(conf, covid.sort.driver.class.getSimpleName()); job.setJarByClass(covid.sort.driver.class); //设置与mapper的连接 job.setMapperClass(mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置与reducer的连接 job.setReducerClass(reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置分区个数,以及分区规则 job.setNumReduceTasks(6); job.setPartitionerClass(StatePartitioner.class); //输入路径 FileInputFormat.setInputPaths(job,new Path("E:\\InAndOut\\hadoop\\Input\\USCovid\\us-states.csv")); //输出路径 Path out = new Path("E:\\InAndOut\\hadoop\\Output\\statePartitioner"); FileOutputFormat.setOutputPath(job,out); //判断文件是否存在 FileSystem fs = FileSystem.get(conf); if (fs.exists(out)){ fs.delete(out,true); } //提交并等待完成 boolean b = job.waitForCompletion(true); System.exit(b?0:1); } } 4 MapReduce自定义分组

分组概述:

分组在发生在reduce阶段,决定了同一个reduce中哪些数据将组成一组去调用reduce方法处理。默认分组规则是:key相同的就会分为一组(前后两个key直接比较是否相等)。需要注意的是,在reduce阶段进行分组之前,因为进行了数据排序,因此排序+分组将会使得key一样的数据一定被分到同一组,一组去调用reduce方法处理。

自定义分组规则:

写类继承 WritableComparator,重写Compare方法。只要Compare方法返回为0,MapReduce框架在分组的时候就会认为前后两个相等,分为一组。在job对象中进行设置,让自己的重写分组类生效。job.setGroupingComparatorClass(xxxx.class);

问题:统计美国2021-01-28,每个州state的确诊案例数最多的县是哪一个。

分析:

在map阶段将“州state、县county、县确诊病例cases”通过自定义对象封装,作为key输出;重写对象的排序规则,首先根据州的正序排序,如果州相等,按照确诊病例数cases倒序排序,发送到reduce。在reduce端利用自定义分组规则,将州state相同的分为一组,然后取第一个即是最大值。 groupBean.java 注意:在这个类中重写自定义排序规则Comparable值得注意的是,字符串的compareTo方法,优先比较两个字符串的第一个字符的ASCII大小,如果第一个相同就比较两个字符串的第二个字符,依次往后推。如果第一个字符串大,则返回正整数,完全相等返回零,第一个字符串小,则返回负数。 package covid.group; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class groupBean implements WritableComparable { private String state; private String country; private long cases; public groupBean() { } public groupBean(String state, String country, long cases) { this.state = state; this.country = country; this.cases = cases; } public void set(String state, String country, long cases) { this.state = state; this.country = country; this.cases = cases; } public String getState() { return state; } public void setState(String state) { this.state = state; } public String getCountry() { return country; } public void setCountry(String country) { this.country = country; } public long getCases() { return cases; } public void setCases(long cases) { this.cases = cases; } @Override public String toString() { return state +'\t' + country + '\t' + cases; } //根据州state正序进行排序 如果州相同 则根据确诊数量cases倒序排序 @Override public int compareTo(groupBean o) { int result; int i =state.compareTo(o.getState()); if (i > 0){ result = 1; }else if (io.getCases()?-1:1; } return result; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(state); dataOutput.writeUTF(country); dataOutput.writeLong(cases); } @Override public void readFields(DataInput dataInput) throws IOException { this.state = dataInput.readUTF(); this.country = dataInput.readUTF(); this.cases = dataInput.readLong(); } } mapper.java

该mapper中将所需内容都封装到key中,原因是自定义分组和排序要求的数据必须在key中。

package covid.group; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class mapper extends Mapper { groupBean outputKey = new groupBean(); NullWritable outputValue = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (key.get()>0){ String[] sl = value.toString().split(","); outputKey.set(sl[2],sl[1],Long.parseLong(sl[4])); context.write(outputKey,outputValue); } } } groupingComparator.java 自定义分组规则写在该类中,要求继承WritableComparator类,并重写compare方法。该类中还必须写该类的空参构造,向父类返回key数据类型的反射。也就是以下代码: public groupingComparator() { super(groupBean.class,true); } compare方法返回值分为,零和非零两种,如果是零认为相同分为一组,非零则代表不是一组。该方法中的WritableComparable a, WritableComparable b两个值是两个不同数据的key,需要对其进行类型转换才能使用。 package covid.group; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class groupingComparator extends WritableComparator { public groupingComparator() { super(groupBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { //类型转换 groupBean aBean = (groupBean) a; groupBean bBean = (groupBean) b; return aBean.getState().compareTo(bBean.getState()); } } reducer.java(自定义分组关键)

思考:按照key中的一个变量进行分组,但是reduce方法的变量是key和value的迭代,key是怎么和valude一一对应呢?

情况1:不迭代value,直接输出kv,发现key是分组中第一个kv对所对应的key。情况2:一边迭代,一边输出kv,此时key会随着value的变化而变化,并与之对应。情况3:迭代完values最终输出一次value,此时key是分组中最后一个key。 package covid.group; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class reducer extends Reducer { @Override protected void reduce(groupBean key, Iterable values, Context context) throws IOException, InterruptedException { //不遍历迭代器,此时key就是分组中的第一个key就是该州确诊病例人数最多的县对应的数据 context.write(key,NullWritable.get()); } } driver.java

需要在驱动类中设置关联自定义分组规则。

package covid.group; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class driver { public static void main(String[] args) throws Exception { //配置文件对象 Configuration conf = new Configuration(); //创建job实例 Job job = Job.getInstance(conf, covid.sort.driver.class.getSimpleName()); job.setJarByClass(covid.sort.driver.class); //设置与mapper的连接 job.setMapperClass(mapper.class); job.setMapOutputKeyClass(groupBean.class); job.setMapOutputValueClass(NullWritable.class); //设置与reducer的连接 job.setReducerClass(reducer.class); job.setOutputKeyClass(groupBean.class); job.setOutputValueClass(NullWritable.class); //设置自定义分组 job.setGroupingComparatorClass(groupingComparator.class); //输入路径 FileInputFormat.setInputPaths(job,new Path("E:\\InAndOut\\hadoop\\Input\\USCovid\\us-counties.csv")); //输出路径 Path out = new Path("E:\\InAndOut\\hadoop\\Output\\group"); FileOutputFormat.setOutputPath(job,out); //判断文件是否存在 FileSystem fs = FileSystem.get(conf); if (fs.exists(out)){ fs.delete(out,true); } //提交并等待完成 boolean b = job.waitForCompletion(true); System.exit(b?0:1); } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3