0
点赞
收藏
分享

微信扫一扫

Spark组件之GraphX学习8--随机图生成和TopK最大入度


更多代码请见:https://github.com/xubo245/SparkLearning


1.解释

前一篇介绍了如何求最大入度,但有时需要求 TopK。本文求入度最大的 TopK 个顶点;出度和度的 TopK 求法类似。


2.代码:

/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators
import org.jets3t.apps.synchronize.Synchronize
import breeze.linalg.reverse
import breeze.linalg.reverse

/**
 * Generates a small random graph with GraphX, prints its vertices, edges and
 * in-degrees, and then prints the K vertices with the highest in-degree.
 *
 * The top-K selection is done with `RDD.top` and an ordering on the degree,
 * so the result is computed by Spark and returned to the driver rather than
 * being accumulated in a shared mutable array from inside an RDD closure.
 */
object GraphGeneratorsAndTopK {

  /** Number of highest in-degree vertices to report. */
  val K = 3

  /**
   * Result of the most recent run as (vertex id, in-degree) pairs, sorted by
   * descending in-degree. Holds `(0, 0)` placeholders until `main` has run;
   * afterwards it holds at most K real entries (fewer when fewer than K
   * vertices have incoming edges).
   *
   * Vertex ids are narrowed from `VertexId` (Long) to Int to keep this field's
   * original type; that is lossless for the 10-vertex graph generated here.
   */
  var arr: Array[(Int, Int)] = Array.fill(K)((0, 0))

  /**
   * Program entry point: builds the graph on a local 4-thread Spark context,
   * prints it, and prints the top-K in-degree vertices.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GraphGeneratorsAndTopK").setMaster("local[4]")
    val sc = new SparkContext(conf)

    try {
      // Random log-normal graph; each vertex attribute is its own id as a Double.
      val graph: Graph[Double, Int] =
        GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices((id, _) => id.toDouble)

      println("Graph:")
      println("sc.defaultParallelism:" + sc.defaultParallelism)
      println("vertices:")
      graph.vertices.collect.foreach(println(_))
      println("edges:")
      graph.edges.collect.foreach(println(_))
      println("count:" + graph.edges.count())

      println("\ninDegrees")
      val inDegrees: VertexRDD[Int] = graph.inDegrees
      // Collect before printing so the output is produced by the driver,
      // the same way the vertices and edges are printed above.
      inDegrees.collect.foreach(println)

      println("\ntopK:K=" + K)
      // Rank (vertexId, inDegree) pairs by in-degree only; top() returns the
      // K largest in descending order. The order among equal degrees is not
      // specified.
      val byInDegree = Ordering.by[(VertexId, Int), Int](_._2)
      val topVertices: Array[(VertexId, Int)] = inDegrees.top(K)(byInDegree)

      // Keep the public field up to date for any code that reads it.
      arr = topVertices.map { case (id, degree) => (id.toInt, degree) }

      topVertices.foreach(println)
      println("end")
    } finally {
      // Release the local Spark context even if a job above fails.
      sc.stop()
    }
  }
}


3.结果:

Graph:
sc.defaultParallelism:4
vertices:
(4,4.0)
(0,0.0)
(8,8.0)
(1,1.0)
(9,9.0)
(5,5.0)
(6,6.0)
(2,2.0)
(3,3.0)
(7,7.0)
edges:
Edge(0,1,1)
Edge(0,3,1)
Edge(0,6,1)
Edge(0,7,1)
Edge(0,8,1)
Edge(1,2,1)
Edge(1,4,1)
Edge(1,6,1)
Edge(1,8,1)
Edge(2,0,1)
Edge(2,0,1)
Edge(2,1,1)
Edge(2,4,1)
Edge(2,7,1)
Edge(2,8,1)
Edge(3,0,1)
Edge(3,1,1)
Edge(3,2,1)
Edge(3,5,1)
Edge(3,6,1)
Edge(3,7,1)
Edge(4,0,1)
Edge(4,0,1)
Edge(4,3,1)
Edge(4,7,1)
Edge(5,2,1)
Edge(5,4,1)
Edge(5,9,1)
Edge(6,0,1)
Edge(6,0,1)
Edge(6,1,1)
Edge(6,3,1)
Edge(6,3,1)
Edge(6,4,1)
Edge(6,9,1)
Edge(7,2,1)
Edge(7,3,1)
Edge(7,6,1)
Edge(8,2,1)
Edge(8,2,1)
Edge(8,3,1)
Edge(8,4,1)
Edge(8,4,1)
Edge(8,4,1)
Edge(9,5,1)
Edge(9,5,1)
Edge(9,7,1)
count:47

inDegrees
(1,4)
(9,2)
(5,3)
(3,6)
(7,5)
(6,4)
(2,6)
(4,7)
(0,7)
(8,3)

topK:K=3
(4,7)
(0,7)
(3,6)
end


参考

【1】 http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html

【2】 https://github.com/xubo245/SparkLearning




举报

相关推荐

0 条评论