Повторное использование кода между искровым потоком и периодическим режимом для отдельных элементов

Я новичок в Spark, и я хочу реализовать лямбда-архитектуру, используя искровой поток и искровую партию.

В Интернете я нашел следующую статью:

http://blog.cloudera.com/blog/2014/08/building-lambda-architecture-with-spark-streaming/

Это хорошо для некоторых из моих анализов, но я не думаю, что это решение возможно в том случае, если необходимо найти отдельные элементы.

Если вы хотите найти отдельные элементы на JavaRDD, вы можете использовать этот метод. DStreams - это наборы RDD, поэтому, если вы примените

transform((rdd) -> rdd.distinct())

в Dstream вы будете выполнять отдельные на каждом rdd потока, чтобы вы могли найти отдельные элементы в каждом RDD, а не на всем DStream.

Может быть написано так, как будто это немного запутывает, поэтому позвольте мне пояснить пример:

У меня есть следующие элементы:

Apple
Pear
Banana
Peach
Apple
Pear

В пакетном приложении:

JavaRDD<string> elemsRDD=sc.textFile(exFilePath).distinct()
</string>

Ребенка RDD будет содержать:

Apple
Pear
Banana
Peach

Если я правильно понял, это должно быть поведение для потока:

предположим, что у нас есть время пакета 1s и окно 2s:

Первый RDD:

Apple
Pear
Banana

Второе RDD:

Peach
Apple
Pear
JavaDStream<string> elemsStream=(getting from whathever source)
childStream = elemsStream.transform((rdd) -> rdd.distinct())
childStream.forEachRDD...
</string>

закончится с 2 Rdds: Во-первых:

Apple
Pear
Banana

Во-вторых:

Peach
Apple
Pear

Это отличное отношение к RDD, но не относится к DStream.

Мое решение для потоковой части было следующим:

JavaDStream<hashset<string>> distinctElems = elemsStream.map( (elem) -> { HashSet<string> htSet = new HashSet<string>(); htSet.add(elem); return htSet; }).reduce((sp1, sp2) -> { sp1.addAll(sp2); return sp1; });
</string></string></hashset<string>

Таким образом, результат:

Apple
Pear
Banana
Peach

в качестве пакетного режима. Однако это решение потребует непроизводительных затрат на обслуживание и имеет риск ошибок, возникающих в результате дублирования кодовых баз.

Есть ли лучший способ достичь одного и того же результата, используя как можно больше код для пакетного режима?

Спасибо заранее.

1 ответ

Ваше решение элегантно.

У меня есть другое решение, оно менее элегантно, чем ваше, но я не знаю, эффективнее ли оно. Это мое решение, основанное на mapToPairFunction

JavaPairDStream<string, integer=""> distinctElems = elemsStream .mapToPair(event -> new Tuple2<string, integer="">(event,1));
distinctElems = distinctElems.reduceByKey((t1, t2) -> t1);
</string,></string,>

Я думаю, что это более эффективно, но я не могу его протестировать.

licensed under cc by-sa 3.0 with attribution.