55import string
66import sys
77import traceback
8+ import importlib .metadata
89from types import FunctionType
910
10- from async_generator ._impl import ANextIter
11-
12- from trio import Queue , WouldBlock , BrokenStreamError
11+ from trio import WouldBlock , open_memory_channel , BrokenResourceError
1312from trio ._highlevel_serve_listeners import _run_handler
14- from ._version import __version__
1513from trio .abc import Instrument
16- from trio .hazmat import current_task , Task
14+ from trio .lowlevel import current_task , Task
1715
1816
1917# inspiration: https://github.com/python-trio/trio/blob/master/notes-to-self/print-task-tree.py
@@ -26,9 +24,6 @@ def walk_coro_stack(coro):
2624 # A real coroutine
2725 yield coro .cr_frame , coro .cr_frame .f_lineno
2826 coro = coro .cr_await
29- elif isinstance (coro , ANextIter ):
30- # black hole
31- return
3227 else :
3328 # A generator decorated with @types.coroutine
3429 yield coro .gi_frame , coro .gi_frame .f_lineno
@@ -58,7 +53,7 @@ def __init__(self):
5853 self ._is_monitoring = False
5954 # semi-arbitrary size, because otherwise we'll be dropping events
6055 # no clue how to make this better, alas.
61- self ._monitoring_queue = Queue ( capacity = 100 )
56+ self ._tx , self . _rx = open_memory_channel ( 100 )
6257
6358 @staticmethod
6459 def get_root_task () -> Task :
@@ -123,9 +118,11 @@ def _add_to_monitoring_queue(self, item):
123118 # if it's our own handler, skip it!
124119 if loc ["handler" ] == self .listen_on_stream :
125120 return
121+ if task .coro .cr_code == self .do_monitor .__code__ :
122+ return
126123
127124 try :
128- self ._monitoring_queue . put_nowait (item )
125+ self ._tx . send_nowait (item )
129126 except WouldBlock :
130127 return
131128
@@ -138,7 +135,7 @@ async def listen_on_stream(self, stream):
138135 async def main_loop (self , stream ):
139136 """Runs the main loop of the monitor."""
140137 # send the banner
141- version = __version__
138+ version = importlib . metadata . version ( "trio" )
142139 await stream .send_all (
143140 b"Connected to the Trio monitor, using "
144141 b"trio " + version .encode (encoding = "ascii" ) + b"\n "
@@ -165,7 +162,7 @@ async def main_loop(self, stream):
165162 finally :
166163 self ._is_monitoring = False
167164 # empty out the queue
168- self ._monitoring_queue = Queue ( capacity = 100 )
165+ self ._tx , self . _rx = open_memory_channel ( 100 )
169166
170167 try :
171168 fn = getattr (self , "command_{}" .format (name ))
@@ -195,7 +192,8 @@ async def main_loop(self, stream):
195192 async def do_monitor (self , stream ):
196193 """Livefeeds information about the running program."""
197194 prefix = "[FEED] "
198- async for item in self ._monitoring_queue :
195+ while True :
196+ item = await self ._rx .receive ()
199197 key = item [0 ]
200198
201199 if key == "task_spawned" :
@@ -232,7 +230,7 @@ async def do_monitor(self, stream):
232230
233231 try :
234232 await stream .send_all (message .encode ("ascii" ) + b"\n " )
235- except BrokenStreamError : # client disconnected on us
233+ except BrokenResourceError : # client disconnected on us
236234 return
237235
238236 # command definitions
0 commit comments