|
2 | 2 | # SPDX-License-Identifier: Apache-2.0 |
3 | 3 | """Tests for the TimingManager service.""" |
4 | 4 |
|
5 | | -from unittest.mock import MagicMock, patch |
| 5 | +from unittest.mock import AsyncMock, MagicMock, patch |
6 | 6 |
|
7 | 7 | import pytest |
8 | 8 |
|
9 | 9 | from aiperf.common.config import ServiceConfig, UserConfig |
10 | 10 | from aiperf.common.enums import TimingMode |
11 | 11 | from aiperf.common.environment import Environment |
12 | 12 | from aiperf.common.messages import ( |
| 13 | + CommandMessage, |
13 | 14 | DatasetConfiguredNotification, |
14 | 15 | ProfileConfigureCommand, |
15 | 16 | ) |
@@ -225,3 +226,89 @@ async def test_dataset_notification_before_configure( |
225 | 226 | call_kwargs = mock_factory.call_args.kwargs |
226 | 227 | assert "dataset_metadata" in call_kwargs |
227 | 228 | assert call_kwargs["dataset_metadata"] == mock_dataset_metadata |
| 229 | + |
| 230 | + |
| 231 | +class TestTimingManagerGarbageCollection: |
| 232 | + """Test suite for TimingManager garbage collection control.""" |
| 233 | + |
| 234 | + def _create_timing_manager( |
| 235 | + self, service_config: ServiceConfig, user_config: UserConfig |
| 236 | + ) -> TimingManager: |
| 237 | + """Create a TimingManager instance (ZMQ is globally mocked).""" |
| 238 | + return TimingManager( |
| 239 | + service_config=service_config, |
| 240 | + user_config=user_config, |
| 241 | + service_id="test-timing-manager", |
| 242 | + ) |
| 243 | + |
| 244 | + @pytest.fixture |
| 245 | + def service_config(self): |
| 246 | + """Service configuration fixture.""" |
| 247 | + return ServiceConfig() |
| 248 | + |
| 249 | + @pytest.fixture |
| 250 | + def user_config(self): |
| 251 | + """User config fixture.""" |
| 252 | + return UserConfig.model_construct( |
| 253 | + endpoint=MagicMock(), |
| 254 | + _timing_mode=TimingMode.REQUEST_RATE, |
| 255 | + ) |
| 256 | + |
| 257 | + @pytest.fixture |
| 258 | + def configured_manager(self, service_config, user_config): |
| 259 | + """Create a configured timing manager with strategy.""" |
| 260 | + manager = self._create_timing_manager(service_config, user_config) |
| 261 | + manager._credit_issuing_strategy = MagicMock() |
| 262 | + manager._credit_issuing_strategy.start = AsyncMock() |
| 263 | + manager._credit_issuing_strategy.stop = AsyncMock() |
| 264 | + manager.initialized_event.set() |
| 265 | + return manager |
| 266 | + |
| 267 | + @pytest.mark.asyncio |
| 268 | + async def test_gc_disabled_on_profiling_start(self, configured_manager): |
| 269 | + """Test that garbage collection is collected, frozen, and disabled when profiling starts.""" |
| 270 | + with patch("aiperf.timing.timing_manager.gc") as mock_gc: |
| 271 | + await configured_manager._on_start_profiling( |
| 272 | + CommandMessage.model_construct(service_id="test-controller") |
| 273 | + ) |
| 274 | + |
| 275 | + assert mock_gc.collect.called |
| 276 | + assert mock_gc.freeze.called |
| 277 | + assert mock_gc.disable.called |
| 278 | + |
| 279 | + # Verify correct order: collect -> freeze -> disable |
| 280 | + calls = mock_gc.method_calls |
| 281 | + call_names = [c[0] for c in calls] |
| 282 | + collect_idx = call_names.index("collect") |
| 283 | + freeze_idx = call_names.index("freeze") |
| 284 | + disable_idx = call_names.index("disable") |
| 285 | + assert collect_idx < freeze_idx < disable_idx |
| 286 | + |
| 287 | + @pytest.mark.asyncio |
| 288 | + async def test_gc_enabled_on_stop(self, configured_manager): |
| 289 | + """Test that garbage collection is unfrozen and re-enabled when timing manager stops.""" |
| 290 | + with patch("aiperf.timing.timing_manager.gc") as mock_gc: |
| 291 | + await configured_manager._timing_manager_stop() |
| 292 | + |
| 293 | + assert mock_gc.unfreeze.called |
| 294 | + assert mock_gc.enable.called |
| 295 | + |
| 296 | + # Verify correct order: unfreeze -> enable |
| 297 | + calls = mock_gc.method_calls |
| 298 | + call_names = [c[0] for c in calls] |
| 299 | + unfreeze_idx = call_names.index("unfreeze") |
| 300 | + enable_idx = call_names.index("enable") |
| 301 | + assert unfreeze_idx < enable_idx |
| 302 | + |
| 303 | + @pytest.mark.asyncio |
| 304 | + async def test_gc_enabled_on_stop_without_strategy( |
| 305 | + self, service_config, user_config |
| 306 | + ): |
| 307 | + """Test that GC is re-enabled even if no strategy was configured.""" |
| 308 | + manager = self._create_timing_manager(service_config, user_config) |
| 309 | + |
| 310 | + with patch("aiperf.timing.timing_manager.gc") as mock_gc: |
| 311 | + await manager._timing_manager_stop() |
| 312 | + |
| 313 | + assert mock_gc.unfreeze.called |
| 314 | + assert mock_gc.enable.called |
0 commit comments