使用 MapReduce 和 Hive 进行数据分析(1)

您所在的位置:网站首页 天干年份从高到低怎么排 使用 MapReduce 和 Hive 进行数据分析(1)

使用 MapReduce 和 Hive 进行数据分析(1)

2024-07-12 02:59| 来源: 网络整理| 查看: 265

实验-在电影库中查找我的演员评分最高的5部电影 【实验要求】

结合本门课程学过的知识,编写 MapReduce 程序,按评分从高到低排序该演员参演电影(如果同分则优先列出年份较近的,例如2000年上映的A电影和1995年上映的B电影同分,则排序应该为A,B),找到评分最高的前5部电影的名称,上映年份和评分。

【提示1 序列化和排序】

运行HadoopSerializationTest.java 单元测试。此单元测试展示了在 MapReduce 中,键值对中的对象序列化的代码。

如果只需要对对象进行序列化,只需要实现 org.apache.hadoop.io.Writable接口。

如果既要对对象进行序列化,又要对对象进行排序,则需要实现 org.apache.hadoop.io.WritableComparable 接口。MapReduce 的排序主要是利用了在 Shuffle 阶段对键值对中的键(Key)进行的排序。

【提示2 如何把 JSON 转换为 Java 对象】

运行JsonParseTest.java 单元测试。此单元测试展示了如何把 JSON 字符串转换为 Java 对象。

【提示3 如何对 Mapper 进行测试】

运行MapperTest.java 单元测试。此单元测试展示了如何对 MapReduce 中的 Mapper 进行测试。

package hadooptraining1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job driver: configures and submits the map-only "best movies of an actor" job.
 *
 * <p>The map output key is {@link MovieInfo}, so the ordering of the emitted
 * movies is defined by {@code MovieInfo.compareTo}. No reducer class and no
 * custom sort comparator are configured here, and nothing in this driver limits
 * the output to five records.
 */
public class ActorBestMoviesMain {

    /**
     * Submits the job and exits with a status reflecting its outcome.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: ActorBestMoviesMain <input path> <output path>");
            System.exit(2);
        }

        // Create the job.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ActorBestMoviesMain.class);

        // This path refers to a jar located on HDFS.
        job.addFileToClassPath(new Path("/lib/fastjson-1.2.62.jar"));

        // Mapper and the types of its output pair (k2, v2).
        job.setMapperClass(ActorBestMoviesMapper.class);
        job.setMapOutputKeyClass(MovieInfo.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and report failure through the process exit code
        // instead of silently discarding the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

// ===== ActorBestMoviesMapper.java =====
package hadooptraining1;

import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses one JSON movie record per input line and emits the movie as the key
 * when its actor set contains the configured actor name.
 */
public class ActorBestMoviesMapper extends Mapper<LongWritable, Text, MovieInfo, NullWritable> {

    public static final Log log = LogFactory.getLog(ActorBestMoviesMapper.class);

    /**
     * @param key1    input key (not used by this method)
     * @param value1  one line of input, expected to be a JSON movie record
     * @param context sink for the (MovieInfo, NullWritable) output pair
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        MovieInfo movie = JSON.parseObject(value1.toString(), MovieInfo.class);
        if (movie == null) {
            // Nothing was parsed (e.g. a blank line): skip instead of failing with an NPE.
            return;
        }
        if (movie.getActorSet().contains("演员的名字")) {
            log.info(movie.getTitle());
            context.write(movie, NullWritable.get());
        }
    }
}

// ===== ActorBestMoviesMapper2.java =====
package hadooptraining1;

import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Variant mapper that emits one record per actor of every movie whose actor
 * string contains the configured actor name.
 */
public class ActorBestMoviesMapper2 extends Mapper<LongWritable, Text, Text, NullWritable> {

    public static final Log log = LogFactory.getLog(ActorBestMoviesMapper2.class);

    /**
     * @param key1    input key (not used by this method)
     * @param value1  one line of input, expected to be a JSON movie record
     * @param context sink for the (Text, NullWritable) output pairs
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        MovieInfo movie = JSON.parseObject(value1.toString(), MovieInfo.class);
        if (movie == null || movie.getActor() == null) {
            // No record or no actor field: nothing to emit.
            return;
        }
        if (movie.getActor().indexOf("演员的名字") != -1) {
            // 1. Split the actor string into individual names.
            String[] actorArray = movie.getActor().split(",");
            // 2. Emit one record per actor.
            // NOTE(review): the emitted key is an empty Text and the actor name is
            // not used; this looks like an unfinished placeholder - confirm the
            // intended key before relying on this mapper.
            for (String actor : actorArray) {
                context.write(new Text(""), NullWritable.get());
            }
        }
    }
}

// ===== HadoopSerializationUtil.java =====
package hadooptraining1;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Helpers that serialize a Hadoop {@link Writable} to a byte array and back.
 */
public class HadoopSerializationUtil {

    /**
     * Serializes a writable by calling its {@code write} method on an in-memory stream.
     *
     * @param writable object to serialize
     * @return the bytes produced by {@code writable.write(...)}
     * @throws IOException if the writable fails while writing
     */
    public static byte[] serialize(Writable writable) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // try-with-resources closes (and flushes) the stream even when write() throws.
        try (DataOutputStream dataout = new DataOutputStream(out)) {
            writable.write(dataout);
        }
        return out.toByteArray();
    }

    /**
     * Populates a writable by calling its {@code readFields} method on the given bytes.
     *
     * @param writable object whose fields are overwritten
     * @param bytes    data previously produced by {@link #serialize(Writable)}
     * @throws IOException if the writable fails while reading
     */
    public static void deserialize(Writable writable, byte[] bytes) throws IOException {
        try (DataInputStream datain = new DataInputStream(new ByteArrayInputStream(bytes))) {
            writable.readFields(datain);
        }
    }
}

// ===== MovieInfo.java =====
package hadooptraining1;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
 * One movie record. Used as the map output key, so it is both serializable
 * (write/readFields) and sortable (compareTo).
 *
 * <p>Ordering: higher {@code star} first; for equal stars the larger
 * {@code year} first; remaining ties are ordered by title so that two different
 * movies do not compare as equal.
 */
public class MovieInfo implements WritableComparable<MovieInfo> {

    /**
     * Number of leading characters dropped from {@code film_page} to obtain the
     * id; equals the length of "https://movie.douban.com/subject/".
     */
    private static final int FILM_PAGE_PREFIX_LENGTH = 33;

    private String title;
    private int year;
    private String type;
    private float star;
    private String director;
    private String actor;
    private String time;
    private String film_page;
    private String doubanId;

    /**
     * Splits the comma-separated {@code actor} string into trimmed, non-empty names.
     *
     * @return a new set of actor names; empty when {@code actor} is null
     */
    public Set<String> getActorSet() {
        Set<String> names = new HashSet<String>();
        if (actor != null) {
            for (String raw : actor.split(",")) {
                String name = raw.trim();
                if (!name.isEmpty()) {
                    names.add(name);
                }
            }
        }
        return names;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public float getStar() {
        return star;
    }

    public void setStar(float star) {
        this.star = star;
    }

    public String getDirector() {
        return director;
    }

    public void setDirector(String director) {
        this.director = director;
    }

    public String getActor() {
        return actor;
    }

    public void setActor(String actor) {
        this.actor = actor;
    }

    public String getFilm_page() {
        return film_page;
    }

    public void setFilm_page(String film_page) {
        this.film_page = film_page;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    /**
     * Derives the id from {@code film_page} by dropping the first 33 characters
     * and the last character, and caches it in {@code doubanId}.
     *
     * @return the derived id, or the previously cached value (possibly null)
     *         when {@code film_page} is null or too short to contain an id
     */
    public String getDoubanId() {
        // Length guard: substring(33, length - 1) would throw on a short or empty URL.
        if (film_page != null && film_page.length() > FILM_PAGE_PREFIX_LENGTH) {
            doubanId = film_page.substring(FILM_PAGE_PREFIX_LENGTH, film_page.length() - 1);
        }
        return doubanId;
    }

    /**
     * Writes the fields in a fixed order; {@link #readFields(DataInput)} must
     * read them in the same order. Null strings are written as "" because
     * {@code writeUTF(null)} throws a NullPointerException.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(nullToEmpty(this.title));
        out.writeInt(this.year);
        out.writeUTF(nullToEmpty(this.type));
        out.writeFloat(this.star);
        out.writeUTF(nullToEmpty(this.director));
        out.writeUTF(nullToEmpty(this.actor));
        out.writeUTF(nullToEmpty(this.time));
        out.writeUTF(nullToEmpty(this.film_page));
    }

    /** Reads the fields in exactly the order used by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.title = in.readUTF();
        this.year = in.readInt();
        this.type = in.readUTF();
        this.star = in.readFloat();
        this.director = in.readUTF();
        this.actor = in.readUTF();
        this.time = in.readUTF();
        this.film_page = in.readUTF();
    }

    /**
     * Orders by star descending, then year descending, then title ascending.
     *
     * @param o the other movie; a null argument sorts after this object
     * @return negative when this movie sorts first, positive when it sorts
     *         later, 0 when star, year and title are all equal
     */
    @Override
    public int compareTo(MovieInfo o) {
        if (o == null) {
            return -1;
        }
        // Arguments are swapped on purpose: larger values must sort first.
        int byStar = Float.compare(o.star, star);
        if (byStar != 0) {
            return byStar;
        }
        int byYear = Integer.compare(o.year, year);
        if (byYear != 0) {
            return byYear;
        }
        return nullToEmpty(title).compareTo(nullToEmpty(o.title));
    }

    /** Equality is defined on the same fields as {@link #compareTo(MovieInfo)}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MovieInfo)) {
            return false;
        }
        return compareTo((MovieInfo) obj) == 0;
    }

    /** Hash over the fields used by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hash(nullToEmpty(title), year, star);
    }

    @Override
    public String toString() {
        return title + "(" + year + ")," + "评分:" + star;
    }

    /** Maps null to the empty string so the value can be written and compared safely. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }
}

以下为测试代码

// ===== HadoopSerializationTest.java =====
package hadooptraining1.test;

import hadooptraining1.HadoopSerializationUtil;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.junit.Test;

import java.io.*;

import static org.junit.Assert.assertEquals;

/**
 * Demonstrates Hadoop serialization by round-tripping an object through a file.
 */
public class HadoopSerializationTest {

    private static class MovieInfo implements WritableComparable<MovieInfo> {
        private String title;
        private float star;
        private String actor;

        @Override
        public String toString() {
            return title + "," + star + actor;
        }

        /**
         * Orders by star descending: a negative result means this object sorts first.
         */
        @Override
        public int compareTo(MovieInfo mov) {
            if (mov == null) {
                return 1;
            }
            // Swapped arguments give descending order and return 0 for equal stars.
            return Float.compare(mov.getStar(), star);
        }

        /**
         * Serialization: writes the object's fields to the stream.
         *
         * @param dataOutput destination stream
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            // The write order must match the read order in readFields.
            dataOutput.writeUTF(title);
            dataOutput.writeFloat(star);
            dataOutput.writeUTF(actor);
        }

        /**
         * Deserialization: reads the object's fields from the stream.
         *
         * @param dataInput source stream
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            // The read order must match the write order in write.
            title = dataInput.readUTF();
            star = dataInput.readFloat();
            actor = dataInput.readUTF();
        }

        public String getTitle() {
            return title;
        }

        public void setTitle(String title) {
            this.title = title;
        }

        public float getStar() {
            return star;
        }

        public void setStar(float star) {
            this.star = star;
        }

        public String getActor() {
            return actor;
        }

        public void setActor(String actor) {
            this.actor = actor;
        }
    }

    /**
     * Serializes an object to a temporary file, reads it back into a second
     * object and checks that the fields survived the round trip.
     */
    @Test
    public void testSerialization() throws Exception {
        MovieInfo m1 = new MovieInfo();
        m1.setTitle("你好,李焕英");
        m1.setStar(8.0F);
        m1.setActor(" 贾玲,张小斐,沈腾,陈赫,刘佳");

        // A temp file keeps the test portable; a fixed "d:/" path only works on Windows.
        File file = File.createTempFile("movie-info", ".bin");
        try {
            // Serialize m1 and store the bytes in the file.
            try (OutputStream output = new FileOutputStream(file)) {
                output.write(HadoopSerializationUtil.serialize(m1));
            }

            // Read the file back into a byte[].
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (InputStream input = new FileInputStream(file)) {
                byte[] data = new byte[16384];
                int nRead;
                while ((nRead = input.read(data, 0, data.length)) != -1) {
                    buffer.write(data, 0, nRead);
                }
            }

            // Deserialize the bytes into a fresh object.
            MovieInfo m2 = new MovieInfo();
            HadoopSerializationUtil.deserialize(m2, buffer.toByteArray());

            // Compare the round-tripped object with the original.
            assertEquals(m1.getTitle(), m2.getTitle());
            assertEquals(m1.getStar(), m2.getStar(), 0.0F);
            assertEquals(m1.getActor(), m2.getActor());
        } finally {
            file.delete();
        }
    }
}

// ===== JsonParseTest.java =====
package hadooptraining1.test;

import com.alibaba.fastjson.JSON;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Demonstrates converting a JSON string into a Java object with fastjson.
 */
public class JsonParseTest {

    private final static String TEST_JSON ="{'_id':{'$oid':'5ad89b396afaf881d81e6ce3'},'title':'快手枪手快枪手','year':'2016','type':'喜剧,动作','star':5.1,'director':'潘安子','actor':'林更新,张静初,腾格尔,刘晓庆,锦荣,曾江,施予斐,含笑,文淇','pp':16518,'time':115,'film_page':'https://movie.douban.com/subject/26219893/'}";

    /** Target bean holding the subset of JSON fields this test cares about. */
    private static class MovieInfo {
        private String title;
        private float star;
        private String actor;
        private String film_page;

        public String getTitle() {
            return title;
        }

        public void setTitle(String title) {
            this.title = title;
        }

        public float getStar() {
            return star;
        }

        public void setStar(float star) {
            this.star = star;
        }

        public String getActor() {
            return actor;
        }

        public void setActor(String actor) {
            this.actor = actor;
        }

        public String getFilm_page() {
            return film_page;
        }

        public void setFilm_page(String film_page) {
            this.film_page = film_page;
        }
    }

    /** Parses the sample record and checks the title field. */
    @Test
    public void testJsonParse() {
        MovieInfo m = JSON.parseObject(TEST_JSON, MovieInfo.class);
        assertEquals("快手枪手快枪手", m.getTitle());
    }
}

// ===== MapperTest.java =====
package hadooptraining1.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Test;

import java.io.IOException;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
 * Demonstrates unit-testing a Mapper by mocking its Context.
 */
public class MapperTest {

    /**
     * Word-count mapper: emits (word, 1) for every space-separated token.
     */
    private static final class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Tokenize the line.
            String[] words = v1.toString().split(" ");
            // Emit (k2, v2) for each token.
            for (String w : words) {
                context.write(new Text(w), new IntWritable(1));
            }
        }
    }

    /** Feeds "hello world" to the mapper and verifies both emitted pairs. */
    @Test
    public void testMapper() throws IOException, InterruptedException {
        // mock() can only return the raw Context type, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        Mapper<LongWritable, Text, Text, IntWritable>.Context context = mock(Mapper.Context.class);
        WcMapper mapper = new WcMapper();

        // Simulate v1 = "hello world".
        mapper.map(new LongWritable(1), new Text("hello world"), context);

        verify(context).write(new Text("hello"), new IntWritable(1));
        verify(context).write(new Text("world"), new IntWritable(1));
    }
}

常见问题:

1、如何对 Maven 项目进行打包

在开发代码的时候,我们都不可避免要引用到外面的依赖包。如何在打包的时候把外面依赖包的class文件打包进我们的jar包里面?需要修改pom文件里的build方式。在pom文件里面这样设置:

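<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
  ........
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.5.5</version>
      <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>hadoop.partOne.movieMain</mainClass> <!-- 项目的名字 -->
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>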

2、Could not transfer artifact 报错解决

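<repositories>
  <repository>
    <id>public</id>
    <name>aliyun nexus</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <releases>
      <enabled>true</enabled>
    </releases>
  </repository>
</repositories>
<pluginRepositories>
  <pluginRepository>
    <id>public</id>
    <name>aliyun nexus</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </pluginRepository>
</pluginRepositories>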

如果无缘无故报了这个问题,可以在 pom 文件里添加上面的仓库配置,再重新执行,可以彻底解决。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3