大数据

您所在的位置:网站首页 mapreduce基础编程实验 大数据

大数据

2023-12-13 03:55| 来源: 网络整理| 查看: 265

文章目录 📚实验目的📚实验平台📚实验内容🐇编程实现文件的合并和去重🐇编程实现对输入文件的排序🐇对指定的表格进行信息挖掘

📚实验目的

1)通过实验掌握基本的 MapReduce 编程方法。

2)掌握用 MapReduce 解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。

📚实验平台

1)操作系统:Linux;

2)Hadoop 版本:3.2.2;

📚实验内容 🐇编程实现文件的合并和去重

在这里插入图片描述 在这里插入图片描述

package hdfs; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Merge { //这段代码输出的结果是原始输入数据的全部内容作为key,value为空的键值对。 public static class Map extends Mapper { //在Map类中定义了一个静态变量text,并将其类型设置为Text。 private static Text text = new Text(); //map方法中的参数分别表示输入数据的键、值和上下文对象 //上下文对象可以用于向输出写入数据。 public void map(Object key, Text value, Context context) throws IOException,InterruptedException { //将输入的value数据赋值给text变量 text = value; //text作为key,一个空的Text对象作为value输出。 context.write(text, new Text("")); } } //简单地将Mapper输出的所有键值对的key提取出来作为Reducer的输出。 public static class Reduce extends Reducer { // reduce方法中的参数分别表示输入数据的键、值集合和上下文对象 //上下文对象可以用于向输出写入数据。 public void reduce(Text key, Iterable values, Context context ) throws IOException,InterruptedException { //将输入的key作为key,一个空的Text对象作为value输出 context.write(key, new Text("")); //因为这里没有对values集合进行处理 //所以values中的数据会被忽略掉,只有输入的key被输出。 } } public static void main(String[] args) throws Exception { //首先创建一个Configuration对象,用于存储Hadoop集群中的一些配置信息。 Configuration conf = new Configuration(); //设置Hadoop集群的默认文件系统为hdfs://localhost:9000。 conf.set("fs.default.name","hdfs://localhost:9000"); //接着检查输入参数是否正确 //需要传入两个参数,第一个是输入数据路径,第二个是输出结果路径。 String[] otherArgs = new String[]{"input","output"}; if (otherArgs.length != 2) { //如果参数不满足要求,则输出错误提示并退出程序。 System.err.println("Usage: wordcount "); System.exit(2); } //创建一个Job对象,使用"Merge and duplicate removal"作为任务名称。 Job job = Job.getInstance(conf,"Merge and duplicate removal"); //使用Merge类的class对象来设置job所在的jar包。 job.setJarByClass(Merge.class); //设置Map类作为Mapper job.setMapperClass(Map.class); //设置Reduce类为Combiner和Reducer。 job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); //设置输入数据和输出结果的键值类型。 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入数据路径。 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //设置输出结果路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //提交任务并等待任务执行完成,根据执行状态返回0或1表示任务执行成功或失败。 System.exit(job.waitForCompletion(true) ? 0 : 1); } } 🐇编程实现对输入文件的排序

在这里插入图片描述

在这里插入图片描述

package hdfs; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class MergeSort { //这段代码将输入的文本数据中的每一个整数作为键,对应的出现次数设置为1作为值 public static class Map extends Mapper { private static IntWritable data = new IntWritable(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException { //输入的key是偏移量,value是那个数值 //首先将输入数据从Text类型转换为String类型,并赋值给text变量。 String text = value.toString(); //然后将text转换为Int类型,并将其封装到IntWritable对象中,赋值给data变量。 data.set(Integer.parseInt(text)); //将data作为输出key,new IntWritable(1)作为输出值value写入上下文中 context.write(data, new IntWritable(1)); } } //在进入reduce之前会有一个partition的过程,但因为我们现在电脑配置的datenode只有一个,所以最后其实都会就进入那一个dateNode。 public static class Partition extends Partitioner { public int getPartition(IntWritable key, IntWritable value, int num_Partition) { //getPartition方法的三个参数分别表示输入键、输入值和分区数。 int Maxnumber = 65223; //首先定义了一个最大数字Maxnumber,并根据分区数计算出每个分区的边界bound //即将Maxnumber均匀地分为num_Partition个部分 int bound = Maxnumber/num_Partition+1; //接着获取当前输入键的整数值keynumber int keynumber = key.get(); for (int i = 0; i //如果找到了对应的分区,则返回该分区的编号i; //否则,如果在所有分区中都没有找到对应的分区,则返回-1,表示出错。 return i; } } return -1; } } //对Map类输出的中间结果按键值排序,为每个键值对添加一个唯一的序号,并将排序后的结果作为最终输出结果。 //在本例中,输出结果是一个序号与整数对应的列表。 public static class Reduce extends Reducer { //Reduce的泛型参数分别表示输入键、输入值、输出键和输出值的类型。 private static IntWritable line_num = new IntWritable(1); public void reduce(IntWritable key, Iterable values, Context context) throws IOException,InterruptedException { //遍历Iterable类型的values参数,将其按照key值排序后输出。 for(IntWritable val : values) { //这里的排序是自动排序 //使用context.write方法将line_num作为输出键,key作为输出值写入上下文中。 context.write(line_num, key); //每输出一个键值对,line_num的值就加1,以保证输出的键值对具有唯一的序号。 line_num = new IntWritable(line_num.get() + 1); } //这里for循环的意义就是避免去重,让key相同的都能遍历输出。 } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name","hdfs://localhost:9000"); String[] otherArgs = new String[]{"input","output"}; if (otherArgs.length != 2) { System.err.println("Usage: wordcount "); System.exit(2); } Job job = Job.getInstance(conf,"Merge and sort"); //设置运行的主类为MergeSort。 job.setJarByClass(MergeSort.class); //设置Mapper类为Map。 job.setMapperClass(Map.class); //设置Reducer类为Reduce。 job.setReducerClass(Reduce.class); //设置Partitioner类为Partition。 job.setPartitionerClass(Partition.class); //设置输出键类型为IntWritable。 job.setOutputKeyClass(IntWritable.class); //设置输出值类型为IntWritable。 job.setOutputValueClass(IntWritable.class); //使用addInputPath()方法将输入路径添加到任务中 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //使用setOutputPath()方法将输出路径设置到任务中。 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //使用waitForCompletion()方法启动任务,并等待任务完成。 //如果任务执行成功,则返回0;否则,返回1。在最后使用System.exit()方法退出程序。 System.exit(job.waitForCompletion(true) ? 0 : 1); } } 🐇对指定的表格进行信息挖掘

在这里插入图片描述

package hdfs; import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class map { //用于输出标题 public static int time = 0; //“child_name”、“parent_name”和“relation_type”作为键值对输出到Context对象中。 public static class Map extends Mapper { public void map(Object key, Text value, Context context) throws IOException,InterruptedException { String child_name = new String(); String parent_name = new String(); String relation_type = new String(); String line = value.toString(); int i = 0; while(line.charAt(i) != ' ') {//将输入的每一行文本数据以空格为分隔符 i++; } //将第一个部分作为子节点名称,第二个部分作为父节点名称,以及一个关系类型组成的字符串 String[] values = {line.substring(0,i),line.substring(i+1)}; if(values[0].compareTo("child") != 0) { //如果“child_name”不等于“child”(就不是标题) //key是父亲,1 child_name = values[0]; parent_name = values[1]; relation_type = "1"; context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name)); //key是孩子,2 relation_type = "2"; context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name)); } } } public static class Reduce extends Reducer { public void reduce(Text key, Iterable values,Context context) throws IOException,InterruptedException { if(time == 0) { //用于标题 context.write(new Text("grand_child"), new Text("grand_parent")); time++; } int grand_child_num = 0; //用于在数组里插入 String grand_child[] = new String[10];//孙辈的数组 int grand_parent_num = 0; //用于在数组里插入 String grand_parent[]= new String[10];//祖辈的数组 Iterator ite = values.iterator(); while(ite.hasNext()) { String record = ite.next().toString(); int len = record.length(); int i = 2; //i=0是relation_type,i=1是“+” if(len == 0) continue; char relation_type = record.charAt(0); String child_name = new String(); String parent_name = new String(); while(record.charAt(i) != '+') { //child,从2开始 child_name = child_name + record.charAt(i); i++; } i=i+1; //一个加号 while(i //父亲,就取孩子的名字,就是孙辈的名字 grand_child[grand_child_num] = child_name; grand_child_num++; } else {//孩子,就取父亲的名字,就是祖辈的名字 grand_parent[grand_parent_num] = parent_name; grand_parent_num++; } } if(grand_parent_num != 0 && grand_child_num != 0 )//全排列 { for(int m = 0;m context.write(new Text(grand_child[m]), new Text(grand_parent[n])); //对每一行进行reduce } } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name","hdfs://localhost:9000"); String[] otherArgs = new String[]{"input","output"}; if (otherArgs.length != 2) { System.err.println("Usage: wordcount "); System.exit(2); } Job job = Job.getInstance(conf,"Single table join"); //设置运行的主类为map。 job.setJarByClass(map.class); //设置Mapper类为Map。 job.setMapperClass(Map.class); //设置Reducer类为Reduce。 job.setReducerClass(Reduce.class); //设置输入数据和输出结果的键值类型。 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入数据路径。 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //设置输出结果路径。 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //使用waitForCompletion()方法启动任务,并等待任务完成。 //如果任务执行成功,则返回0;否则,返回1。在最后使用System.exit()方法退出程序。 System.exit(job.waitForCompletion(true) ? 0 : 1); } }

补充学习博客:MapReduce编程规范及示例编写



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3