Skip to content

Commit

Permalink
Streaming context methods (#222)
Browse files Browse the repository at this point in the history
* Added untested methods

* Added tests for StreamingContext methods and fields

* Back to 100% coverage
  • Loading branch information
anthony-khong committed Sep 29, 2020
1 parent 21b51fb commit cc4bc21
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/clojure/zero_one/geni/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
(-> dataframe g/print-schema)

(require '[midje.repl :refer [autotest]])
(autotest :filter :streaming)
(autotest :filter (every-pred :streaming (complement :slow)))
;(autotest :filter (every-pred (complement :slow) (complement :repl)))

(require '[clojure.pprint])
Expand Down
65 changes: 49 additions & 16 deletions src/clojure/zero_one/geni/streaming.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
[zero-one.geni.spark-context :as spark-context])
(:import
(org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext)
(org.apache.spark.streaming Milliseconds
(org.apache.spark.streaming Duration
Milliseconds
Minutes
Seconds
Time)
(org.apache.spark.sql SparkSession)))
Time)))

;; TODO: count-by-value-and-window,
;; TODO: foreachRDD, reduce-by-window
Expand All @@ -28,37 +28,61 @@

(defn seconds [t] (Seconds/apply t))

(defmulti streaming-context (fn [head & _] (class head)))
(defmethod streaming-context SparkSession [spark duration]
(JavaStreamingContext. (spark-context/java-spark-context spark) duration))
(defn ->duration [value]
(if (instance? Duration value)
value
(milliseconds value)))

(defn socket-text-stream [context hostname port storage]
(.socketTextStream context hostname port storage))
;; StreamingContext
(defn streaming-context [spark duration]
(JavaStreamingContext.
(spark-context/java-spark-context spark)
(->duration duration)))

(defn text-file-stream [context path]
(.textFileStream context path))
(defn await-termination! [context]
(future (.awaitTermination context)))

(defn await-termination-or-timeout! [context timeout]
(future (.awaitTerminationOrTimeout context timeout)))

; TODO: how to test without killing the SparkContext?
; (def close (memfn close))

(defn remember [context duration]
(.remember context (->duration duration)))

(defmulti save-as-text-files! (fn [head & _] (class head)))
(defmethod save-as-text-files! JavaDStream [dstream path]
(save-as-text-files! (.dstream dstream) path))
(defmethod save-as-text-files! :default [dstream path]
(.saveAsTextFiles dstream path ""))

(defn socket-text-stream [context hostname port storage]
(.socketTextStream context hostname port storage))

(defn spark-context [context]
(.sparkContext context))

(defn ssc [context]
(.ssc context))

(defn start! [context]
(future (.start context)))

(defn await-termination! [context]
(future (.awaitTermination context)))
(defn state [context]
(.getState context))
(def get-state state)

(defn stop! [context]
(future (.stop context false true)))

(defn text-file-stream [context path]
(.textFileStream context path))

;; DStream
(defn cache [dstream]
(.cache dstream))

(defn checkpoint [dstream interval]
(.checkpoint dstream interval))

(defn context [dstream]
(.context dstream))

Expand Down Expand Up @@ -128,7 +152,7 @@
(defn wrap-rdd [dstream rdd]
(.wrapRDD dstream (.rdd rdd)))

;; Pair DStream
;; PairDStream
(defn ->java-dstream [dstream]
(.toJavaDStream dstream))

Expand All @@ -138,6 +162,8 @@
([dstream f partitions-or-partitioner]
(.reduceByKey dstream (function/function2 f) partitions-or-partitioner)))

;; StorageLevel

(import-vars
[zero-one.geni.storage
disk-only
Expand All @@ -152,3 +178,10 @@
memory-only-ser-2
none
off-heap])

;; Polymorphic
(defmulti checkpoint (fn [head & _] (class head)))
(defmethod checkpoint JavaStreamingContext [context directory]
(.checkpoint context directory))
(defmethod checkpoint :default [dstream interval]
(.checkpoint dstream interval))
47 changes: 32 additions & 15 deletions test/zero_one/geni/streaming_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
[zero-one.geni.rdd :as rdd]
[zero-one.geni.streaming :as streaming])
(:import
(org.apache.spark.api.java JavaSparkContext)
(org.apache.spark.streaming Duration StreamingContext Time)
(org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext)
(org.apache.spark.streaming.dstream DStream)))
Expand Down Expand Up @@ -47,7 +48,7 @@
(spit read-file (str (:content opts "Hello World!")))
((:action-fn opts identity) dstream)
(Thread/sleep (:sleep-ms opts 50))
(streaming/await-termination! context)
((:terminate-fn opts streaming/await-termination!) context)
@(streaming/stop! context)
(let [result (written-content write-file)
n-retries (:n-retries opts 0)
Expand All @@ -65,7 +66,7 @@
(def dummy-text
(slurp "test/resources/rdd.txt"))

(facts "On DStream methods" :streaming
(facts "On DStream methods" :streaming :slow
(stream-results
{:content dummy-text
:fn #(-> %
Expand Down Expand Up @@ -138,16 +139,23 @@
:action-fn #(let [now (System/currentTimeMillis)]
(assert (nil? (streaming/compute % (+ 100 now)))))})
=> string?
(stream-results
{:content dummy-text
:action-fn #(let [now (System/currentTimeMillis)]
(assert (nil? (streaming/compute %
(streaming/->time (+ 100 now))))))})
=> string?
(stream-results
{:content dummy-text
:fn #(streaming/flat-map % aot/split-spaces)
:finish-fn #(count (string/split % #"\n"))
:expected 522})
=> 522)

(facts "On DStream testing" :streaming
(facts "On DStream testing" :streaming :slow
(stream-results
{:content (range 10)
:terminate-fn #(streaming/await-termination-or-timeout! % 100000)
:fn streaming/cache})
=> (str (range 10) "\n")
(stream-results
Expand Down Expand Up @@ -198,15 +206,24 @@
(streaming/->time 123) => (Time. 123)))

(facts "On StreamingContext" :streaming
(let [context (streaming/streaming-context @defaults/spark (streaming/seconds 1))]
(fact "streaming context instantiatable"
context => (partial instance? JavaStreamingContext))
(fact "retrieving context from a dstream"
(let [dstream (streaming/socket-text-stream context
"localhost"
9999
streaming/memory-only)]
dstream => (partial instance? JavaDStream)
(streaming/dstream dstream) => (partial instance? DStream)
(streaming/context dstream) => (partial instance? StreamingContext)
(rdd/collect (streaming/wrap-rdd dstream (rdd/parallelise [1 2 3]))) => [1 2 3]))))
(let [context (streaming/streaming-context @defaults/spark 1000)]
(fact "streaming context instantiatable"
context => (partial instance? JavaStreamingContext))
(fact "expected basic fields and methods"
(streaming/spark-context context) => (partial instance? JavaSparkContext)
(streaming/ssc context) => (partial instance? StreamingContext)
(.toString (streaming/state context)) => "INITIALIZED"
(streaming/checkpoint context "target/checkpoint/") => nil?
(streaming/remember context 1000) => nil?)
(fact "retrieving context from a dstream"
(let [dstream (streaming/socket-text-stream context
"localhost"
9999
streaming/memory-only)]
dstream => (partial instance? JavaDStream)
(streaming/dstream dstream) => (partial instance? DStream)
(streaming/context dstream) => (partial instance? StreamingContext)
(->> [1 2 3]
rdd/parallelise
(streaming/wrap-rdd dstream)
rdd/collect) => [1 2 3]))))

0 comments on commit cc4bc21

Please sign in to comment.