1010 [taoensso.carmine :as car]
1111 [riemann-redis-output.core-test :refer [rand-str]]))
1212
13- (def ^:dynamic redis-port 6379 )
13+ (def ^:dynamic redis-spec {:port 6379 })
14+
1415(defn- redis-fixture [f]
1516 (let [redis-container (-> (tc/create {:image-name " docker.io/library/redis:6-alpine"
1617 :exposed-ports [6379 ]
1718 :wait-for {:strategy :exposed-ports }})
1819 tc/start!)
1920 port (get-in redis-container [:mapped-ports 6379 ])]
20- (binding [redis-port port]
21+ (binding [redis-spec { : port port} ]
2122 (f ))
2223 (tc/stop! redis-container)))
2324
2425(defmacro with-server [stream & forms]
2526 `(let [server# (tcp-server )
2627 core# (transition! (core ) {:services [server#]
27- :streams [~stream]})]
28+ :streams [~stream]})]
2829 ~@forms
2930 (stop! core#)))
3031
3839
3940(deftest output
4041 (let [event {:time 0 :state " ok" :service " test service" :host " localhost" }
41- redis-out (sink {:conn-spec { :port redis-port} :key " test-event" })]
42+ redis-out (sink {:conn-spec redis-spec :key " test-event" })]
4243 (with-server (streams/with :state " tested"
43- redis-out)
44- (send-events [event])
45- (Thread/sleep 50 ))
46- (let [redis-event (car/wcar {:pool :none :spec {:host " localhost" :port redis-port}}
47- (car/lpop " test-event" ))
48- redis-event (json/parse-string redis-event true )]
44+ redis-out)
45+ (send-events [event]))
46+ (let [redis-event (car/wcar {:pool :none :spec redis-spec}
47+ (car/blpop " test-event" 1 ))
48+ redis-event (json/parse-string (second redis-event) true )]
4949 (is (= " tested" (:state redis-event))))))
5050
5151(deftest output-batch
5252 (let [event {:host " localhost" :service " test" :state " ok" }
53- events (map #(assoc event :metric % :time %) (range 10 ))
54- redis-spec {:port redis-port}
53+ events (map #(assoc event :metric % :time %) (range 10 ))
5554 redis-out (sink {:conn-spec redis-spec :key " test-batch-events" })]
5655 (with-server (streams/with :state " tested"
5756 (streams/batch 4 1 redis-out))
58- (send-events events)
59- (Thread/sleep 50 ))
57+ (send-events events))
6058 ; we don't wait for the batch dt window to close, so only 8 message are expected
6159 (is (>= 8 (car/wcar {:spec redis-spec}
6260 (car/llen " test-batch-events" ))))
6361 (is (= (assoc event :state " tested" )
64- (-> (car/wcar {:spec redis-spec} (car/rpop " test-batch-events" ))
62+ (-> (car/wcar {:spec redis-spec} (car/blpop " test-batch-events" 1 ))
63+ second
6564 (json/parse-string true )
6665 (select-keys (keys event)))))))
6766
6867(deftest output-byte-array
6968 (let [ba-encoder (fn [e] (->> e
70- json/generate-string
71- (map byte)
72- byte-array))
69+ json/generate-string
70+ (map byte)
71+ byte-array))
7372 event {:time 0 :state " ok" :service " test service" :host " localhost" }
74- redis-out (sink {:conn-spec { :port redis-port} :key " test-byte-array" :encoder ba-encoder})]
73+ redis-out (sink {:conn-spec redis-spec :key " test-byte-array" :encoder ba-encoder})]
7574 (with-server (streams/with :state " tested"
7675 redis-out)
77- (send-events [event])
78- (Thread/sleep 50 ))
79- (let [redis-event (car/wcar {:pool :none :spec {:host " localhost" :port redis-port}}
80- (car/lpop " test-byte-array" ))
81- redis-event (json/parse-string redis-event true )]
76+ (send-events [event]))
77+ (let [redis-event (car/wcar {:pool :none :spec redis-spec}
78+ (car/blpop " test-byte-array" 1 ))
79+ redis-event (json/parse-string (second redis-event) true )]
8280 (is (= " tested" (:state redis-event))))))
8381
8482(deftest output-with-key-field
8583 (let [key-field :redis-key
8684 event {:time 0 :state " ok" :service " test service" :host " localhost" :description (rand-str 10 ) :redis-key (rand-str 10 )}
87- redis-out (sink {:conn-spec {:port redis-port} :key-fn key-field})]
85+ redis-out (sink {:conn-spec redis-spec :key-fn key-field})]
86+ (with-server (streams/sdo redis-out)
87+ (send-events [event]))
88+ (let [redis-event (car/wcar {:pool :none :spec redis-spec}
89+ (car/blpop (:redis-key event) 1 ))
90+ redis-event (json/parse-string (second redis-event) true )]
91+ (is (= (:description event) (:description redis-event))))))
92+
93+ (deftest output-with-multiple-keys
94+ (let [events (for [_ (range 10 )]
95+ {:time 0 :state " ok" :service " test service" :host " localhost" :description (rand-str 10 )})
96+ redis-out (sink {:conn-spec redis-spec :key-fn #(str " riemann-" (mod (hash (:description %)) 3 ))})]
8897 (with-server (streams/sdo redis-out)
89- (send-events [event])
90- (Thread/sleep 50 ))
91- (let [redis-event (car/wcar {:pool :none :spec {:host " localhost" :port redis-port}}
92- (car/lpop (:redis-key event)))
93- redis-event (json/parse-string redis-event true )]
94- (is (= (:description event) (:description redis-event))))))
98+ (send-events events))
99+ (let [redis {:pool :none :spec redis-spec}
100+ lists (for [x (range 3 )] (str " riemann-" x))
101+ list-lens (car/wcar redis (doseq [l lists] (car/llen l)))
102+ redis-events (car/wcar {:pool :none :spec redis-spec}
103+ (doseq [[l len] (zipmap lists list-lens)]
104+ (dotimes [_ len] (car/lpop l))))
105+ redis-events (map #(json/parse-string % true ) redis-events)]
106+ (is (> 1 (count (some #(> 0 %) list-lens))))
107+ (is (= 10 (count redis-events))))))
0 commit comments