From cc4bc21ac7cef1dbb556396dfdc1cfcd6b573b45 Mon Sep 17 00:00:00 2001 From: Anthony Khong Date: Tue, 29 Sep 2020 09:17:21 +0700 Subject: [PATCH] Streaming context methods (#222) * Added untested methods * Added tests for StreamingContext methods and fields * Back to 100% coverage --- src/clojure/zero_one/geni/main.clj | 2 +- src/clojure/zero_one/geni/streaming.clj | 65 +++++++++++++++++++------ test/zero_one/geni/streaming_test.clj | 47 ++++++++++++------ 3 files changed, 82 insertions(+), 32 deletions(-) diff --git a/src/clojure/zero_one/geni/main.clj b/src/clojure/zero_one/geni/main.clj index 523ca6b7..098ae228 100644 --- a/src/clojure/zero_one/geni/main.clj +++ b/src/clojure/zero_one/geni/main.clj @@ -65,7 +65,7 @@ (-> dataframe g/print-schema) (require '[midje.repl :refer [autotest]]) - (autotest :filter :streaming) + (autotest :filter (every-pred :streaming (complement :slow))) ;(autotest :filter (every-pred (complement :slow) (complement :repl))) (require '[clojure.pprint]) diff --git a/src/clojure/zero_one/geni/streaming.clj b/src/clojure/zero_one/geni/streaming.clj index 23aaab43..2d1bc0d5 100644 --- a/src/clojure/zero_one/geni/streaming.clj +++ b/src/clojure/zero_one/geni/streaming.clj @@ -11,11 +11,11 @@ [zero-one.geni.spark-context :as spark-context]) (:import (org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext) - (org.apache.spark.streaming Milliseconds + (org.apache.spark.streaming Duration + Milliseconds Minutes Seconds - Time) - (org.apache.spark.sql SparkSession))) + Time))) ;; TODO: count-by-value-and-window, ;; TODO: foreachRDD, reduce-by-window @@ -28,15 +28,28 @@ (defn seconds [t] (Seconds/apply t)) -(defmulti streaming-context (fn [head & _] (class head))) -(defmethod streaming-context SparkSession [spark duration] - (JavaStreamingContext. (spark-context/java-spark-context spark) duration)) +(defn ->duration [value] + (if (instance? Duration value) + value + (milliseconds value))) -(defn socket-text-stream [context hostname port storage] - (.socketTextStream context hostname port storage)) +;; StreamingContext +(defn streaming-context [spark duration] + (JavaStreamingContext. + (spark-context/java-spark-context spark) + (->duration duration))) -(defn text-file-stream [context path] - (.textFileStream context path)) +(defn await-termination! [context] + (future (.awaitTermination context))) + +(defn await-termination-or-timeout! [context timeout] + (future (.awaitTerminationOrTimeout context timeout))) + +; TODO: how to test without killing the SparkContext? +; (def close (memfn close)) + +(defn remember [context duration] + (.remember context (->duration duration))) (defmulti save-as-text-files! (fn [head & _] (class head))) (defmethod save-as-text-files! JavaDStream [dstream path] @@ -44,21 +57,32 @@ (defmethod save-as-text-files! :default [dstream path] (.saveAsTextFiles dstream path "")) +(defn socket-text-stream [context hostname port storage] + (.socketTextStream context hostname port storage)) + +(defn spark-context [context] + (.sparkContext context)) + +(defn ssc [context] + (.ssc context)) + (defn start! [context] (future (.start context))) -(defn await-termination! [context] - (future (.awaitTermination context))) +(defn state [context] + (.getState context)) +(def get-state state) (defn stop! [context] (future (.stop context false true))) +(defn text-file-stream [context path] + (.textFileStream context path)) + +;; DStream (defn cache [dstream] (.cache dstream)) -(defn checkpoint [dstream interval] - (.checkpoint dstream interval)) - (defn context [dstream] (.context dstream)) @@ -128,7 +152,7 @@ (defn wrap-rdd [dstream rdd] (.wrapRDD dstream (.rdd rdd))) -;; Pair DStream +;; PairDStream (defn ->java-dstream [dstream] (.toJavaDStream dstream)) @@ -138,6 +162,8 @@ ([dstream f partitions-or-partitioner] (.reduceByKey dstream (function/function2 f) partitions-or-partitioner))) +;; StorageLevel + (import-vars [zero-one.geni.storage disk-only @@ -152,3 +178,10 @@ memory-only-ser-2 none off-heap]) + +;; Polymorphic +(defmulti checkpoint (fn [head & _] (class head))) +(defmethod checkpoint JavaStreamingContext [context directory] + (.checkpoint context directory)) +(defmethod checkpoint :default [dstream interval] + (.checkpoint dstream interval)) diff --git a/test/zero_one/geni/streaming_test.clj b/test/zero_one/geni/streaming_test.clj index 589a13e1..0b1bf3f5 100644 --- a/test/zero_one/geni/streaming_test.clj +++ b/test/zero_one/geni/streaming_test.clj @@ -9,6 +9,7 @@ [zero-one.geni.rdd :as rdd] [zero-one.geni.streaming :as streaming]) (:import + (org.apache.spark.api.java JavaSparkContext) (org.apache.spark.streaming Duration StreamingContext Time) (org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext) (org.apache.spark.streaming.dstream DStream))) @@ -47,7 +48,7 @@ (spit read-file (str (:content opts "Hello World!"))) ((:action-fn opts identity) dstream) (Thread/sleep (:sleep-ms opts 50)) - (streaming/await-termination! context) + ((:terminate-fn opts streaming/await-termination!) context) @(streaming/stop! context) (let [result (written-content write-file) n-retries (:n-retries opts 0) @@ -65,7 +66,7 @@ (def dummy-text (slurp "test/resources/rdd.txt")) -(facts "On DStream methods" :streaming +(facts "On DStream methods" :streaming :slow (stream-results {:content dummy-text :fn #(-> % @@ -138,6 +139,12 @@ :action-fn #(let [now (System/currentTimeMillis)] (assert (nil? (streaming/compute % (+ 100 now)))))}) => string? + (stream-results + {:content dummy-text + :action-fn #(let [now (System/currentTimeMillis)] + (assert (nil? (streaming/compute % + (streaming/->time (+ 100 now))))))}) + => string? (stream-results {:content dummy-text :fn #(streaming/flat-map % aot/split-spaces) @@ -145,9 +152,10 @@ :expected 522}) => 522) -(facts "On DStream testing" :streaming +(facts "On DStream testing" :streaming :slow (stream-results {:content (range 10) + :terminate-fn #(streaming/await-termination-or-timeout! % 100000) :fn streaming/cache}) => (str (range 10) "\n") (stream-results @@ -198,15 +206,24 @@ (streaming/->time 123) => (Time. 123))) (facts "On StreamingContext" :streaming - (let [context (streaming/streaming-context @defaults/spark (streaming/seconds 1))] - (fact "streaming context instantiatable" - context => (partial instance? JavaStreamingContext)) - (fact "retrieving context from a dstream" - (let [dstream (streaming/socket-text-stream context - "localhost" - 9999 - streaming/memory-only)] - dstream => (partial instance? JavaDStream) - (streaming/dstream dstream) => (partial instance? DStream) - (streaming/context dstream) => (partial instance? StreamingContext) - (rdd/collect (streaming/wrap-rdd dstream (rdd/parallelise [1 2 3]))) => [1 2 3])))) + (let [context (streaming/streaming-context @defaults/spark 1000)] + (fact "streaming context instantiatable" + context => (partial instance? JavaStreamingContext)) + (fact "expected basic fields and methods" + (streaming/spark-context context) => (partial instance? JavaSparkContext) + (streaming/ssc context) => (partial instance? StreamingContext) + (.toString (streaming/state context)) => "INITIALIZED" + (streaming/checkpoint context "target/checkpoint/") => nil? + (streaming/remember context 1000) => nil?) + (fact "retrieving context from a dstream" + (let [dstream (streaming/socket-text-stream context + "localhost" + 9999 + streaming/memory-only)] + dstream => (partial instance? JavaDStream) + (streaming/dstream dstream) => (partial instance? DStream) + (streaming/context dstream) => (partial instance? StreamingContext) + (->> [1 2 3] + rdd/parallelise + (streaming/wrap-rdd dstream) + rdd/collect) => [1 2 3]))))