Skip to content

Commit 8390d24

Browse files
committed
Filter out Properties passed into StreamsRegistry
1 parent 8e5e756 commit 8390d24

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

src/java/io/operatr/kpow/StreamsRegistry.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.apache.kafka.streams.KafkaStreams;
88
import org.apache.kafka.streams.Topology;
99

10+
import java.util.ArrayList;
1011
import java.util.Properties;
1112

1213
public class StreamsRegistry implements AutoCloseable {
@@ -25,6 +26,40 @@ public String getId() {
2526

2627
private final Object agent;
2728

29+
public static Properties filterProperties(Properties props) {
30+
ArrayList<String> allowedKeys = new ArrayList<>();
31+
allowedKeys.add("security.protocol");
32+
allowedKeys.add("sasl.mechanism");
33+
allowedKeys.add("sasl.jaas.config");
34+
allowedKeys.add("sasl.login.callback.handler.class");
35+
allowedKeys.add("ssl.keystore.location");
36+
allowedKeys.add("ssl.keystore.password");
37+
allowedKeys.add("ssl.key.password");
38+
allowedKeys.add("ssl.keystore.type");
39+
allowedKeys.add("ssl.keymanager.algorithm");
40+
allowedKeys.add("ssl.truststore.location");
41+
allowedKeys.add("ssl.truststore.password");
42+
allowedKeys.add("ssl.truststore.type");
43+
allowedKeys.add("ssl.trustmanager.algorithm");
44+
allowedKeys.add("ssl.endpoint.identification.algorithm");
45+
allowedKeys.add("ssl.provider");
46+
allowedKeys.add("ssl.cipher.suites");
47+
allowedKeys.add("ssl.protocol");
48+
allowedKeys.add("ssl.enabled.protocols");
49+
allowedKeys.add("ssl.secure.random.implementation");
50+
allowedKeys.add("ssl.keystore.key");
51+
allowedKeys.add("ssl.keystore.certificate.chain");
52+
allowedKeys.add("ssl.truststore.certificates");
53+
allowedKeys.add("bootstrap.servers");
54+
Properties nextProps = new Properties();
55+
for (String key : allowedKeys) {
56+
if (props.containsKey(key)) {
57+
nextProps.setProperty(key, String.valueOf(props.get(key)));
58+
}
59+
}
60+
return nextProps;
61+
}
62+
2863
public StreamsRegistry(Properties props) {
2964
IFn require = Clojure.var("clojure.core", "require");
3065
require.invoke(Clojure.read("io.operatr.kpow.agent"));
@@ -33,7 +68,8 @@ public StreamsRegistry(Properties props) {
3368
IFn serdesFn = Clojure.var("io.operatr.kpow.serdes", "transit-json-serializer");
3469
Serializer keySerializer = (Serializer) serdesFn.invoke();
3570
Serializer valSerializer = (Serializer) serdesFn.invoke();
36-
KafkaProducer producer = new KafkaProducer<>(props, keySerializer, valSerializer);
71+
Properties producerProps = filterProperties(props);
72+
KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer);
3773
agent = agentFn.invoke(producer);
3874
}
3975

test/io/operatr/agent_test.clj

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,20 @@
44
[io.operatr.kpow.agent :as agent])
55
(:import (org.apache.kafka.streams Topology StreamsBuilder KafkaStreams$State)
66
(org.apache.kafka.common MetricName Metric)
7-
(org.apache.kafka.clients.producer Producer)))
7+
(org.apache.kafka.clients.producer Producer)
8+
(io.operatr.kpow StreamsRegistry)
9+
(java.util Properties)))
10+
11+
(defn ^Properties ->props [m]
12+
(let [props (Properties.)]
13+
(doseq [[k v] m]
14+
(.put props k v))
15+
props))
16+
17+
(deftest filter-props
18+
(is (empty? (StreamsRegistry/filterProperties (->props {"fo" "bar"}))))
19+
(is (= {"bootstrap.servers" "xyz"}
20+
(into {} (StreamsRegistry/filterProperties (->props {"fo" "bar" "bootstrap.servers" "xyz"}))))))
821

922
(defn ^Topology test-topology
1023
[]

0 commit comments

Comments
 (0)