Skip to content

Commit edbdf19

Browse files
committed
clean up project tasks class
1 parent e3b717f commit edbdf19

File tree

1 file changed

+15
-20
lines changed

1 file changed

+15
-20
lines changed

src/backend/base/langflow/api/v1/mcp_projects.py

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,34 +1251,33 @@ async def ensure_session_manager_running(self) -> None:
12511251
if self._manager_started:
12521252
return
12531253

1254-
shutdown_event, started_event = self.create_events()
1255-
stack = get_project_task_group_tg()
1256-
self._runner_task = stack.create_task(
1257-
self._run_session_manager(shutdown_event, started_event)
1254+
self._shutdown_event = asyncio.Event()
1255+
self._started_event = asyncio.Event()
1256+
tg = get_project_task_group_tg()
1257+
self._runner_task = tg.create_task(
1258+
self._run_session_manager()
12581259
)
12591260
self._manager_started = True
12601261
await logger.adebug("Streamable HTTP manager started for project %s", self.project_id)
1261-
await started_event.wait()
1262+
await self._started_event.wait()
12621263

1263-
async def _run_session_manager(self, shutdown_event: asyncio.Event, started_event: asyncio.Event) -> None:
1264+
async def _run_session_manager(self) -> None:
12641265
"""Own the lifecycle of the project's Streamable HTTP session manager."""
12651266
try:
12661267
async with self.session_manager.run():
1267-
started_event.set()
1268-
await shutdown_event.wait()
1268+
self._started_event.set()
1269+
await self._shutdown_event.wait()
12691270
except asyncio.CancelledError:
12701271
raise
12711272
except Exception as exc: # noqa: BLE001
12721273
await logger.aexception(
12731274
f"Project {self.project_id} Streamable HTTP manager crashed: {exc}"
12741275
)
12751276
finally:
1276-
started_event.set()
1277+
self._started_event.set() # prevent hanging if run() fails
12771278
self._runner_task = None
1278-
if self._shutdown_event is shutdown_event:
1279-
self._shutdown_event = None
1280-
if self._started_event is started_event:
1281-
self._started_event = None
1279+
self._shutdown_event = None
1280+
self._started_event = None
12821281
self._manager_started = False
12831282
await logger.adebug("Streamable HTTP manager stopped for project %s", self.project_id)
12841283

@@ -1287,13 +1286,6 @@ def request_shutdown(self) -> None:
12871286
if self._shutdown_event and not self._shutdown_event.is_set():
12881287
self._shutdown_event.set()
12891288

1290-
def create_events(self) -> tuple[asyncio.Event, asyncio.Event]:
1291-
"""Create the shutdown and started events."""
1292-
shutdown_event = asyncio.Event()
1293-
started_event = asyncio.Event()
1294-
self._shutdown_event = shutdown_event
1295-
self._started_event = started_event
1296-
return shutdown_event, started_event
12971289

12981290
# Cache of project MCP servers
12991291
project_mcp_servers: dict[str, ProjectMCPServer] = {}
@@ -1359,17 +1351,20 @@ def get_task_group(self) -> asyncio.TaskGroup:
13591351
"""Get the shared project task group."""
13601352
return self._task_group
13611353

1354+
13621355
_project_task_group = ProjectTaskGroup()
13631356

13641357

13651358
async def start_project_task_group() -> None:
13661359
"""Initialize the shared project task group."""
13671360
await _project_task_group.start()
13681361

1362+
13691363
def get_project_task_group_tg() -> asyncio.TaskGroup:
13701364
"""Get the shared project task group."""
13711365
return _project_task_group.get_task_group()
13721366

1367+
13731368
async def stop_project_task_group() -> None:
13741369
"""Close the shared project task group."""
13751370
await _project_task_group.stop()

0 commit comments

Comments
 (0)