文章目录
前言
提示:以下是本篇文章正文内容,下面案例可供参考
一、认识Spark GraphX
1.图的基本概念
2.图计算的应用
淘宝图谱计算平台
Google的PageRank网页排名
新浪微博社交网络分析
淘宝、腾讯的推荐应用
GraphX基础概念
GraphX的发展
二、认识图的创建与存储
1.基础知识
(1)Edge:边对象,存有srcId,dstId,attr三个字段,以及一些操作Edge的方法,例:Edge(srcId,dstId,attr)
(2)RDD[Edge]:存放着Edge对象的RDD
(3)EdgeRDD:完整提供边的各种操作类
(4)RDD[(VertexId,VD)]:存放顶点的RDD,顶点有VertexId和VD两个字段,第一个是顶点Id,第二个是顶点属性。定义一个顶点例如:(VertexId,VD)
(5)VertexRDD:提供顶点的各种操作方法的对象
它们之间的继承关系:
EdgeRDD extends RDD[Edge]
VertexRDD extends RDD[(VertexId,VD)]
图的创建是进行图计算的重要步骤,在Spark GraphX中创建图的方式很多,根据不同的数据集有不同的方法。GraphX有一个类Graph,Graph对象是用户的操作入口,主要包含边属性edge(边)、顶点属性vertex(顶点)、图的创建方法、查询方法和其他转换方法等。其中Graph类中图的创建方法主要有3种,适用于不同类型的输入数据。
方法 | 描述 |
---|---|
Graph(vertices,edges, defaultVertexAttr) | 根据分开存放的顶点数据和边数据创建图 |
Graph.fromEdges(RDD[Edge[ED]], defaultValue) | 根据边数据创建图,数据需要转换成RDD[Edge[ED]类型 |
Graph.fromEdgeTuples(rawEdges: RDD[(VertexId, VertexId)], defaultValue,) | 根据边数据创建图,边数据需要加载为二元组,可以选择是否对边分区 |
GraphLoader.edgeListFile(sc,filename) | 直接通过边数据文件创建图,要求数据按空格分隔 |
2.绘制关系网络图
现有用户信息vertices.txt,有3个字段,分别为用户Id,姓名和职业
另一份数据“edges.txt”是用户之间的关系数据,
有3个字段,第1、2个是用户Id,第3个是第1个用户对于第2个用户的关系,
如“3 7 Collaborator”表示3是7的合作伙伴,“5 3 Advisor”表示5是3的导师,
“2 5 Colleague”表示2是5的同事,“5 7 PI”表示5是7的首席研究员
3 7 Collaborator
5 3 Advisor
2 5 Colleague
5 7 PI
三、使用Spark GraphX创建图
- vertices:RDD[(VertexId,VD)]:“顶点”类型的RDD,其中VertexId为顶点ID(必须为Long类型),VD为顶点属性信息。
- edges:RDD[Edge[ED]]:“边”类型的RDD,Edge类包含srcId(起点,Long类型)、dstId(目标点,Long类型),attr(边属性)三个部分。
- defaultVertexAttr:一个固定的顶点信息,用于当数据中出现顶点缺失时使用。
- “顶点”和“边”的RDD来自不同的数据源,与Spark中其他RDD的建立并没有区别,但是顶点Id要求是Long类型
1.代码操作
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
//顶点RDD【顶点的ID 顶点的属性值】ID必须是Long
val users:RDD[(VertexId,(String,String)))]=sc.textFile("file:///root/vertices.txt").map{line=>val lines=line.split(" ");
(lines(0).toLong,(line(1),lines(2)))}
//边RDD【起始点ID,目标点ID,边属性(边的标注,边的权重等)ID 必须是Long】
val relationships:RDD[Edge[String]]=sc.textFile("file:///root/edges.txt").map{line=> val line=line.split(" ");
Edge(lines(0).toLong.line(1).toLong,line(2))}
//定义一个默认(缺失)用户
val defaultUser=("John Doe","Missing")
//使用RDDs建立一个Graph
val graph_urelate=Graph(users,relationships,defaultUser)
//查看顶点信息
graph_urelate.vertices.collect.foreach(println(_))
//查看边信息
graph_urelate.edges.collect.foreach(println(_))
2.根据边创建图
Graph.fromEdgeTuples()通过边的两个顶点ID组成的二元组创建图,将一条边的起点与目标点放在一个二元组中,通过边的二元组RDD创建图。
- rawEdges:RDD[(VertexId,VertexId)]:其中的数据类型是一个起点与目标点的元组
- defaultValue:VD:默认属性值。
- uniqueEdges: Option[PartitionStrategy] = None:是否对边进行分区选项,可选可不选,默认不分区
fromEdgeTuples仅需要边的起点和目标点,将数据“edges.txt”作为输入数据,通过fromEdgeTuples方法构造图
//读取数据文件
val file=sc.textFile("file:///root/edges.txt");
//使用边的起点和终点创建二元组RDD
val edgesRDD:RDD[(VetexId,VertexId)]=file.map(line=>line.split(" ")).map(line=>(line(0).toLong,line(1).toLong))
//创建图
val graph_fromEdgeTuples=Graph.fromEdgeTuples(edgesRDD,1)
//查看顶点信息
graph_fromEdgeTuples.vertices.collect.foreach(println(_))
//查看边信息
graph_fromEdgeTuples.edges.collect.foreach(println(_))
四、图的缓存及释放
在图计算过程中,如果频繁使用一个图,那么为了节省重新计算的时间,将图进行缓存是必要的。图的缓存和释放缓存主要方法如表所示
方法 | 描述 |
---|---|
cache() | 缓存整个图 |
persist(newLevel:StorageLevel = StorageLevel.MEMORY_ONLY) | 缓存整个图,并指定存储级别 |
unpersistVertices(blocking: Boolean = true) | 释放顶点缓存 |
edges.unpersist(blocking = true) | 释放边缓存 |
1.图缓存方法
- 缓存有cache和persist两种方法
- 对于persist,如果想指定缓存类型为StorageLevel.MEMORY_ONLY,需要导入org.apache.spark.storage.StorageLevel包
操作:
import org.apache.spark.storage.StorageLevel
graph_urelate.cache()
graph_fromEdges.persist()
graph_fromEdgeTuples.persist(StorageLevel.MEMORY_ONLY)
2.图释放缓存方法
- Graph.unpersist(blocking = true):释放整个图缓存
- Graph.unpersistVertices(blocking = true):释放内存中缓存的vertices
- Graph.edges.unpersist(blocking = true):释放边缓存,使用对边进行修改,但会重复使用点进行运算的操作
操作:
graph_fromEdges.unpersist(blocking = true)
graph_fromEdgeTuples.unpersistVertices(blocking = true)
graph_fromEdgeTuples.edges.unpersist(blocking = true)