Hadoop3

您所在的位置:网站首页 宿迁陆集确诊病例 Hadoop3

Hadoop3

2024-07-03 07:58| 来源: 网络整理| 查看: 265

一、COVID-19 案例

上篇文章对 MapReduce 进行了介绍,并编写了 WordCount 经典案例的实现,本篇为继续加深 MapReduce 的用法,实践 COVID-19 新冠肺炎案例,下面是上篇文章的地址:

https://blog.csdn.net/qq_43692950/article/details/127195121

COVID-19,简称“新冠肺炎”,世界卫生组织命名为“2019冠状病毒病” [1-2] ,是指2019新型冠状病毒感染导致的肺炎。现有美国 2021-01-28 号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:

date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例) 2021-01-28,Pike,Alabama,01109,2704,35 2021-01-28,Randolph,Alabama,01111,1505,37 2021-01-28,Russell,Alabama,01113,3675,16 2021-01-28,Shelby,Alabama,01117,19878,141 2021-01-28,St. Clair,Alabama,01115,8047,147 2021-01-28,Sumter,Alabama,01119,925,28 2021-01-28,Talladega,Alabama,01121,6711,114 2021-01-28,Tallapoosa,Alabama,01123,3258,112 2021-01-28,Tuscaloosa,Alabama,01125,22083,283 2021-01-28,Walker,Alabama,01127,6105,185 2021-01-28,Washington,Alabama,01129,1454,27

数据集下载

https://download.csdn.net/download/qq_43692950/86805389

二、计算各个州的累积cases、deaths

创建 VO 类存储 cases、deaths 个数:

@Data @AllArgsConstructor @NoArgsConstructor public class CountVO implements Writable { private Long cases;//确诊病例数 private Long deaths;//死亡病例数 public void set(long cases, long deaths) { this.cases = cases; this.deaths = deaths; } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); } /** * 反序列化方法 注意顺序 */ @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths =in.readLong(); } @Override public String toString() { return cases +"\t"+ deaths; } }

创建 Mapper 类,截取出州和 cases、deaths,以州为 key ,CountVO 为 Value :

public class SumMapper extends Mapper { Text outKey = new Text(); CountVO outValue = new CountVO(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); //州 outKey.set(fields[2]); //Covid数据 确诊病例 死亡病例 outValue.set(Long.parseLong(fields[fields.length-2]),Long.parseLong(fields[fields.length-1])); context.write(outKey,outValue); } }

创建 Reducer ,对 cases、deaths 累加:

public class SumReducer extends Reducer { CountVO outValue = new CountVO(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long totalCases = 0; long totalDeaths =0; //累加统计 for (CountVO value : values) { totalCases += value.getCases(); totalDeaths +=value.getDeaths(); } outValue.set(totalCases,totalDeaths); context.write(key,outValue); } }

创建驱动类,加载上面的 Mapper 和 Reducer :

public class SumDriver extends Configured implements Tool { public static void main(String[] args) throws Exception{ //配置文件对象 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new SumDriver(), args); System.exit(status); } @Override public int run(String[] args) throws Exception { // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来, // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理 Path input = new Path(args[0]); Path output = new Path(args[1]); // 输出目录必须为空,如果不为空则会报错提示 FileSystem fs = FileSystem.get(getConf()); if(fs.exists(output)){ fs.delete(output,true); } // 创建作业实例 Job job = Job.getInstance(getConf(), SumDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(SumDriver.class); // 设置作业mapper reducer类 job.setMapperClass(SumMapper.class); job.setReducerClass(SumReducer.class); // 设置作业mapper阶段输出key value数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CountVO.class); //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(CountVO.class); // 配置作业的输入数据路径 FileInputFormat.addInputPath(job, input); // 配置作业的输出数据路径 FileOutputFormat.setOutputPath(job, output); return job.waitForCompletion(true)? 0:1; } }

数据集目录和输出目录通过参数传递进来,这里我将数据集放在了 D:/test/input 下:

在这里插入图片描述

如果是打包后放在 hadoop 集群运行,则:

hadoop jar # 或者 yarn jar

在这里插入图片描述 运行成功后,到输出目录查看结果:

在这里插入图片描述 已成功统计出相关结果。

三、对上面计算的结果根据deaths进行倒叙排列

上麦已经计算出了每个州的cases、deaths,如果还需要根据deaths进行倒叙排列的话,我们可以针对上面 job 输出的结果在进行处理,利用 MapReduce 中key的排序行为,将上个 job 的 value 作为本次 job 的key。

对 CountVO 进行修改,通过实现 Comparable 实现排序的效果,不过在上面我们已经实现了 Writable接口,在上篇文章中就讲到 Hadoop 为我们提供了 WritableComparable 已经实现好了 Writable, Comparable ,下面将 CountVO 中的 Writable 换成 WritableComparable:

@Data @AllArgsConstructor @NoArgsConstructor public class CountVO implements WritableComparable { private Long cases;//确诊病例数 private Long deaths;//死亡病例数 public void set(long cases, long deaths) { this.cases = cases; this.deaths = deaths; } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); } /** * 反序列化方法 注意顺序 */ @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths =in.readLong(); } @Override public String toString() { return cases +"\t"+ deaths; } @Override public int compareTo(CountVO o) { return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1); } }

compareTo 方法用于将当前对象与方法的参数进行比较。如果指定的数与参数相等返回0。如果指定的数小于参数返回 -1。如果指定的数大于参数返回 1。

创建 Mapper,key 为 CountVO:

public class SortSumMapper extends Mapper { CountVO outKey = new CountVO(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); outKey.set(Long.parseLong(fields[1]),Long.parseLong(fields[2])); outValue.set(fields[0]); context.write(outKey,outValue); } }

编写 Reducer, 无需做任何操作直接 write 即可

public class SortSumReducer extends Reducer { @Override protected void reduce(CountVO key, Iterable values, Context context) throws IOException, InterruptedException { Text outKey = values.iterator().next(); context.write(outKey,key); } }

编写驱动类:

public class SortSumDriver extends Configured implements Tool { public static void main(String[] args) throws Exception{ //配置文件对象 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new SortSumDriver(), args); System.exit(status); } @Override public int run(String[] args) throws Exception { // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来, // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理 Path input = new Path(args[0]); Path output = new Path(args[1]); // 输出目录必须为空,如果不为空则会报错提示 FileSystem fs = FileSystem.get(getConf()); if(fs.exists(output)){ fs.delete(output,true); } // 创建作业实例 Job job = Job.getInstance(getConf(), SortSumDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(SortSumDriver.class); // 设置作业mapper reducer类 job.setMapperClass(SortSumMapper.class); job.setReducerClass(SortSumReducer.class); // 设置作业mapper阶段输出key value数据类型 job.setMapOutputKeyClass(CountVO.class); job.setMapOutputValueClass(Text.class); //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(CountVO.class); // 配置作业的输入数据路径 FileInputFormat.addInputPath(job, input); // 配置作业的输出数据路径 FileOutputFormat.setOutputPath(job, output); return job.waitForCompletion(true)? 0:1; } }

将上个 job 的结果放在 D:/test/input1 下,执行该驱动类: 在这里插入图片描述

在这里插入图片描述 执行成功后,到输出目录查看结果:

在这里插入图片描述 已经实现根据 死亡病例进行倒叙排列。

四、对每个州的 deaths 筛选出Top3的县

修改 CountVO :

@Data @AllArgsConstructor @NoArgsConstructor public class CountVO implements WritableComparable { private String county;//县 private Long cases;//确诊病例数 private Long deaths;//死亡病例数 public CountVO(CountVO vo){ this.county = vo.getCounty(); this.cases = vo.getCases(); this.deaths = vo.getDeaths(); } public void set(long cases, long deaths, String county) { this.cases = cases; this.deaths = deaths; this.county = county; } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); out.writeUTF(county); } /** * 反序列化方法 注意顺序 */ @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths = in.readLong(); this.county = in.readUTF(); } @Override public String toString() { return county + "\t" + cases + "\t" + deaths; } @Override public int compareTo(CountVO o) { return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1); } }

修改 SumMapper 类:

public class SumMapper extends Mapper { Text outKey = new Text(); CountVO outValue = new CountVO(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); //州 outKey.set(fields[2]); //Covid数据 确诊病例 死亡病例 县 outValue.set(Long.parseLong(fields[fields.length - 2]), Long.parseLong(fields[fields.length - 1]), fields[1]); context.write(outKey, outValue); } }

修改 SumReducer 类:

public class SumReducer extends Reducer { CountVO outValue = new CountVO(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { List vList = new ArrayList(); values.forEach(v -> vList.add(new CountVO(v))); vList.sort(CountVO::compareTo); vList.stream().filter(Objects::nonNull).limit(3).forEach(c -> { outValue.set(c.getCases(), c.getDeaths(), c.getCounty()); try { context.write(key, outValue); } catch (Exception e) { e.printStackTrace(); } }); } }

驱动类无需修改,下面执行 Job ,到输出目录查看结果:

在这里插入图片描述 已经计算出了每个州的死亡病例 Top3 。

五、将二、三两个任务合并在一起执行

上面第三点依赖于第二点的结果,但是上面是分成了两个驱动类执行,在 MapReduce 中提供了工作流,可以通过一个提交来完成原来需要提交2次的任务。

修改驱动类:

public class SumDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); ControlledJob ctrljob1 = getJob1(conf); ControlledJob ctrljob2 = getJob2(conf); //设置依赖job的依赖关系 ctrljob2.addDependingJob(ctrljob1); // 主控制容器,控制上面的总的两个子作业 JobControl jobCtrl = new JobControl("mainCtrl"); // 添加到总的JobControl里,进行控制 jobCtrl.addJob(ctrljob1); jobCtrl.addJob(ctrljob2); // 在子线程启动 Thread t = new Thread(jobCtrl); t.start(); while(true) { if (jobCtrl.allFinished()) {// 如果作业成功完成,就打印成功作业的信息 System.out.println(jobCtrl.getSuccessfulJobList()); jobCtrl.stop(); break; } } } private static ControlledJob getJob1(Configuration conf) throws IOException { Job job = Job.getInstance(conf, SumDriver.class.getSimpleName()); job.setJarByClass(SumDriver.class); // 设置作业mapper reducer类 job.setMapperClass(SumMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CountVO.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CountVO.class); Path input = new Path("D:/test/input"); Path output = new Path("D:/test/output"); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileInputFormat.addInputPath(job, input); FileOutputFormat.setOutputPath(job, output); //转化成受控作业 ControlledJob ctrljob = new ControlledJob(conf); ctrljob.setJob(job); return ctrljob; } private static ControlledJob getJob2(Configuration conf) throws IOException { Job job = Job.getInstance(conf, SumDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(SumDriver.class); // 设置作业mapper reducer类 job.setMapperClass(SortSumMapper.class); job.setMapOutputKeyClass(CountVO.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(SortSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CountVO.class); Path input = new Path("D:/test/output"); Path output = new Path("D:/test/output1"); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileInputFormat.addInputPath(job, input); FileOutputFormat.setOutputPath(job, output); //转化成受控作业 ControlledJob ctrljob = new ControlledJob(conf); ctrljob.setJob(job); return ctrljob; } }

执行后可以看到两个结果目录: 在这里插入图片描述 在这里插入图片描述



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3