Skip to content

Commit 5a956ec

Browse files
committed
Use function to infer redis key from event
1 parent 8c717ac commit 5a956ec

File tree

4 files changed

+24
-8
lines changed

4 files changed

+24
-8
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ Options:
4242
- `:conn-spec` - [Carmine](https://github.com/ptaoussanis/carmine) redis connection spec. See Carmine docs for more info
4343
- `:pool-spec` - Carmine connection pool spec, basically apache-commons pool options
4444
- `:encoder` - A function to convert Riemann event map to a string. Defaults to `cheshire/generate-string` (JSON serialization), see the source for more info.
45-
- `:key` - The name of the Redis key to output events to. Defaults to `"riemann-events"`
45+
- `:key` - The Redis key to output events to. Defaults to `"riemann-events"`
46+
- `:key-fn` - A function to compute the Redis key from event; Overrides `:key`
4647

4748
### Flapjack:
4849

src/riemann_redis_output/core.clj

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@
5151
:conn-spec - Carmine spec, e.g. {:host \"redisserver\" :port 6379 :db 13}, see Carmine for details
5252
:pool-spec - Carmin pool spec (Apache commons pool options). See Carmine for details
5353
:key - Redis key to push events to
54+
:key-fn - A function to compute the Redis key from event; overrides `:key`. Signature (fn [event]) -> String
5455
:encoder - the encoding function to convert event map to a string. Signature: (fn [event]) -> String. Defaults to cheshire/generate-string
5556
"
5657
([] (sink {}))
5758
([opts]
58-
(let [{:keys [conn-spec key encoder pool-spec]} (merge default-opts opts)]
59+
(let [{:keys [conn-spec key key-fn encoder pool-spec]} (merge default-opts opts)
60+
key-fn (or key-fn (constantly key))]
5961
(fn [events]
60-
(write {:pool pool-spec :spec conn-spec} key encoder
61-
(if (sequential? events)
62-
events
63-
[events]))))))
62+
(let [events (if (sequential? events) events [events])
63+
events-by-keys (group-by key-fn events)]
64+
(doseq [[key events] events-by-keys]
65+
(write {:pool pool-spec :spec conn-spec} key encoder events)))))))

test/riemann_redis_output/core_test.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
[cheshire.core :refer [parse-string]]
44
[riemann-redis-output.core :refer [flapjack-encoder]]))
55

6-
(defn- rand-str [len]
6+
(defn rand-str [len]
77
(apply str (take len (repeatedly #(char (+ (rand 26) 65))))))
88

99
(deftest flapjack

test/riemann_redis_output/integration_test.clj

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
[clj-test-containers.core :as tc]
88
[riemann-redis-output.core :refer [sink]]
99
[cheshire.core :as json]
10-
[taoensso.carmine :as car]))
10+
[taoensso.carmine :as car]
11+
[riemann-redis-output.core-test :refer [rand-str]]))
1112

1213
(def ^:dynamic redis-port 6379)
1314
(defn- redis-fixture [f]
@@ -79,3 +80,15 @@
7980
(car/lpop "test-byte-array"))
8081
redis-event (json/parse-string redis-event true)]
8182
(is (= "tested" (:state redis-event))))))
83+
84+
(deftest output-with-key-field
85+
(let [key-field :redis-key
86+
event {:time 0 :state "ok" :service "test service" :host "localhost" :description (rand-str 10) :redis-key (rand-str 10)}
87+
redis-out (sink {:conn-spec {:port redis-port} :key-fn key-field})]
88+
(with-server (streams/sdo redis-out)
89+
(send-events [event])
90+
(Thread/sleep 50))
91+
(let [redis-event (car/wcar {:pool :none :spec {:host "localhost" :port redis-port}}
92+
(car/lpop (:redis-key event)))
93+
redis-event (json/parse-string redis-event true)]
94+
(is (= (:description event) (:description redis-event))))))

0 commit comments

Comments
 (0)