Skip to content
39 changes: 32 additions & 7 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,21 +1026,46 @@ impl Store {
/// # }
/// ```
pub async fn keys(&self) -> Result<Keys, HistoryError> {
let subject = format!("{}>", self.prefix.as_str());
self.keys_with_filters(vec![">"]).await
}

let consumer = self
.stream
.create_consumer(super::consumer::push::OrderedConfig {
pub async fn keys_with_filters(
&self,
filters: impl IntoIterator<Item = &str>,
) -> Result<Keys, HistoryError> {
let mut config: super::consumer::push::OrderedConfig =
super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv history consumer".to_string()),
filter_subject: subject,
headers_only: true,
replay_policy: super::consumer::ReplayPolicy::Instant,
// We only need to know the latest state for each key, not the whole history
deliver_policy: DeliverPolicy::LastPerSubject,
..Default::default()
})
.await?;
};

let mut filters = filters.into_iter().map(|f| format!("{}{}", self.prefix, f));

match (filters.next(), filters.next()) {
(Some(first), None) => {
config.filter_subject = first;
}
(Some(first), Some(_second)) => {
#[cfg(feature = "server_2_10")]
{
config.filter_subjects = vec![first, _second];
config.filter_subjects.extend(filters);
}
#[cfg(not(feature = "server_2_10"))]
{
config.filter_subject = first;
// maybe a warning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the way to go.

As said before - copy-paste is the way to go, for now.

}
}
_ => {}
}

let consumer = self.stream.create_consumer(config).await?;

let entries = History {
done: consumer.info.num_pending == 0,
Expand Down
67 changes: 67 additions & 0 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,73 @@ mod kv {
keys.sort();
assert_eq!(vec!["bar", "foo"], keys);

#[cfg(feature = "server_2_10")]
{
let mut keys_with_filter = kv
.keys_with_filters(vec!["bar"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filter.sort();
assert_eq!(vec!["bar"], keys_with_filter);

kv.put("foo1.bar", 37.to_string().into()).await.unwrap();
kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap();
kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap();

let mut keys_with_filters = kv
.keys_with_filters(vec!["foo", "bar"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filters.sort();
assert_eq!(vec!["bar", "foo"], keys_with_filters);

let mut keys_with_filters = kv
.keys_with_filters(vec!["foo1.*.*"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filters.sort();
assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters);

let mut keys_with_filters = kv
.keys_with_filters(vec!["foo1.*.*", "foo1.*"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filters.sort();
assert_eq!(
vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"],
keys_with_filters
);

let mut keys_with_filters = kv
.keys_with_filters(vec!["*.baz.*"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();

keys_with_filters.sort();
assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters);

// cleanup the keys
kv.delete("foo1.bar").await.unwrap();
kv.delete("foo1.baz.boo").await.unwrap();
kv.delete("foo1.baz.baz").await.unwrap();
}
// filters like "foo.b*" should not return anything because it's not a valid filter

// Delete a key and make sure it doesn't show up in the keys list
kv.delete("bar").await.unwrap();
let keys = kv
Expand Down