Skip to content

Commit 3e63840

Browse files
committed
allow for watching future values with watcher
1 parent 7e7883e commit 3e63840

File tree

2 files changed

+18
-12
lines changed

2 files changed

+18
-12
lines changed

nats/js/kv.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,9 @@ async def purge_deletes(self, olderthan: int = 30 * 60) -> bool:
278278
watcher = await self.watchall()
279279
delete_markers = []
280280
async for update in watcher:
281+
if update is None:
282+
break
283+
281284
if update.operation == KV_DEL or update.operation == KV_PURGE:
282285
delete_markers.append(update)
283286

@@ -300,11 +303,11 @@ async def status(self) -> BucketStatus:
300303
return KeyValue.BucketStatus(stream_info=info, bucket=self._name)
301304

302305
class KeyWatcher:
306+
STOP_ITER = StopIterSentinel()
303307

304308
def __init__(self, js):
305309
self._js = js
306-
self._updates: asyncio.Queue[KeyValue.Entry
307-
| None] = asyncio.Queue(maxsize=256)
310+
self._updates: asyncio.Queue[KeyValue.Entry | None | StopIterSentinel] = asyncio.Queue(maxsize=256)
308311
self._sub = None
309312
self._pending: Optional[int] = None
310313

@@ -317,6 +320,7 @@ async def stop(self):
317320
stop will stop this watcher.
318321
"""
319322
await self._sub.unsubscribe()
323+
await self._updates.put(KeyValue.KeyWatcher.STOP_ITER)
320324

321325
async def updates(self, timeout=5.0):
322326
"""
@@ -331,10 +335,10 @@ def __aiter__(self):
331335
return self
332336

333337
async def __anext__(self):
334-
entry = await self._updates.get()
335-
if not entry:
336-
raise StopAsyncIteration
337-
else:
338+
while True:
339+
entry = await self._updates.get()
340+
if isinstance(entry, StopIterSentinel):
341+
raise StopAsyncIteration
338342
return entry
339343

340344
async def watchall(self, **kwargs) -> KeyWatcher:

nats/js/object_store.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
ObjectDeletedError,
3434
ObjectNotFoundError,
3535
)
36-
from nats.js.kv import MSG_ROLLUP_SUBJECT
36+
from nats.js.kv import MSG_ROLLUP_SUBJECT, StopIterSentinel
3737

3838
VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
3939
VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")
@@ -427,10 +427,11 @@ async def update_meta(
427427
await self._js.purge_stream(self._stream, subject=meta_subj)
428428

429429
class ObjectWatcher:
430+
STOP_ITER = StopIterSentinel()
430431

431432
def __init__(self, js):
432433
self._js = js
433-
self._updates = asyncio.Queue(maxsize=256)
434+
self._updates: asyncio.Queue[Union[api.ObjectInfo, None, StopIterSentinel]] = asyncio.Queue(maxsize=256)
434435
self._sub = None
435436
self._pending: Optional[int] = None
436437

@@ -457,10 +458,11 @@ def __aiter__(self):
457458
return self
458459

459460
async def __anext__(self):
460-
entry = await self._updates.get()
461-
if not entry:
462-
raise StopAsyncIteration
463-
else:
461+
while True:
462+
entry = await self._updates.get()
463+
464+
if isinstance(entry, StopIterSentinel):
465+
raise StopAsyncIteration
464466
return entry
465467

466468
async def watch(

0 commit comments

Comments
 (0)