Spark Streaming is a popular Spark extension, and the Kotlin Spark API introduces a more Kotlin-esque approach to writing your streaming programs.
There are examples for use with a checkpoint, Kafka, and SQL in the examples module.
Example
Here is a quick example:
```kotlin
// Automatically provides ssc: JavaStreamingContext, which starts and awaits termination or timeout
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    setRunAfterStart {
        println("Stream is started")
    }

    // create an input stream for, for instance, Netcat: `$ nc -lk 9999`
    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)

    // split the input stream on spaces
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    // perform an action on each RDD formed in the stream
    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->

        // to convert the JavaRDD to a Dataset, we need a spark session using the RDD context
        withSpark(rdd) { // this: KSparkSession
            val dataframe: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS()
            dataframe
                .groupByKey { it.word }
                .count()
                .show()
            // +-----+--------+
            // |  key|count(1)|
            // +-----+--------+
            // |hello|       1|
            // |   is|       1|
            // |    a|       1|
            // | this|       1|
            // | test|       3|
            // +-----+--------+
        }
    }
}
```
Spark session
Note that withSparkStreaming {} does not provide a spark session in its context. This is because it needs to be created from the right SparkConf, depending on what you're doing with the data stream.
This is why we provide withSpark(sc: SparkConf) {} inside the KSparkStreamingSession, as well as two helper functions for when you already have an instance of ssc: JavaStreamingContext or an RDD.
For instance, if you want to create a dataset inside the KSparkStreamingSession context, or you want to broadcast a variable, you can create a spark session from the ssc: JavaStreamingContext like this (a minimal sketch; dsOf and broadcast are helpers from this API, and the values are just illustrative):
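```kotlin
withSparkStreaming(batchDuration = Durations.seconds(1)) { // this: KSparkStreamingSession

    // creates a KSparkSession based on the SparkConf of the JavaStreamingContext
    withSpark(ssc) { // this: KSparkSession
        // dataset creation and broadcasting are now available as usual
        val dataset: Dataset<Int> = dsOf(1, 2, 3)
        val broadcasted: Broadcast<Int> = spark.broadcast(5)
        dataset.show()
    }
}
```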
Checkpointing
A feature of Spark Streaming is checkpointing. This can also be done easily from the Kotlin API:
```kotlin
withSparkStreaming(batchDuration = ..., checkPointPath = "/path/to/checkpoint") {
    // contents will only be run the first time;
    // the second time, the stream transformations will be read from the checkpoint
}
```
NOTE: If setRunAfterStart {} is used, it is likewise only executed if the checkpoint is empty, i.e., on the first run.
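For example, combining the two snippets above (a minimal sketch):

```kotlin
withSparkStreaming(batchDuration = Durations.seconds(1), checkPointPath = "/path/to/checkpoint") {
    setRunAfterStart {
        // only runs when no checkpoint was found, i.e., on the first run
        println("Stream is started from scratch")
    }
}
```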
Key / Value streams
Using the Java API, it's necessary to call mapToPair {} to get a JavaPairDStream before specific key/value functions like reduceByKey {} become available. In Kotlin, however, we have extension functions, which make these functions accessible directly on a JavaDStream<Tuple2<*, *>>, so we can simply do:
```kotlin
val wordCounts: JavaDStream<Tuple2<String, Int>> = words
    // `X` is this API's infix function for creating a Tuple2
    .map { it X 1 }
    .reduceByKey { a: Int, b: Int -> a + b }
```
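To inspect the result, the stream can then be printed per batch as usual:

```kotlin
// prints the first ten elements of each RDD generated in this DStream
wordCounts.print()
```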