文章目录
📚PageRank概述🐇什么是PageRank🐇PageRank的简化模型🐇PageRank的随机浏览模型
📚实验目的📚实验平台📚实验内容🐇在本地编写程序和调试🐇在集群上提交作业并执行🥕Mapreduce方法🥕Spark方法⭐️代码⭐️打包过程
📚PageRank概述
🐇什么是PageRank
PageRank是一种在搜索引擎中根据网页之间相互的链接关系计算网页排名的技术。PageRank是Google用来标识网页的等级或重要性的一种方法。其级别从1到10级,PR值越高说明该网页越受欢迎(越重要)。被许多优质网页所链接的网页,多半也是优质网页。一个网页要想拥有较高的PR值的条件:
有很多网页链接到它有高质量的网页链接到它
🐇PageRank的简化模型
可以把互联网上的各个网页之间的链接关系看成一个有向图。 对于任意网页Pi,它的PageRank值可表示为: ![在这里插入图片描述](https://img-blog.csdnimg.cn/bf38ee520a1c402183e3058858bf407a.png) 从定义上来看,一个网页的PR值就是其他网页的PR值平均分流后传入到的此网页的总PR值。 PR值的计算就是经过多次迭代不断更新PR值直到满足一定的收敛条件。
实际的网络超链接环境没有这么理想化,PageRank会面临两个问题:
排名泄露:指存在网页出度为0,那么网页总的PR值在迭代过程中,指向这一个网页的有向边会不断流失PR值。(因为该网页X的PR值在迭代中用不上,相当于流失掉了)最终整个图的PR值都是0; 排名下沉:整个网页图中若有网页没有入度链接,如节点A所示,其所产生的贡献会被由节点B、C、D构成的强联通分量“吞噬”掉,就会产生排名下沉,节点A的PR值在迭代后会趋向于0。
🐇PageRank的随机浏览模型
假定一个上网者从一个随机的网页开始浏览,上网者不断点击当前网页的链接开始下一次浏览。但是,上网者最终厌倦了,开始了一个随机的网页,随机上网者用以上方式访问一个新网页的概率就等于这个网页的PageRank值,这种随机模型更加接近于用户的浏览行为。
![在这里插入图片描述](https://img-blog.csdnimg.cn/4f3c0513180542809a94f364dfade484.png)
设定任意两个顶点之间都有直接通路,在每个顶点处以概率d按原来蓝色方向转移,以概率1-d按红色方向转移。
![在这里插入图片描述](https://img-blog.csdnimg.cn/63c08aedda5d4268825a52b0851576c6.png)
从模型上来看就是增加了1-d的部分。此时一个网页的PR值不仅仅取决于指向自己的网页这一部分(所占权值为d),还有另外一部分来自于任意一个网页,可以认为有概率1-d是来自于其他网页随即浏览的跳转,值为1/N(总网页数为N)。
![在这里插入图片描述](https://img-blog.csdnimg.cn/a33bddd7fe794eeeb08f7ad62c59b8f5.png)
📚实验目的
PageRank 网页排名的算法,曾是 Google 关键核心技术。用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。通过对 PageRank 的编程在Hadoop 和 Spark 上的实现,熟练掌握 MapReduce 程序与 Spark 程序在集群上的提交与执行过程,加深对 MapReduce 与 Spark 的理解。
📚实验平台
操作系统:Linux;Hadoop 版本:3.2.2;JDK 版本:1.8;Java IDE:Eclipse 3.8。Spark 版本:集群环境为 3.2.1
📚实验内容
写在前面:本次实验综合参考了此博客(点此直达),本篇本地主要展示Mapreduce,其中Mapreduce的本地提交和集群提交和实验二的方法基本相同。Spark部分借助sbt打包提交到集群,其中sbt打包部分参考博客点此达。
🐇在本地编写程序和调试
input文件DataSet下载链接,数据集中每一行内容的格式:网页+\t+该网页链接到的网页的集合(相互之间用英文逗号分开)。 (图片来源:隆华爱读书我不爱读书所以我没书读)
![在这里插入图片描述](https://img-blog.csdnimg.cn/f2edc83fefbf48a9b13fc3f8002f0b22.png)
![在这里插入图片描述](https://img-blog.csdnimg.cn/e913a6b2717c42998a351ff35e25d984.png)
package pagerank;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class pagerank
{
//用于处理原始输入得到所需输入格式
public static class GraphBuilder
{
public static class GraphBuilderMapper extends Mapper
{
@Override
//map:逐行分析原始数据
//输入:
//key不是page,而是行偏移。value是整个
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String initialpr = "1.0"; //初始化PR值
String[] temp = value.toString().split("\t");//按tab分片,temp[0]是网页,temp[1]是链接的网页
//输出
//
context.write(new Text(temp[0]), new Text(initialpr + "\t" + temp[1]));
}
}
//Reduce什么也不需要干,因此可以不写Reduce,直接将Map的输出作为最后的输出即可
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
Job job = Job.getInstance(conf, "GraphBuilder");
job.setJarByClass(GraphBuilder.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(GraphBuilderMapper.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
//迭代计算各个网页的PageRank值
public static class PageRankIter
{
private static final double d = 0.85; //damping阻尼系数
public static class PRIterMapper extends Mapper
{
//map:逐行分析数据
//输入:
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
//按tab分片,temp[0]是网页,temp[1]是初始PR值,temp[2]是链接(指向)网页
String[] temp = value.toString().split("\t");
String url = temp[0];
double cur_rank = Double.parseDouble(temp[1]);//转换成double类型的数据
if (temp.length > 2)
{//说明有链接的(指向的)网页
String[] link = temp[2].split(",");//按逗号分片
for (String i : link)
{
//输出:
//对于pageList里的每一个page都输出
context.write(new Text(i), new Text(String.valueOf(cur_rank / link.length)));
}
}
//输出:
//将所有出边tuple[2]传递到Reduce,这里用一个‘&’作为分割,方便后续判断是哪一种输入类型,即value是pr还是pageList。
context.write(new Text(url), new Text("&" + temp[2])); // 做个标记"&"
}
}
public static class PRIterReducer extends Reducer
{
//reduce:按行处理数据
//输入:;
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException
{
double new_rank = 0;//用于累加PR
String outurl = "";
for (Text i : values)
{
//对输入的value首字符进行判断,如果是'&'表示这是一个图结构,那么就保存下来便于待会Reduce输出;如果不是,那么说明是pr值,进行累加。
String temp = i.toString();
if (temp.startsWith("&"))
{//有&标识符,标志着向的信息
outurl = temp.substring(1);//取出向的网页列表
}
else
{//存放的是计算的PR中间值
new_rank += Double.parseDouble(temp);//原始PR+中间计算值
}
}
new_rank = d * new_rank + (1 - d); //加上阻尼系数限制,计算最后的PR
//输出:
//
context.write(key, new Text(String.valueOf(new_rank)+"\t"+outurl));
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
Job job = Job.getInstance(conf, "PageRankIter");
job.setJarByClass(PageRankIter.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(PRIterMapper.class);
job.setReducerClass(PRIterReducer.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
//将PageRank值从大到小输出
public static class RankViewer
{
public static class PRViewerMapper extends Mapper
{
//处理经过迭代处理后输出的Data中间文件
//输入:
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String[] temp = value.toString().split("\t");//按tab分片,temp[0]是网页,temp[1]是PR值,temp[2]是向的网页信息
// 输出:
//将PR提出来变为key,page作为value(图结构不需要用了,可以抛弃掉了)。
context.write(new DoubleWritable(Double.parseDouble(temp[1])), new Text(temp[0]));
}
}
public static class DescDoubleComparator extends DoubleWritable.Comparator
{
//给定的标准输出是按照降序排序,而Map默认是升序排序,因此需要自定义一个排序函数。基本就是继承原有的类将输出变成相反数即可
//重载key的比较函数,变为从大到小
public float compare(WritableComparator a, WritableComparable b)
{
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
{
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static class PRViewerReducer extends Reducer
{
//输入:
@Override
protected void reduce(DoubleWritable key, Iterable values, Context context) throws IOException, InterruptedException
{
for (Text i : values)
{
//遍历网页列表,输出:(网页名,保留小数点后10位的PR值)
context.write(new Text("(" + i + "," + String.format("%.10f", key.get()) + ")"), null);
}
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
Job job = Job.getInstance(conf, "RankViewer");
job.setJarByClass(RankViewer.class);
job.setOutputKeyClass(DoubleWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(PRViewerMapper.class);
job.setSortComparatorClass(DescDoubleComparator.class);
job.setReducerClass(PRViewerReducer.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
//以PageRankfunction作为主类,调用前三个函数的main函数
public static class PageRankFunction
{
//进行10次迭代
private static int times = 10;
public static void main(String[] args) throws Exception
{
//建立网页间的连接信息并初始化PR值,结果存入Data0
String[] functionPageRankBuilder = {"", args[1] + "/Data0"};
functionPageRankBuilder[0] = args[0];
GraphBuilder.main(functionPageRankBuilder);
String[] functionPageRankIter = {"", ""}; //迭代操作
for (int i = 0; i args[1] + "/Data" + times, args[1] + "/FinalRank"};
RankViewer.main(functionPageRankViewer);
}
}
// 主函数入口
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs:localhost:9000");
PageRankFunction.main(args);
}
}
补充说明 :关于pagerank的Mapreduce本地运行
首先记得开start-dfs.shRun As后选中Run on Hadoop 选中PageRankfunction后再点OK,以它为主类,调用其他函数的main函数。 以上是选好主类,但是还跑不了👀,得再去设置ArgumentsRun As里选中Run Configurations... 点开之后,将pagerank.PageRankFunction的Arguments设置好input output,然后再点Run。 最后等着就行,等一会儿之后refresh就好。
🐇在集群上提交作业并执行
🥕Mapreduce方法
把代码里的"hdfs://localhost"改为"hdfs://10.102.0.198:9000",一共有4个地方需要改。 修改后通过export,导出jar包,关注Main-Class的设置,这里详见实验二(不想再写一遍了 ![在这里插入图片描述](https://img-blog.csdnimg.cn/cad7bb77a6af4e788edea9b583486f29.png) ![在这里插入图片描述](https://img-blog.csdnimg.cn/86406842711147f8af0f09a9020e39c0.png) 在终端依次输入
集群的服务器地址为 10.102.0.198,用户名和密码为“bigdata_学号”,用户主目录为/home/用户名,hdfs 目录为/user/用户名。集群上的数据集存放目录为 hdfs://10.102.0.198:9000/ex3/input。
scp pagerank.jar bigdata_学号@10.102.0.198:/home/bigdata_学号
ssh bigdata_学号@10.102.0.198
hadoop jar pagerank.jar /ex3/input /user/bigdata_学号/Experiment_3_Hadoop
diff FileSystem, Path}
import java.io.File
object PageRank
{
//删除HDFS上指定的文件。
def hdfsDel(sc: SparkContext, filePath: String): Unit =
{
//第一步将文件路径转换为Path对象
val output = new Path(filePath)
//第二步获得与SparkContext的配置相关联的hadoopConfiguration
val conf = sc.hadoopConfiguration
//然后通过FileSystem.get (conf)获得hdfs文件系统
val hdfs = FileSystem.get(conf)
//最后,检查文件是否存在于文件系统中,如果存在则用hdfs对象对其进行删除
//hdfs.delete(output, true)中的true表明该目录不为空
if (hdfs.exists(output))
hdfs.delete(output, true)
}
def main(args: Array[String]): Unit =
{
//setMaster:设置主节点,使用与类似于hdfs的conf设置
val conf = new SparkConf().setAppName("PageRank_lalayouyi").setMaster("spark://10.102.0.198:7077")
//通过SparkContext得到Spark的上下文,可以连接到文件系统,主要还是得到RDD算子进行操作。
val scontext = new SparkContext(conf)
//-----------------------------进行迭代前的一些准备-------------------------
val d = 0.85//阻尼系数
val iterCount = 10//迭代次数
//从HDFS读取图结构,并把图结构存入内存
val lines = scontext.textFile("hdfs://10.102.0.198:9000/ex3/input", 1)
//map,得到,即
//将每行数据按tab分隔,取分隔后的第一个元素作为key,取分隔后的第二个元素,按逗号分隔成多个元素作为value,返回一个新的RDD,RDD(弹性分布式数据集)
//这一系列操作会被多次使用,最后加个cache表示将此RDD存放内存,增加代码的效率
val links = lines.map(line => (line.split("\t")(0), line.split("\t")(1).split(","))).cache()
//初始化PR值
var ranks = links.mapValues(_ => 1.0)
//------------------------------- 接下来进行迭代---------------------------
for (i
//将每个网页的排名值(rank)平均分配给它所指向的所有链接页面(linkList)
//由一个page输出多个 |