@@ -94,8 +94,6 @@ def __init__(
9494 self ._pending_next_msgs_calls = None
9595 self ._pending_size = 0
9696 self ._wait_for_msgs_task = None
97- # For compatibility with tests that expect _message_iterator
98- self ._message_iterator = None
9997
10098 # For JetStream enabled subscriptions.
10199 self ._jsi : Optional [JetStreamContext ._JSI ] = None
@@ -147,14 +145,6 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
147145 if self ._closed :
148146 break
149147
150- # Check if wrapper was cancelled (for compatibility with tests).
151- if (
152- hasattr (self , "_message_iterator" )
153- and self ._message_iterator
154- and self ._message_iterator ._unsubscribed_future .done ()
155- ):
156- break
157-
158148 # Check max message limit based on how many we've yielded so far.
159149 if self ._max_msgs > 0 and yielded_count >= self ._max_msgs :
160150 break
@@ -177,9 +167,6 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
177167
178168 # Check if we should auto-unsubscribe after yielding this message.
179169 if self ._max_msgs > 0 and yielded_count >= self ._max_msgs :
180- # Cancel the wrapper too for consistency.
181- if hasattr (self , "_message_iterator" ) and self ._message_iterator :
182- self ._message_iterator ._cancel ()
183170 break
184171 except asyncio .CancelledError :
185172 pass
@@ -271,8 +258,7 @@ def _start(self, error_cb):
271258 pass
272259 else :
273260 # For async iteration, we now use a generator directly via the messages property
274- # But we create a compatibility wrapper for tests
275- self ._message_iterator = _CompatibilityIteratorWrapper (self )
261+ pass
276262
277263 async def drain (self ):
278264 """
@@ -343,8 +329,6 @@ def _stop_processing(self) -> None:
343329 """
344330 if self ._wait_for_msgs_task and not self ._wait_for_msgs_task .done ():
345331 self ._wait_for_msgs_task .cancel ()
346- if hasattr (self , "_message_iterator" ) and self ._message_iterator :
347- self ._message_iterator ._cancel ()
348332
349333 # Only put sentinel if there are active async generators
350334 try :
@@ -394,18 +378,3 @@ async def _wait_for_msgs(self, error_cb) -> None:
394378 self ._stop_processing ()
395379 except asyncio .CancelledError :
396380 break
397-
398-
399- class _CompatibilityIteratorWrapper :
400- """
401- Compatibility wrapper that provides the same interface as the old _SubscriptionMessageIterator
402- but uses the more efficient generator internally.
403- """
404-
405- def __init__ (self , sub : Subscription ) -> None :
406- self ._sub = sub
407- self ._unsubscribed_future : asyncio .Future [bool ] = asyncio .Future ()
408-
409- def _cancel (self ) -> None :
410- if not self ._unsubscribed_future .done ():
411- self ._unsubscribed_future .set_result (True )
0 commit comments