Skip to content

Commit 71751ed

Browse files
committed
Use asyncio.QueueShutdown when available
Signed-off-by: Waldemar Quevedo <[email protected]>
1 parent adc5c10 commit 71751ed

File tree

3 files changed

+52
-21
lines changed

3 files changed

+52
-21
lines changed

nats/src/nats/aio/client.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import re
2323
import ssl
2424
import string
25+
import sys
2526
import time
2627
from collections import UserString
2728
from dataclasses import dataclass
@@ -59,6 +60,9 @@
5960
)
6061
from .transport import TcpTransport, Transport, WebSocketTransport
6162

63+
# Python 3.13+ has QueueShutDown exception for cleaner queue termination
64+
_HAS_QUEUE_SHUTDOWN = sys.version_info >= (3, 13)
65+
6266
try:
6367
from importlib.metadata import version
6468

@@ -1797,15 +1801,19 @@ async def _process_msg(
17971801
if sub._jsi:
17981802
await sub._jsi.check_for_sequence_mismatch(msg)
17991803

1800-
# Send sentinel after reaching max messages for non-callback subscriptions.
1801-
if max_msgs_reached and not sub._cb and sub._active_consumers is not None and sub._active_consumers > 0:
1802-
# Send one sentinel per active consumer to unblock them all.
1803-
for _ in range(sub._active_consumers):
1804-
try:
1805-
sub._pending_queue.put_nowait(None)
1806-
except Exception:
1807-
# Queue might be full or closed, that's ok
1808-
break
1804+
# Unblock waiting consumers after reaching max messages for non-callback subscriptions.
1805+
if max_msgs_reached and not sub._cb:
1806+
try:
1807+
if _HAS_QUEUE_SHUTDOWN:
1808+
# Python 3.13+: Use queue shutdown for graceful termination.
1809+
sub._pending_queue.shutdown()
1810+
elif sub._active_consumers is not None and sub._active_consumers > 0:
1811+
# Python < 3.13: Send sentinels for each active consumer
1812+
for _ in range(sub._active_consumers):
1813+
sub._pending_queue.put_nowait(None)
1814+
except Exception:
1815+
# Queue might be closed or full.
1816+
pass
18091817

18101818
def _build_message(
18111819
self,

nats/src/nats/aio/subscription.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
import asyncio
18+
import sys
1819
from typing import (
1920
TYPE_CHECKING,
2021
AsyncIterator,
@@ -31,6 +32,16 @@
3132
if TYPE_CHECKING:
3233
from nats.js import JetStreamContext
3334

35+
# Python 3.13+ has QueueShutDown exception for cleaner queue termination.
36+
_HAS_QUEUE_SHUTDOWN = sys.version_info >= (3, 13)
37+
if _HAS_QUEUE_SHUTDOWN:
38+
from asyncio import QueueShutDown
39+
else:
40+
# For older Python versions, we'll use a custom exception
41+
class QueueShutDown(Exception):
42+
pass
43+
44+
3445
DEFAULT_SUB_PENDING_MSGS_LIMIT = 512 * 1024
3546
DEFAULT_SUB_PENDING_BYTES_LIMIT = 128 * 1024 * 1024
3647

@@ -84,8 +95,10 @@ def __init__(
8495
self._pending_msgs_limit = pending_msgs_limit
8596
self._pending_bytes_limit = pending_bytes_limit
8697
self._pending_queue: asyncio.Queue[Msg] = asyncio.Queue(maxsize=pending_msgs_limit)
87-
# Track active consumers (both async generators and next_msg calls) for non-callback subscriptions.
88-
if cb is None:
98+
99+
# For Python < 3.13, we need to track active consumers for sentinel-based termination
100+
# For Python 3.13+, we use QueueShutDown which doesn't require tracking.
101+
if not _HAS_QUEUE_SHUTDOWN and cb is None:
89102
self._active_consumers = 0 # Counter of active consumers waiting for messages
90103
else:
91104
self._active_consumers = None
@@ -135,8 +148,10 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
135148
Async generator that yields messages directly from the subscription queue.
136149
"""
137150
yielded_count = 0
151+
138152
if self._active_consumers is not None:
139153
self._active_consumers += 1
154+
140155
try:
141156
while True:
142157
# Check if subscription was cancelled/closed.
@@ -151,6 +166,8 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
151166
msg = await self._pending_queue.get()
152167
except asyncio.CancelledError:
153168
break
169+
except QueueShutDown:
170+
break
154171

155172
# Check for sentinel value which signals generator to stop.
156173
if msg is None:
@@ -224,7 +241,6 @@ async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
224241
if self._cb:
225242
raise errors.Error("nats: next_msg cannot be used in async subscriptions")
226243

227-
# Track this next_msg call
228244
if self._active_consumers is not None:
229245
self._active_consumers += 1
230246

@@ -243,8 +259,11 @@ async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
243259
if self._conn.is_closed:
244260
raise errors.ConnectionClosedError
245261
raise
262+
except QueueShutDown:
263+
if self._conn.is_closed:
264+
raise errors.ConnectionClosedError
265+
raise errors.TimeoutError
246266
finally:
247-
# Untrack this next_msg call.
248267
if self._active_consumers is not None:
249268
self._active_consumers -= 1
250269

@@ -352,14 +371,17 @@ def _stop_processing(self) -> None:
352371
if self._wait_for_msgs_task and not self._wait_for_msgs_task.done():
353372
self._wait_for_msgs_task.cancel()
354373

355-
# Send sentinels to unblock waiting consumers
374+
# Unblock waiting consumers
356375
try:
357-
if self._pending_queue and self._active_consumers is not None and self._active_consumers > 0:
358-
# Send one sentinel for each active consumer (both generators and next_msg calls)
359-
for _ in range(self._active_consumers):
360-
self._pending_queue.put_nowait(None)
376+
if self._pending_queue:
377+
if _HAS_QUEUE_SHUTDOWN:
378+
# Python 3.13+: Use queue shutdown for graceful termination.
379+
self._pending_queue.shutdown()
380+
elif self._active_consumers is not None and self._active_consumers > 0:
381+
# Python < 3.13: Send sentinels for each active consumer.
382+
for _ in range(self._active_consumers):
383+
self._pending_queue.put_nowait(None)
361384
except Exception:
362-
# Queue might be closed or full, that's ok
363385
pass
364386

365387
async def _wait_for_msgs(self, error_cb) -> None:
@@ -401,3 +423,5 @@ async def _wait_for_msgs(self, error_cb) -> None:
401423
self._stop_processing()
402424
except asyncio.CancelledError:
403425
break
426+
except QueueShutDown:
427+
break

nats/src/nats/js/client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -883,15 +883,14 @@ def __init__(
883883
self._cb = sub._cb
884884
self._future = sub._future
885885
self._closed = sub._closed
886-
self._active_generators = sub._active_generators
886+
self._active_consumers = sub._active_consumers
887887

888888
# Per subscription message processor.
889889
self._pending_msgs_limit = sub._pending_msgs_limit
890890
self._pending_bytes_limit = sub._pending_bytes_limit
891891
self._pending_queue = sub._pending_queue
892892
self._pending_size = sub._pending_size
893893
self._wait_for_msgs_task = sub._wait_for_msgs_task
894-
self._pending_next_msgs_calls = sub._pending_next_msgs_calls
895894

896895
async def consumer_info(self) -> api.ConsumerInfo:
897896
"""

0 commit comments

Comments
 (0)