Чтение из Кассандры с использованием Spark Streaming

У меня проблема, когда я использую искрообразование для чтения из Кассандры.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

Как ссылка выше, я использую

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)

чтобы выбрать данные из cassandra, но кажется, что поток искры имеет только один запрос один раз, но я хочу, чтобы он продолжал запрашивать с использованием интервала 10 senconds.

Мой код следующий, пожелайте ответа.

Спасибо!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue
object SimpleApp {
def main(args: Array[String]){
 val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")
 val ssc = new StreamingContext(conf, Seconds(10))
 val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
 //rdd.collect().foreach(println)
 val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
 val dstream = ssc.queueStream(rddQueue)
 dstream.print()
 ssc.start()
 rdd.collect().foreach(println)
 rddQueue += rdd
 ssc.awaitTermination()
}

}

2 ответа

Вы можете создать ConstantInputDStream с входом CassandraRDD. ConstantInputDStream предоставит один и тот же RDD для каждого интервала потоковой передачи, и, выполнив действие над этим RDD, вы инициируете материализацию линии RDD, что приведет к выполнению запроса на Cassandra каждый раз.

Убедитесь, что запрашиваемые данные не растут без ограничений, чтобы избежать увеличения времени запроса и в результате возникает нестабильный процесс потоковой передачи.

Что-то вроде этого должно сделать трюк (используя ваш код в качестве отправной точки):

import org.apache.spark.streaming.dstream.ConstantInputDStream
val ssc = new StreamingContext(conf, Seconds(10))
val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
val dstream = new ConstantInputDStream(ssc, cassandraRDD)
dstream.foreachRDD{ rdd => 
 // any action will trigger the underlying cassandra query, using collect to have a simple output
 println(rdd.collect.mkString("\n")) 
}
ssc.start()
ssc.awaitTermination()


У меня была такая же проблема, и я нашел решение, создав подкласс класса InputDStream. Необходимо определить методы start() и compute().

start() может использоваться для приготовления. Основная логика находится в compute(). Он должен вернуть Option[RDD[T]]. Чтобы сделать класс гибким, определяется признак InputStreamQuery.

trait InputStreamQuery[T] {
 // where clause condition for partition key
 def partitionCond : (String, Any)
 // function to return next partition key
 def nextValue(v:Any) : Option[Any]
 // where clause condition for clustering key
 def whereCond : (String, (T) => Any)
 // batch size
 def batchSize : Int
}

Для таблицы Cassandra keyspace.test создайте test_by_date, которая реорганизует таблицу ключом раздела date.

CREATE TABLE IF NOT exists keyspace.test
(id timeuuid, date text, value text, primary key (id))
CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS
SELECT *
FROM keyspace.test
WHERE id IS NOT NULL 
PRIMARY KEY (date, id)
WITH CLUSTERING ORDER BY ( id ASC );

Одна возможная реализация для таблицы test должна быть

class class Test(id:UUID, date:String, value:String)
trait InputStreamQueryTest extends InputStreamQuery[Test] {
 val dateFormat = "uuuu-MM-dd"
 // set batch size as 10 records
 override def batchSize: Int = 10
 // partitioning key conditions, query string and initial value
 override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")
 // clustering key condition, query string and function to get clustering key from the instance
 override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)
 // return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01'
 override def nextValue(v: Any): Option[Any] = {
 import java.time.format.DateTimeFormatter
 val formatter = DateTimeFormatter.ofPattern( dateFormat)
 val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1)
 if ( nextDate.isAfter( LocalDate.now()) ) None
 else Some( nextDate.format(formatter))
 }
}

Его можно использовать в классе CassandraInputStream следующим образом.

class CassandraInputStream[T: ClassTag]
(_ssc: StreamingContext, keyspace:String, table:String)
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T]) 
extends InputDStream[T](_ssc) with InputStreamQuery[T] {
var lastElm:Option[T] = None
var partitionKey : Any = _
override def start(): Unit = {
 // find a partition key which stores some records
 def findStartValue(cql : String, value:Any): Any = {
 val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)
 if (rdd.cassandraCount() > 0 ) value
 else {
 nextValue(value).map( findStartValue( cql, _)).getOrElse( value)
 }
 }
 // get query string and initial value from partitionCond method
 val (cql, value) = partitionCond
 partitionKey = findStartValue(cql, value)
}
override def stop(): Unit = {}
override def compute(validTime: Time): Option[RDD[T]] = {
 val (cql, _) = partitionCond
 val (wh, whKey) = whereCond
 def fetchNext( patKey: Any) : Option[CassandraTableScanRDD[T]] = {
 // query with partitioning condition
 val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where( cql, patKey)
 val rdd = lastElm.map{ x =>
 query.where( wh, whKey(x)).withAscOrder.limit(batchSize)
 }.getOrElse( query.withAscOrder.limit(batchSize))
 if ( rdd.cassandraCount() > 0 ) {
 // store the last element of this RDD
 lastElm = Some(rdd.collect.last)
 Some(rdd)
 }
 else {
 // find the next partition key which stores data
 nextValue(patKey).flatMap{ k =>
 partitionKey = k
 fetchNext(k)}
 }
 }
 fetchNext( partitionKey)
}
}

Объединяя все классы,

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))
val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest
dstream.map(println).saveToCassandra( ... )
ssc.start()
ssc.awaitTermination()

licensed under cc by-sa 3.0 with attribution.