Возьмите верхнюю N после groupBy и обработайте их как RDD

Я хочу получить верхние N элементы после groupByKey RDD и преобразовать тип topNPerGroup (ниже) в RDD[(String, Int)], где List[Int] значения flatten

data -

val data = sc.parallelize(Seq("foo"->3, "foo"->1, "foo"->2,
 "bar"->6, "bar"->5, "bar"->4))

Верхние N элементы для каждой группы вычисляются как:

val topNPerGroup: RDD[(String, List[Int]) = data.groupByKey.map { 
 case (key, numbers) => 
 key -> numbers.toList.sortBy(-_).take(2)
}

Результат

(bar,List(6, 5))
(foo,List(3, 2))

который был напечатан

topNPerGroup.collect.foreach(println)

Если я достиг, topNPerGroup.collect.foreach(println) будет генерировать (ожидаемый результат!)

(bar, 6)
(bar, 5)
(foo, 3)
(foo, 2)
3 ответа

Ваш вопрос немного запутан, но я думаю, что он делает то, что вы ищете:

val flattenedTopNPerGroup = 
 topNPerGroup.flatMap({case (key, numbers) => numbers.map(key -> _)})

а в repl он выдает то, что вы хотите:

flattenedTopNPerGroup.collect.foreach(println)
(foo,3)
(foo,2)
(bar,6)
(bar,5)


Я недавно сталкивался с этой проблемой, но моя потребность была немного иной в том, что мне нужны верхние значения K для каждого ключа с набором данных, подобным (key: Int, (domain: String, count: Long)). Хотя ваш набор данных проще, по-прежнему существует проблема масштабирования/производительности с помощью groupByKey, как указано в документации.

При вызове набора данных (K, V) пары возвращает набор данных (K, Iterable). Примечание. Если вы группируете, чтобы выполнить агрегирование (например, сумма или среднее) по каждому ключу, используя reduceByKey или combByKey даст гораздо лучшую производительность.

В моем случае я столкнулся с проблемами очень быстро, потому что мой Iterable в (K, Iterable<v>)</v> был очень большим, > 1 миллион, поэтому сортировка и захват верхней N стали очень дорогими и создавали потенциальные проблемы с памятью.

После некоторого копания, см. ссылки ниже, вот полный пример использования combByKey для выполнения той же задачи таким образом, который будет выполняться и масштабироваться.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object TopNForKey {
 var SampleDataset = List(
 (1, ("apple.com", 3L)),
 (1, ("google.com", 4L)),
 (1, ("stackoverflow.com", 10L)),
 (1, ("reddit.com", 15L)),
 (2, ("slashdot.org", 11L)),
 (2, ("samsung.com", 1L)),
 (2, ("apple.com", 9L)),
 (3, ("microsoft.com", 5L)),
 (3, ("yahoo.com", 3L)),
 (3, ("google.com", 4L)))
 //sort and trim a traversable (String, Long) tuple by _2 value of the tuple
 def topNs(xs: TraversableOnce[(String, Long)], n: Int) = {
 var ss = List[(String, Long)]()
 var min = Long.MaxValue
 var len = 0
 xs foreach { e =>
 if (len < n || e._2 > min) {
 ss = (e :: ss).sortBy((f) => f._2)
 min = ss.head._2
 len += 1
 }
 if (len > n) {
 ss = ss.tail
 min = ss.head._2
 len -= 1
 }
 }
 ss
 }
 def main(args: Array[String]): Unit = {
 val topN = 2
 val sc = new SparkContext("local", "TopN For Key")
 val rdd = sc.parallelize(SampleDataset).map((t) => (t._1, t._2))
 //use combineByKey to allow spark to partition the sorting and "trimming" across the cluster
 val topNForKey = rdd.combineByKey(
 //seed a list for each key to hold your top N with your first record
 (v) => List[(String, Long)](v),
 //add the incoming value to the accumulating top N list for the key
 (acc: List[(String, Long)], v) => topNs(acc ++ List((v._1, v._2)), topN).toList,
 //merge top N lists returned from each partition into a new combined top N list
 (acc: List[(String, Long)], acc2: List[(String, Long)]) => topNs(acc ++ acc2, topN).toList)
 //print results sorting for pretty
 topNForKey.sortByKey(true).foreach((t) => {
 println(s"key: ${t._1}")
 t._2.foreach((v) => {
 println(s"----- $v")
 })
 })
 }
}

И что я получаю в возвращающемся rdd...

(1, List(("google.com", 4L),
 ("stackoverflow.com", 10L))
(2, List(("apple.com", 9L),
 ("slashdot.org", 15L))
(3, List(("google.com", 4L),
 ("microsoft.com", 5L))

Ссылки

https://www.mail-archive.com/[removed_email]che.org/msg16827.html

qaru.site/questions/1566064/...

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions


Spark 1.4.0 решает вопрос.

Взгляните на https://github.com/apache/spark/commit/5***************************************

Здесь используется BoundedPriorityQueue с aggregateByKey

def topByKey(num: Int)(implicit ord: Ordering[V]): RDD[(K, Array[V])] = {
 self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))(
 seqOp = (queue, item) => {
 queue += item
 },
 combOp = (queue1, queue2) => {
 queue1 ++= queue2
 }
 ).mapValues(_.toArray.sorted(ord.reverse)) // This is an min-heap, so we reverse the order.
}

licensed under cc by-sa 3.0 with attribution.