Spark |
您所在的位置:网站首页 › 数据图形分析术语 › Spark |
Spark Graphx
基于Spark Graphx的图形数据分析一、图计算的好处二、图的概念1、图的基本概念2、图的术语(1)顶点(Vertex)和边(Edge)(2)有向图和无向图(3)有环图和无环图(4)度
3、图的经典表示法
三、Spark GraphX1、简介2、GraphX核心抽象3、GraphX API4、图的算子(1)属性算子(2)结构算子a、reverseb、subgraphc、maskd、groupEdges
(3)join算子a、outerJoinVerticesb、joinVertices
7、GraphX API 应用8、练习
四、PageRank1、PageRank(PR)算法2、PageRank应用
五、Pregel1、连通分量2、Pregel概述3、Pregel API
基于Spark Graphx的图形数据分析
官网指导https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api 一、图计算的好处 许多大数据以大规模图或网络的形式呈现许多非图结构的大数据,常会被转换为图模型进行分析图数据结构很好地表达了数据之间的关联性 二、图的概念 1、图的基本概念 图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构 通常表示为二元组:Gragh=(V,E)可以对事物之间的关系建模 应用场景 在地图应用中寻找最短路径社交网络关系网页间超链接关系 2、图的术语 (1)顶点(Vertex)和边(Edge)一般关系图中,事物为顶点,关系为边 定义一个图: Graph=(V,E) 集合V={v1,v2,v3} 集合E={(v1,v2),(v1,v3),(v2,v3)}1、对于每条边,矩阵中相应单元格值为1 2、对于每个循环,矩阵中相应单元格值为2,方便在行或列上求得顶点度数 三、Spark GraphX 1、简介GraphX是Spark提供分布式图计算API GraphX特点 基于内存实现了数据的复用与快速读取通过弹性分布式属性图(Property Graph)统一了图视图与表视图与Spark Streaming、Spark SQL和Spark MLlib等无缝衔接针对某些领域,如社交网络、语言建模等,graph-parallel系统可以高效地执行复杂的图形算法,比一般的data-parallel系统更快 Graphx是将graph-parallel的data-parallel统一到一个系统中。允许用户将数据当成一个图或一个集合RDD,而简化数据移动或复杂操作。 2、GraphX核心抽象 弹性分布式属性图(Resilient Distributed Property Graph) 顶点和边都带属性的有向多重图对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成 3、GraphX API Graph[VD,ED]VertexRDD[VD]EdgeRDD[ED]EdgeTriplet[VD,ED]Edge:样例类VertexId:Long的别名创建一个简单的GraphX: object MyGraghx { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ghx").setMaster("local[*]") val sc = new SparkContext(conf) //建立所有的点 val vects = sc.makeRDD(Seq((3L,("rxin","stu")),(5L,("zs","prof")),(5L,("ls","prof")),(7L,("xx","pst")))) //建立所有的边 val edges = sc.makeRDD(Seq(Edge(2L,5L,"ts"),Edge(5L,3L,"zd"),Edge(5L,7L,"pi"),Edge(3L,7L,"collab"))) //建立图 val graph = Graph(vects,edges) //要添加新点 val newPoint = sc.parallelize(Array((3L,"hehe"),(5L,"xixi"),(4L,"cici"))) println("***********************************************************") graph.joinVertices(newPoint)((id,src,newval)=>(src._1+"@"+newval,src._2)).vertices.foreach(f=>println(f._2)) println("***********************************************************") println(graph.numEdges,graph.numVertices,graph.edges) println("***********************************************************") graph.vertices.foreach(x=>println(x._1,x._2)) println("***********************************************************") graph.edges.foreach(x=>println(x.srcId,x.dstId,x.attr)) println("***********************************************************") graph.triplets.foreach(x=>println(x.srcAttr,x.dstAttr,x.attr)) println("***********************************************************") graph.inDegrees.foreach(x=>println(x)) println("***********************************************************") graph.degrees.foreach(println) println("***********************************************************") graph.mapEdges(e=>Edge(e.srcId,e.dstId,e.attr+",Hello")).edges.foreach(x=>println(x)) } }输出结果: *********************************************************** (zs@xixi,prof) (xx,pst) null (rxin@hehe,stu) *********************************************************** (4,4,EdgeRDDImpl[13] at RDD at EdgeRDD.scala:41) *********************************************************** (3,(rxin,stu)) (2,null) (7,(xx,pst)) (5,(zs,prof)) *********************************************************** (5,3,zd) (3,7,collab) (2,5,ts) (5,7,pi) *********************************************************** (null,(zs,prof),ts) ((zs,prof),(rxin,stu),zd) ((zs,prof),(xx,pst),pi) ((rxin,stu),(xx,pst),collab) *********************************************************** (7,2) (3,1) (5,1) *********************************************************** (2,1) (3,2) (7,2) (5,3) *********************************************************** Edge(2,5,Edge(2,5,ts,Hello)) Edge(3,7,Edge(3,7,collab,Hello)) Edge(5,3,Edge(5,3,zd,Hello)) Edge(5,7,Edge(5,7,pi,Hello)) 4、图的算子 (1)属性算子 类似于RDD的map操作 mapVerticesmapEdgesmapTriplets class Graph[VD, ED] { def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] } val t1_graph = tweeter_graph.mapVertices { case(vertextId, (name, age)) => (vertextId, name) } val t2_graph = tweeter_graph.mapVertices { (vertextId, attr) => (vertextId, attr._1) } val t3_graph = tweeter_graph.mapEdges(e => Edge(e.srcId, e.dstId, e.attr*7.0))这里每一个操作产生一个新图,其顶点和边被用户定义的map函数修改了。 注意: 在每一个实例图结构不受影响。这是这些操作的关键特征,这允许结果图重复利用原始图的结构索引。下面的代码片段逻辑上是等同的,但是第一个没有保存结构索引,其不会从GraphX系统优化中获益。 (2)结构算子 reversesubgraphmaskgroupEdges演化一下这四个结构算子。 首先先创建一个新图,三个参数: users就是所有顶点的rdd(RDD[(VertexId, VD)])relationships就是所有边的集合RDD[Edge[ED]]defaultUser是默认的顶点,也就是说如果relationships里面的源id或者目标id在users里面找不到,就会把找不到的id的那个人当成是defaultUser val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((1L, ("a", "student")), (2L, ("b", "salesman")), (3L, ("c", "programmer")), (4L, ("d", "doctor")), (5L, ("e", "postman")))) val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(1L, 2L, "customer"),Edge(3L, 2L, "customer"), Edge(3L, 4L, "patient"), Edge(5L, 4L, "patient"), Edge(3L, 4L, "friend"), Edge(5L, 99L, "father"))) val defaultUser = ("f", "none") val graph = Graph(users, relationships, defaultUser)然后打印一下这个图: graph.triplets.map( triplet => triplet.srcAttr._1 + " ——(" + triplet.attr + ")——> " + triplet.dstAttr._1 ).collect.foreach(println(_))输出图中各点的关系:打印结果如下,这个graph表示的是a到f这几个人之间的关系,a是b的客户,c是b的客户,c是d的病人,c是d的病人,c是d的朋友,e是f的爸爸 a ——(customer)——> b c ——(customer)——> b c ——(patient)——> d e ——(patient)——> d c ——(friend)——> d e ——(father)——> f a、reversereverse算子的作用就是把edge的方向反过来,在这里就是把每个人的关系反过来一下 代码如下: val reverseGraph = graph.reverse reverseGraph.triplets.map( triplet => triplet.srcAttr._1 + " ——(" + triplet.attr + ")——> " + triplet.dstAttr._1 ).collect.foreach(println(_))输出结果: -------------------------原始关系-------------------------------- a ——(customer)——> b c ——(customer)——> b c ——(patient)——> d e ——(patient)——> d c ——(friend)——> d e ——(father)——> f -------------------------reverse-------------------------------- b ——(customer)——> a b ——(customer)——> c d ——(patient)——> c d ——(patient)——> e d ——(friend)——> c f ——(father)——> e如图比较所有的关系双方都反过来了 b、subgraphsubgraph顾名思义就是取原来graph的子graph,获取子graph肯定是有条件过滤掉一部分数据,剩下来的就是子graph 代码如下: val subGraph = graph.subgraph(vpred = (id, attr) => attr._1 > "b") subGraph.triplets.map( triplet => triplet.srcAttr._1 + " ——(" + triplet.attr + ")——> " + triplet.dstAttr._1 ).collect.foreach(println(_))输出结果: -------------------------原始关系-------------------------------- a ——(customer)——> b c ——(customer)——> b c ——(patient)——> d e ——(patient)——> d c ——(friend)——> d e ——(father)——> f -------------------------subgraph------------------------------- c ——(patient)——> d e ——(patient)——> d c ——(friend)——> d e ——(father)——> f这个子graph,只保留了父graph中第一个属性比b的ascii码大的vertex c、maskmask算子就是求当前graph和另外一个graph的交集 代码如下,我们使用了上一个算子的结果作为当前graph进行mask的参数: val maskGraph = graph.mask(subGraph) maskGraph.triplets.map( triplet => triplet.srcAttr._1 + " ——(" + triplet.attr + ")——> " + triplet.dstAttr._1 ).collect.foreach(println(_))输出结果: -------------------------原始关系-------------------------------- a ——(customer)——> b c ——(customer)——> b c ——(patient)——> d e ——(patient)——> d c ——(friend)——> d e ——(father)——> f ---------------------------mask------------------------------- c ——(patient)——> d e ——(patient)——> d c ——(friend)——> d e ——(father)——> f d、groupEdgesgroupEdges的作用是将2个vertex之间的所有edge进行合并,我们知道graphx处理的是多重图,多重图的特征就是2个顶点之间可能有多个平行边,这里的groupEdges就可以把这些平行边合并 代码如下: val combineGraph = graph .partitionBy(PartitionStrategy.EdgePartition1D) .groupEdges(merge = (e1, e2) => e1 + " and " + e2) combineGraph.triplets.map( triplet => triplet.srcAttr._1 + " ——(" + triplet.attr + ")——> " + triplet.dstAttr._1 ).collect.foreach(println(_))我们这里将平行边的元素用and连接起来了,这里要注意的是,使用groupEdges算子之前,必须先用一下partitionBy,不然不起作用的 打印结果如下: -------------------------原始关系-------------------------------- a ——(customer)——> b c ——(customer)——> b c ——(patient)——> d e ——(patient)——> d c ——(friend)——> d e ——(father)——> f ---------------------------groupEdges------------------------------- a ——(customer)——> b c ——(customer)——> b c ——(patient and friend)——> d e ——(patient)——> d e ——(father)——> f (3)join算子定义一个graph来测试join: val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array( (1L, ("a", "student")), (2L, ("b", "salesman")), (3L, ("c", "programmer")), (4L, ("d", "doctor")), (5L, ("e", "postman")) )) val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(1L, 2L, "customer"), Edge(3L, 2L, "customer"), Edge(3L, 4L, "patient"), Edge(5L, 4L, "patient"), Edge(3L, 4L, "friend"), Edge(5L, 99L, "father"))) val defaultUser = ("f", "none") val graph = Graph(users, relationships, defaultUser)这个graph描述了每个人的名字和工作,这里我们给每个人增加除了名字和工作的其他属性,这个属性就是年龄属性 因此,我们需要定义一个rdd,描述每个人的年龄。 代码如下: val userWithAge: RDD[(VertexId, Int)] = sc.parallelize(Array( (3L, 2), (4L, 19), (5L, 23), (6L, 42), (7L, 59) ))这里我们定义了id为3到7的这5个人的年龄,注意我们原来的graph的所有人的id为1到5,接下来有2种方法来把这个年龄属性加到我们graph中的每个人上面: a、outerJoinVertices第一种方法就是outerJoinVertices,代码如下: graph.outerJoinVertices(userWithAge) { (id, attr, age) => age match { case Some(a) => (attr._1, attr._2, a) case None => (attr._1, attr._2,"none") } }.vertices.collect.foreach(println) b、joinVertices第二种方法就是joinVertices,代码如下: graph.joinVertices(userWithAge) { (id, attr, age) => { (attr._1 + "", attr._2 + "、" + age) }}.vertices.collect.foreach(println)输出结果: -------------------------outerJoinVertices-------------------------------- (1,(a,student,none)) (2,(b,salesman,none)) (3,(c,programmer,2)) (99,(f,none,none)) (4,(d,doctor,19)) (5,(e,postman,23)) -------------------------joinVertices-------------------------------- (1,(a,student)) (2,(b,salesman)) (3,(c,programmer、2)) (99,(f,none)) (4,(d,doctor、19)) (5,(e,postman、23)) 7、GraphX API 应用 计算用户粉丝数量例子2: // 创建顶点集RDD val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (7,-1)), (2L, (3,-1)), (3L, (2,-1)), (4L, (6,-1)))) // 创建边集RDD val relationships: RDD[Edge[Boolean]] = sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true), Edge(2L, 4L, true), Edge(3L, 1L, true), Edge(3L, 4L, true))) // 创建图 val graph = Graph(vertices, relationships) //Pregel val minGraph = graph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(vprog, sendMsg, mergeMsg) minGraph.vertices.collect.foreach{ case (vertexId, (value, original_value)) => println(value) } val initialMsg = 9999 def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = { if (message == initialMsg) value else (message min value._1, value._1) } def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = { val sourceVertex = triplet.srcAttr if (sourceVertex._1 == sourceVertex._2) Iterator.empty else Iterator((triplet.dstId, sourceVertex._1)) } def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2//Pregel val minGraph = graph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(vprog, sendMsg, mergeMsg) minGraph.vertices.collect.foreach{ case (vertexId, (value, original_value)) => println(value) } val initialMsg = 9999 def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = { if (message == initialMsg) value else (message min value._1, value._1) } def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = { val sourceVertex = triplet.srcAttr if (sourceVertex._1 == sourceVertex._2) Iterator.empty else Iterator((triplet.dstId, sourceVertex._1)) } def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2 ![\[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IZQpa4kP-1597625203080)(../../../imgs/pregel2.png)\]](https://img-blog.csdnimg.cn/20200817085122232.png#pic_center) |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |