Spark и Scala: применить функцию к каждому элементу RDD

У меня есть RDD VertexRDD [(VertexId, Long)], структурированный следующим образом:

(533, 1)
(571, 2)
(590, 0)
...

Где каждый элемент состоит из идентификатора вершин (533, 571, 590 и т.д.) И его количества исходящих ребер (1, 2, 0 и т.д.).

Я хочу применить функцию к каждому элементу этого RDD. Эта функция должна выполнять сравнение между количеством исходящих ребер и 4 порогами.

Если количество исходящих ребер меньше или равно одному из 4 порогов, то соответствующий идентификатор вершин должен быть вставлен в Array (или другую подобную структуру данных), чтобы получить в конце 4 структуры данных, каждый из которых содержит ids вершин, удовлетворяющих условию с соответствующим порогом.

Мне нужно, чтобы идентификаторы, которые удовлетворяли сравнению с тем же порогом, который должен быть накоплен в одной и той же структуре данных. Как я могу параллельно и реализовать этот подход с помощью Spark и Scala?

Мой код:

val usersGraphQuery = "MATCH (u1:Utente)-[p:PIU_SA_DI]->(u2:Utente) RETURN id(u1), id(u2), type(p)"
val usersGraph = neo.rels(usersGraphQuery).loadGraph[Any, Any]
val numUserGraphNodes = usersGraph.vertices.count
val numUserGraphEdges = usersGraph.edges.count
val maxNumOutDegreeEdgesPerNode = numUserGraphNodes - 1
// get id and number of outgoing edges of each node from the graph
// except those that have 0 outgoing edges (default behavior of the outDegrees API)
var userNodesOutDegreesRdd: VertexRDD[Int] = usersGraph.outDegrees
/* userNodesOutDegreesRdd.foreach(println) * Now you can see * (533, 1) * (571, 2) */
// I also get ids of nodes with zero outgoing edges
var fixedGraph: Graph[Any, Any] = usersGraph.outerJoinVertices(userNodesOutDegreesRdd)( (vid: Any, defaultOutDegrees: Any, outDegOpt: Option[Any]) => outDegOpt.getOrElse(0L) )
var completeUserNodesOutDregreesRdd = fixedGraph.vertices
/* completeUserNodesOutDregreesRdd.foreach(println)
* Now you can see
* (533, 1)
* (571, 2)
* (590, 0) <--
*/
// 4 thresholds that identify the 4 clusters of User nodes based on the number of their outgoing edges
var soglia25: ****** = (maxNumOutDegreeEdgesPerNode.to******/100)*25
var soglia50: ****** = (maxNumOutDegreeEdgesPerNode.to******/100)*50
var soglia75: ****** = (maxNumOutDegreeEdgesPerNode.to******/100)*75
var soglia100: ****** = maxNumOutDegreeEdgesPerNode
println("soglie: "+soglia25+", "+soglia50+", "+soglia75+", "+soglia100)
// containers of individual clusters
var lowSAUsers = new ListBuffer[(Long, Any)]()
var mediumLowSAUsers = new ListBuffer[(Long, Any)]()
var mediumHighSAUsers = new ListBuffer[(Long, Any)]()
var highSAUsers = new ListBuffer[(Long, Any)]()
// overall container of the 4 clusters
var clustersContainer = new ListBuffer[ (String, ListBuffer[(Long, Any)]) ]()
// I WANT PARALLEL FROM HERE -----------------------------------------------
// from RDD to Array
var completeUserNodesOutDregreesArray = completeUserNodesOutDregreesRdd.take(numUserGraphNodes.toInt)
// analizzo ogni nodo Utente e lo assegno al cluster di appartenenza
for(i<-0 to numUserGraphNodes.toInt-1) { // confronto il valore del numero di archi in uscita (convertito in stringa) // con le varie soglie per determinare in quale classe inserire il relativo nodo Utente if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia25 ) { println("ok soglia25 ") lowSAUsers += completeUserNodesOutDregreesArray(i) }else if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia50 ){ println("ok soglia50 ") mediumLowSAUsers += completeUserNodesOutDregreesArray(i) }else if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia75 ){ println("ok soglia75 ") mediumHighSAUsers += completeUserNodesOutDregreesArray(i) }else if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia100 ){ println("ok soglia100 ") highSAUsers += completeUserNodesOutDregreesArray(i) }
}
// I put each cluster in the final container
clustersContainer += Tuple2("lowSAUsers", lowSAUsers)
clustersContainer += Tuple2("mediumLowSAUsers", mediumLowSAUsers)
clustersContainer += Tuple2("mediumHighSAUsers", mediumHighSAUsers)
clustersContainer += Tuple2("highSAUsers", highSAUsers)
/* clustersContainer.foreach(println) * Now you can see * (lowSAUsers,ListBuffer((590,0))) * (mediumLowSAUsers,ListBuffer((533,1))) * (mediumHighSAUsers,ListBuffer()) * (highSAUsers,ListBuffer((571,2))) */
// ---------------------------------------------------------------------
1 ответ

как насчет создания массива кортежей, представляющих разные ячейки:

val bins = Seq(0, soglia25, soglia50, soglia75, soglia100).sliding(2) .map(seq => (seq(0), seq(1))).toArray

Затем для каждого элемента вашего RDD вы найдете соответствующий бит, сделайте его ключом, преобразуйте id в Seq и уменьшите ключ:

def getBin(bins: Array[(******, ******)], value: Int): Int = { bins.indexWhere {case (a: ******, b: ******) => a < value && b >= value}
}
userNodesOutDegreesRdd.map { case (id, value) => (getBin(bins, value), Seq(id))
}.reduceByKey(_ ++ _)

licensed under cc by-sa 3.0 with attribution.