diff --git a/CLAUDE.md b/CLAUDE.md index b9e105c69..53b881560 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -78,7 +78,7 @@ except Exception as e: **Logging**: Lambda for expensive logs: `self.debug(lambda: f"{len(self._x())}")`. Direct string: `self.info("Starting")`. -**JSON**: Always orjson: `orjson.loads(s)`, `orjson.dumps(d)` +**JSON**: Always msgspec: `msgspec.json.decode(s)`, `msgspec.json.encode(d)` ## Mixins & Base Classes diff --git a/pyproject.toml b/pyproject.toml index 3d33c7184..bd396122d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,8 +30,8 @@ dependencies = [ "ffmpeg-python~=0.2.0", "jinja2~=3.1.5", # NOTE: Versions prior to 3.1.5 have vuln exploits "jmespath~=1.0.1", + "msgspec~=0.20.0", "numpy~=1.26.4", - "orjson~=3.10.18", "pillow~=11.1.0", "prometheus_client~=0.23.1", "psutil~=7.0.0", diff --git a/src/aiperf/common/config/config_validators.py b/src/aiperf/common/config/config_validators.py index 0941b3479..c5c199f12 100644 --- a/src/aiperf/common/config/config_validators.py +++ b/src/aiperf/common/config/config_validators.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any -import orjson +import msgspec from aiperf.common.enums.service_enums import ServiceType from aiperf.common.utils import load_json_str @@ -151,7 +151,7 @@ def parse_str_or_dict_as_tuple_list(input: Any | None) -> list[tuple[str, Any]] if input.startswith("{"): try: return [(key, value) for key, value in load_json_str(input).items()] - except orjson.JSONDecodeError as e: + except msgspec.DecodeError as e: raise ValueError( f"User Config: {input} - must be a valid JSON string" ) from e diff --git a/src/aiperf/common/config/user_config.py b/src/aiperf/common/config/user_config.py index f4d615861..a15b7fffe 100644 --- a/src/aiperf/common/config/user_config.py +++ b/src/aiperf/common/config/user_config.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Annotated, Any -from orjson import JSONDecodeError +import msgspec from pydantic import 
BeforeValidator, Field, model_validator from typing_extensions import Self @@ -141,7 +141,7 @@ def _should_use_fixed_schedule_for_mooncake_trace(self) -> bool: try: data = load_json_str(line) return "timestamp" in data and data["timestamp"] is not None - except (JSONDecodeError, KeyError): + except (msgspec.DecodeError, KeyError): continue except (OSError, FileNotFoundError): _logger.warning( diff --git a/src/aiperf/common/messages/base_messages.py b/src/aiperf/common/messages/base_messages.py index a8eeef947..af1b3671a 100644 --- a/src/aiperf/common/messages/base_messages.py +++ b/src/aiperf/common/messages/base_messages.py @@ -3,7 +3,7 @@ import time from typing import ClassVar -import orjson +import msgspec from pydantic import Field from aiperf.common.enums.message_enums import MessageType @@ -44,10 +44,10 @@ def __str__(self) -> str: return self.model_dump_json(exclude_none=True) def to_json_bytes(self) -> bytes: - """Serialize message to JSON bytes using orjson for optimal performance. + """Serialize message to JSON bytes using msgspec for optimal performance. - This method uses orjson for high-performance serialization (6x faster for - large records >20KB). It automatically excludes None fields to minimize + This method uses msgspec for high-performance serialization (ultra-fast + serialization for large records). It automatically excludes None fields to minimize message size. Returns: @@ -57,7 +57,7 @@ def to_json_bytes(self) -> bytes: Prefer this method over model_dump_json() for ZMQ message passing and other high-throughput scenarios. 
""" - return orjson.dumps(self.model_dump(exclude_none=True, mode="json")) + return msgspec.json.encode(self.model_dump(exclude_none=True, mode="json")) class RequiresRequestNSMixin(Message): diff --git a/src/aiperf/common/mixins/buffered_jsonl_writer_mixin.py b/src/aiperf/common/mixins/buffered_jsonl_writer_mixin.py index dde09f27e..f8264d888 100644 --- a/src/aiperf/common/mixins/buffered_jsonl_writer_mixin.py +++ b/src/aiperf/common/mixins/buffered_jsonl_writer_mixin.py @@ -7,7 +7,7 @@ from typing import Generic import aiofiles -import orjson +import msgspec from aiperf.common.environment import Environment from aiperf.common.hooks import on_init, on_stop @@ -57,18 +57,18 @@ def __init__( async def _open_file(self) -> None: """Open the file handle for writing in binary mode (called automatically on initialization).""" async with self._file_lock: - # Binary mode for optimal performance with orjson + # Binary mode for optimal performance with msgspec self._file_handle = await aiofiles.open(self.output_file, mode="wb") async def buffered_write(self, record: BaseModelT) -> None: """Write a Pydantic model to the buffer with automatic flushing. - This method serializes the provided Pydantic model to JSON bytes using orjson + This method serializes the provided Pydantic model to JSON bytes using msgspec and adds it to the internal buffer. If the buffer reaches the configured batch size, it automatically flushes the buffer to disk. 
- Uses binary mode with orjson for optimal performance: - - 6x faster for large records (>20KB) + Uses binary mode with msgspec for optimal performance: + - Ultra-fast serialization for large records - No encode/decode overhead - Efficient for all record sizes @@ -76,9 +76,11 @@ async def buffered_write(self, record: BaseModelT) -> None: record: A Pydantic BaseModel instance to write """ try: - # Serialize to bytes using orjson (faster for large records) + # Serialize to bytes using msgspec (faster for large records) # Use exclude_none=True to omit None fields (smaller output) - json_bytes = orjson.dumps(record.model_dump(exclude_none=True, mode="json")) + json_bytes = msgspec.json.encode( + record.model_dump(exclude_none=True, mode="json") + ) buffer_to_flush = None async with self._buffer_lock: diff --git a/src/aiperf/common/models/record_models.py b/src/aiperf/common/models/record_models.py index 481c37145..a9d2192e8 100644 --- a/src/aiperf/common/models/record_models.py +++ b/src/aiperf/common/models/record_models.py @@ -7,7 +7,7 @@ from functools import cached_property from typing import Any, AnyStr -import orjson +import msgspec from pydantic import ( Field, RootModel, @@ -250,7 +250,7 @@ def get_json(self) -> JsonObject | None: if not self.text: return None return load_json_str(self.text) - except orjson.JSONDecodeError: + except msgspec.DecodeError: return None @@ -347,7 +347,7 @@ def get_json(self) -> JsonObject | None: if data_content in ("", None, "[DONE]"): return None return load_json_str(data_content) - except orjson.JSONDecodeError: + except msgspec.DecodeError: return None diff --git a/src/aiperf/common/models/sequence_distribution.py b/src/aiperf/common/models/sequence_distribution.py index e5af7451f..ad9ed9e70 100644 --- a/src/aiperf/common/models/sequence_distribution.py +++ b/src/aiperf/common/models/sequence_distribution.py @@ -35,8 +35,8 @@ import re from dataclasses import dataclass +import msgspec import numpy as np -import orjson from 
aiperf.common import random_generator as rng from aiperf.common.aiperf_logger import AIPerfLogger @@ -362,7 +362,7 @@ def _parse_pairs_from_json(cls, json_str: str) -> list[SequenceLengthPair]: """Parse JSON format and extract pairs: {"pairs": [{"isl": 256, "isl_stddev": 10, "osl": 128, "osl_stddev": 5, "prob": 40}, ...]}""" try: data = load_json_str(json_str) - except orjson.JSONDecodeError as e: + except msgspec.DecodeError as e: raise ValueError(f"Invalid JSON format: {e}") from e if "pairs" not in data: diff --git a/src/aiperf/common/utils.py b/src/aiperf/common/utils.py index 80a2e4350..6fa2929d8 100644 --- a/src/aiperf/common/utils.py +++ b/src/aiperf/common/utils.py @@ -7,7 +7,7 @@ from collections.abc import Callable from typing import Any -import orjson +import msgspec from aiperf.common import aiperf_logger from aiperf.common.aiperf_logger import AIPerfLogger @@ -90,11 +90,9 @@ def load_json_str( run validation checks on the object. Defaults to identity function. """ try: - # Note: orjson may not parse JSON the same way as Python's standard json library, - # notably being stricter on UTF-8 conformance. - # Refer to https://github.com/ijl/orjson?tab=readme-ov-file#str for details. - return func(orjson.loads(json_str)) - except orjson.JSONDecodeError as e: + # Note: msgspec is strict on UTF-8 conformance and provides fast JSON parsing + return func(msgspec.json.decode(json_str)) + except msgspec.DecodeError as e: snippet = json_str[:200] + ("..." 
if len(json_str) > 200 else "") _logger.exception(f"Failed to parse JSON string: '{snippet}' - {e!r}") raise diff --git a/src/aiperf/endpoints/template_endpoint.py b/src/aiperf/endpoints/template_endpoint.py index 5b863908b..578014de6 100644 --- a/src/aiperf/endpoints/template_endpoint.py +++ b/src/aiperf/endpoints/template_endpoint.py @@ -8,7 +8,7 @@ import jinja2 import jmespath -import orjson +import msgspec from aiperf.common.decorators import implements_protocol from aiperf.common.enums import EndpointType @@ -145,8 +145,8 @@ def format_payload(self, request_info: RequestInfo) -> dict[str, Any]: rendered = self._template.render(**template_vars) try: - payload = orjson.loads(rendered) - except orjson.JSONDecodeError as e: + payload = msgspec.json.decode(rendered) + except msgspec.DecodeError as e: self.error(f"Template did not render valid JSON: {rendered} - {e!r}") raise ValueError( f"Template did not render valid JSON {e!r}: {rendered[:100]}" diff --git a/src/aiperf/transports/aiohttp_transport.py b/src/aiperf/transports/aiohttp_transport.py index e15306586..79c359a43 100644 --- a/src/aiperf/transports/aiohttp_transport.py +++ b/src/aiperf/transports/aiohttp_transport.py @@ -7,7 +7,7 @@ from collections.abc import Mapping from typing import Any -import orjson +import msgspec from aiperf.common.enums import TransportType from aiperf.common.exceptions import NotInitializedError @@ -148,8 +148,8 @@ async def send_request( url = self.build_url(request_info) headers = self.build_headers(request_info) - # Serialize with orjson for performance - json_str = orjson.dumps(payload).decode("utf-8") + # Serialize with msgspec for performance + json_str = msgspec.json.encode(payload).decode("utf-8") record = await self.aiohttp_client.post_request(url, json_str, headers) record.request_headers = headers except Exception as e: diff --git a/tests/aiperf_mock_server/config.py b/tests/aiperf_mock_server/config.py index b929bf5c4..ab242811e 100644 --- 
a/tests/aiperf_mock_server/config.py +++ b/tests/aiperf_mock_server/config.py @@ -2,11 +2,11 @@ # SPDX-License-Identifier: Apache-2.0 """Mock server configuration.""" -import json import logging import os from typing import Annotated, Any, Literal +import msgspec from cyclopts import Parameter from pydantic import Field, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict @@ -153,5 +153,5 @@ def _get_env_key(config_key: str) -> str: def _serialize_env_value(value: Any) -> str: """Serialize value for environment variable storage.""" if isinstance(value, list | dict): - return json.dumps(value) + return msgspec.json.encode(value).decode("utf-8") return str(value) diff --git a/tests/integration/test_gpu_telemetry.py b/tests/integration/test_gpu_telemetry.py index afc8a38f7..83c5f4a04 100644 --- a/tests/integration/test_gpu_telemetry.py +++ b/tests/integration/test_gpu_telemetry.py @@ -4,7 +4,7 @@ import platform -import orjson +import msgspec import pytest from aiperf.common.models.telemetry_models import TelemetryRecord @@ -98,7 +98,7 @@ async def test_gpu_telemetry_export( # Validate each line is valid JSON and can be parsed as TelemetryRecord for line in lines: - record_dict = orjson.loads(line) + record_dict = msgspec.json.decode(line) record = TelemetryRecord.model_validate(record_dict) # Verify required fields are present diff --git a/tests/integration/test_random_generator_canary.py b/tests/integration/test_random_generator_canary.py index 5d9c03203..f398afee2 100644 --- a/tests/integration/test_random_generator_canary.py +++ b/tests/integration/test_random_generator_canary.py @@ -8,9 +8,9 @@ that any changes to the codebase don't silently break determinism. 
""" -import json from pathlib import Path +import msgspec import pytest from aiperf.common.utils import load_json_str @@ -115,5 +115,5 @@ def _save_reference(self, data: dict) -> None: data: Inputs data to save as reference """ self.REFERENCE_FILE.parent.mkdir(parents=True, exist_ok=True) - with open(self.REFERENCE_FILE, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2) + with open(self.REFERENCE_FILE, "wb") as f: + f.write(msgspec.json.format(msgspec.json.encode(data), indent=2)) diff --git a/tests/integration/utils.py b/tests/integration/utils.py index 127b96011..7d860eacf 100644 --- a/tests/integration/utils.py +++ b/tests/integration/utils.py @@ -3,11 +3,10 @@ """Utility functions for integration tests.""" import base64 -import json import subprocess from pathlib import Path -import orjson +import msgspec from aiperf.common.aiperf_logger import AIPerfLogger from tests.integration.models import VideoDetails @@ -34,7 +33,7 @@ def create_rankings_dataset(tmp_path: Path, num_entries: int) -> Path: {"name": "passages", "contents": [f"AI passage {i}"]}, ] } - f.write(orjson.dumps(entry).decode("utf-8") + "\n") + f.write(msgspec.json.encode(entry).decode("utf-8") + "\n") return dataset_path @@ -62,7 +61,7 @@ def extract_base64_video_details(base64_data: str) -> VideoDetails: ] result = subprocess.run(cmd, input=video_bytes, capture_output=True, check=True) - probe_data = json.loads(result.stdout) + probe_data = msgspec.json.decode(result.stdout) format_info = probe_data["format"] video_stream = next(s for s in probe_data["streams"] if s["codec_type"] == "video") diff --git a/tests/unit/common/messages/test_messages.py b/tests/unit/common/messages/test_messages.py index 62b33dfaf..ec44ce473 100644 --- a/tests/unit/common/messages/test_messages.py +++ b/tests/unit/common/messages/test_messages.py @@ -1,8 +1,6 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -import json - -import orjson +import msgspec import pytest from aiperf.common.enums import LifecycleState, MessageType, ServiceType @@ -31,7 +29,9 @@ def test_status_message(): "request_ns": 1234567890, "request_id": "test", } - assert json.loads(message.model_dump_json(exclude_none=True)) == json.loads( + assert msgspec.json.decode( + message.model_dump_json(exclude_none=True) + ) == msgspec.json.decode( '{"message_type":"status","state":"running","service_id":"test","service_type":"worker","request_ns":1234567890,"request_id":"test"}' ) @@ -49,7 +49,9 @@ def test_status_message(): "service_type": ServiceType.WORKER, "request_ns": 1234567890, } - assert json.loads(message.model_dump_json(exclude_none=True)) == json.loads( + assert msgspec.json.decode( + message.model_dump_json(exclude_none=True) + ) == msgspec.json.decode( '{"message_type":"status","state":"initialized","service_id":"test","service_type":"worker","request_ns":1234567890}' ) @@ -78,7 +80,7 @@ def test_to_json_bytes_excludes_none_fields(self): ) json_bytes = message.to_json_bytes() - parsed = orjson.loads(json_bytes) + parsed = msgspec.json.decode(json_bytes) # request_id should not be in the output assert "request_id" not in parsed @@ -96,7 +98,7 @@ def test_to_json_bytes_includes_non_none_fields(self): ) json_bytes = message.to_json_bytes() - parsed = orjson.loads(json_bytes) + parsed = msgspec.json.decode(json_bytes) assert parsed["message_type"] == "status" assert parsed["state"] == "initialized" @@ -134,11 +136,11 @@ def test_to_json_bytes_equivalent_to_model_dump_json(self): # Old way old_bytes = message.model_dump_json(exclude_none=True).encode("utf-8") - old_parsed = json.loads(old_bytes) + old_parsed = msgspec.json.decode(old_bytes) # New way new_bytes = message.to_json_bytes() - new_parsed = orjson.loads(new_bytes) + new_parsed = msgspec.json.decode(new_bytes) # Should produce equivalent JSON assert old_parsed == new_parsed @@ -164,7 +166,7 @@ def 
test_to_json_bytes_with_complex_nested_data(self): ) json_bytes = message.to_json_bytes() - parsed = orjson.loads(json_bytes) + parsed = msgspec.json.decode(json_bytes) # Verify nested structure is preserved assert parsed["error"]["type"] == "TestError" @@ -226,8 +228,8 @@ def test_to_json_bytes_multiple_messages_independence(self): assert restored1.service_id == "service-1" assert restored2.service_id == "service-2" - def test_to_json_bytes_uses_orjson(self): - """Test that to_json_bytes() output is valid orjson format.""" + def test_to_json_bytes_is_valid_json(self): + """Test that to_json_bytes() output is valid JSON format.""" message = StatusMessage( state=LifecycleState.RUNNING, service_id="test", @@ -237,8 +239,8 @@ def test_to_json_bytes_uses_orjson(self): json_bytes = message.to_json_bytes() - # Should be parseable by orjson - parsed = orjson.loads(json_bytes) + # Should be parseable by msgspec + parsed = msgspec.json.decode(json_bytes) assert isinstance(parsed, dict) assert "message_type" in parsed @@ -250,7 +252,7 @@ def test_to_json_bytes_empty_optional_fields(self): ) json_bytes = message.to_json_bytes() - parsed = orjson.loads(json_bytes) + parsed = msgspec.json.decode(json_bytes) # Should only contain required fields and message_type assert "service_id" in parsed @@ -342,7 +344,7 @@ class TestMessageStringRepresentation: def test_message_str_json_output(self, message, expected_present, expected_absent): """Test that __str__() returns valid JSON with correct field inclusion/exclusion.""" str_output = str(message) - parsed = json.loads(str_output) + parsed = msgspec.json.decode(str_output) # Check expected fields are present for field in expected_present: diff --git a/tests/unit/common/mixins/test_buffered_jsonl_writer_mixin.py b/tests/unit/common/mixins/test_buffered_jsonl_writer_mixin.py index 66b13ede4..d08ccc295 100644 --- a/tests/unit/common/mixins/test_buffered_jsonl_writer_mixin.py +++ 
b/tests/unit/common/mixins/test_buffered_jsonl_writer_mixin.py @@ -2,10 +2,10 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio -import json import tempfile from pathlib import Path +import msgspec import pytest from pydantic import BaseModel @@ -66,7 +66,7 @@ async def write_records(task_id: int): lines = [line.strip() for line in f.readlines()] assert len(lines) == expected_total for line in lines: - assert "id" in json.loads(line) + assert "id" in msgspec.json.decode(line) @pytest.mark.asyncio @pytest.mark.parametrize( diff --git a/tests/unit/common/models/test_auto_routed_messages.py b/tests/unit/common/models/test_auto_routed_messages.py index 472b977ae..0e6bb5964 100644 --- a/tests/unit/common/models/test_auto_routed_messages.py +++ b/tests/unit/common/models/test_auto_routed_messages.py @@ -2,8 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """Tests for AutoRoutedModel-based message routing.""" -import json - +import msgspec import pytest from aiperf.common.enums import ( @@ -149,7 +148,7 @@ def test_json_string_routing(self, base_message_data): "state": "running", "service_type": "worker", } - msg = Message.from_json(json.dumps(data)) + msg = Message.from_json(msgspec.json.encode(data).decode("utf-8")) assert_routed_to(msg, StatusMessage, state=LifecycleState.RUNNING) @pytest.mark.parametrize( @@ -180,9 +179,9 @@ def test_unknown_discriminator_value_falls_back_to_base_class(self): "input_transform,description", [ (lambda d: d, "dict (no parsing)"), - (lambda d: json.dumps(d), "JSON string"), - (lambda d: json.dumps(d).encode("utf-8"), "bytes"), - (lambda d: bytearray(json.dumps(d).encode("utf-8")), "bytearray"), + (lambda d: msgspec.json.encode(d).decode("utf-8"), "JSON string"), + (lambda d: msgspec.json.encode(d), "bytes"), + (lambda d: bytearray(msgspec.json.encode(d)), "bytearray"), ], ) # fmt: skip def test_from_json_input_types(self, input_transform, description): diff --git a/tests/unit/common/models/test_sequence_distribution.py
b/tests/unit/common/models/test_sequence_distribution.py index 2ef7c89f5..a57359ced 100644 --- a/tests/unit/common/models/test_sequence_distribution.py +++ b/tests/unit/common/models/test_sequence_distribution.py @@ -445,8 +445,8 @@ def test_invalid_format_parsing(self): "{'pairs': []}", # Single quotes instead of double quotes ], ) - def test_malformed_json_raises_orjson_error(self, malformed_json): - """Test that malformed JSON syntax raises ValueError with orjson.JSONDecodeError.""" + def test_malformed_json_raises_msgspec_error(self, malformed_json): + """Test that malformed JSON syntax raises ValueError with msgspec.DecodeError.""" with pytest.raises(ValueError, match="Invalid JSON format"): DistributionParser.parse(malformed_json) diff --git a/tests/unit/dataset/loader/conftest.py b/tests/unit/dataset/loader/conftest.py index 126dc7786..8e39fdaec 100644 --- a/tests/unit/dataset/loader/conftest.py +++ b/tests/unit/dataset/loader/conftest.py @@ -24,9 +24,15 @@ def create_jsonl_file(): def _create_file(content_lines): nonlocal filename - with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f: + with tempfile.NamedTemporaryFile(mode="wb", suffix=".jsonl", delete=False) as f: for line in content_lines: - f.write(line + "\n") + # Handle both bytes and str for backwards compatibility + if isinstance(line, str): + f.write(line.encode("utf-8") + b"\n") + elif isinstance(line, bytes): + f.write(line + b"\n") + else: + f.write(line + b"\n") filename = f.name return filename diff --git a/tests/unit/dataset/loader/test_multi_turn.py b/tests/unit/dataset/loader/test_multi_turn.py index 3119b2cb1..18455c136 100644 --- a/tests/unit/dataset/loader/test_multi_turn.py +++ b/tests/unit/dataset/loader/test_multi_turn.py @@ -1,8 +1,7 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -import json - +import msgspec import pytest from aiperf.common.enums import CustomDatasetType @@ -125,7 +124,7 @@ class TestMultiTurnDatasetLoader: def test_load_simple_conversation(self, create_jsonl_file, default_user_config): """Test loading a simple multi-turn conversation.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "conv_001", "turns": [ @@ -157,7 +156,7 @@ def test_load_simple_conversation(self, create_jsonl_file, default_user_config): def test_load_multiple_conversations(self, create_jsonl_file, default_user_config): """Test loading multiple conversations from file.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "session_A", "turns": [ @@ -165,7 +164,7 @@ def test_load_multiple_conversations(self, create_jsonl_file, default_user_confi ], } ), - json.dumps( + msgspec.json.encode( { "session_id": "session_B", "turns": [ @@ -193,7 +192,7 @@ def test_load_conversation_without_session_id( ): """Test loading conversation without explicit session_id generates UUID.""" content = [ - json.dumps( + msgspec.json.encode( { "turns": [ {"text": "Anonymous conversation"}, @@ -221,7 +220,7 @@ def test_load_conversation_without_session_id( def test_load_multimodal_conversation(self, create_jsonl_file, default_user_config): """Test loading conversation with multimodal content.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "multimodal_chat", "turns": [ @@ -259,7 +258,7 @@ def test_load_multimodal_conversation(self, create_jsonl_file, default_user_conf def test_load_scheduled_conversation(self, create_jsonl_file, default_user_config): """Test loading conversation with timestamp scheduling.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "scheduled_chat", "turns": [ @@ -284,7 +283,7 @@ def test_load_scheduled_conversation(self, create_jsonl_file, default_user_confi def test_load_batched_conversation(self, create_jsonl_file, default_user_config): """Test 
loading conversation with batched content.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "batched_chat", "turns": [ @@ -320,7 +319,7 @@ def test_load_full_featured_conversation( ): """Test loading conversation with full-featured format.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "full_featured_chat", "turns": [ @@ -371,14 +370,14 @@ def test_load_dataset_skips_empty_lines( ): """Test that empty lines are skipped during loading.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "test_empty_lines", "turns": [{"text": "First"}], } ), "", # Empty line - json.dumps( + msgspec.json.encode( { "session_id": "test_empty_lines_2", "turns": [{"text": "Second"}], @@ -401,13 +400,13 @@ def test_load_duplicate_session_ids_are_grouped( ): """Test that multiple conversations with same session_id are grouped together.""" content = [ - json.dumps( + msgspec.json.encode( { "session_id": "shared_session", "turns": [{"text": "First conversation"}], } ), - json.dumps( + msgspec.json.encode( { "session_id": "shared_session", "turns": [{"text": "Second conversation"}], diff --git a/tests/unit/dataset/loader/test_single_turn.py b/tests/unit/dataset/loader/test_single_turn.py index 1329ddd83..df88df8a3 100644 --- a/tests/unit/dataset/loader/test_single_turn.py +++ b/tests/unit/dataset/loader/test_single_turn.py @@ -1,8 +1,7 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -import json - +import msgspec import pytest from aiperf.common.enums import CustomDatasetType @@ -256,7 +255,7 @@ def test_load_dataset_with_full_featured_version( """Test loading dataset with full-featured version.""" content = [ - json.dumps( + msgspec.json.encode( { "texts": [ {"name": "text_field_A", "contents": ["Hello", "World"]}, diff --git a/tests/unit/dataset/test_dataset_manager_inputs_json.py b/tests/unit/dataset/test_dataset_manager_inputs_json.py index 30dc47860..ce943181f 100644 --- a/tests/unit/dataset/test_dataset_manager_inputs_json.py +++ b/tests/unit/dataset/test_dataset_manager_inputs_json.py @@ -4,11 +4,11 @@ Unit tests for DatasetManager._generate_inputs_json_file method. """ -import json import logging from pathlib import Path from unittest.mock import Mock, patch +import msgspec import pytest from aiperf.common.config.config_defaults import OutputDefaults @@ -52,7 +52,7 @@ async def test_generate_inputs_json_success_with_populated_dataset( """Test comprehensive successful generation with populated dataset.""" await populated_dataset_manager._generate_inputs_json_file() - written_json = json.loads(capture_file_writes.written_content) + written_json = msgspec.json.decode(capture_file_writes.written_content) _validate_inputs_file_structure(written_json) # Verify specific dataset content @@ -80,7 +80,7 @@ async def test_generate_inputs_json_empty_dataset( """Test generation with empty dataset creates empty inputs file.""" await empty_dataset_manager._generate_inputs_json_file() - written_json = json.loads(capture_file_writes.written_content) + written_json = msgspec.json.decode(capture_file_writes.written_content) assert written_json == {"data": []} @pytest.mark.asyncio @@ -98,7 +98,7 @@ async def test_generate_inputs_json_file_path_and_io( assert expected_path.exists() with open(expected_path) as f: - content = json.load(f) + content = msgspec.json.decode(f.read()) _validate_inputs_file_structure(content) 
@pytest.mark.asyncio @@ -110,7 +110,7 @@ async def test_generate_inputs_json_session_order_preservation( """Test that sessions are preserved in dataset iteration order.""" await populated_dataset_manager._generate_inputs_json_file() - written_json = json.loads(capture_file_writes.written_content) + written_json = msgspec.json.decode(capture_file_writes.written_content) session_ids = [session["session_id"] for session in written_json["data"]] expected_order = list(populated_dataset_manager.dataset.keys()) assert session_ids == expected_order @@ -124,7 +124,7 @@ async def test_generate_inputs_json_custom_field_preservation( """Test that custom fields like max_completion_tokens are preserved.""" await populated_dataset_manager._generate_inputs_json_file() - written_json = json.loads(capture_file_writes.written_content) + written_json = msgspec.json.decode(capture_file_writes.written_content) session_2 = next( session for session in written_json["data"] @@ -144,7 +144,7 @@ async def test_generate_inputs_json_pydantic_model_compatibility( """Test that generated content is compatible with InputsFile Pydantic model.""" await populated_dataset_manager._generate_inputs_json_file() - written_json = json.loads(capture_file_writes.written_content) + written_json = msgspec.json.decode(capture_file_writes.written_content) inputs_file = InputsFile.model_validate(written_json) assert isinstance(inputs_file, InputsFile) diff --git a/tests/unit/exporters/test_metrics_json_exporter.py b/tests/unit/exporters/test_metrics_json_exporter.py index ef440cf94..87c00de65 100644 --- a/tests/unit/exporters/test_metrics_json_exporter.py +++ b/tests/unit/exporters/test_metrics_json_exporter.py @@ -1,11 +1,11 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -import json import tempfile from pathlib import Path from unittest.mock import patch +import msgspec import pytest from aiperf.common.config import EndpointConfig, ServiceConfig, UserConfig @@ -213,7 +213,7 @@ def mock_convert(metrics, reg): content = exporter._generate_content() # Should contain data from instance members - data = json.loads(content) + data = msgspec.json.decode(content) assert "input_config" in data def test_generate_content_uses_telemetry_results_from_instance( @@ -244,7 +244,7 @@ def mock_convert(metrics, reg): content = exporter._generate_content() # Should contain telemetry data - data = json.loads(content) + data = msgspec.json.decode(content) assert "telemetry_data" in data @pytest.mark.asyncio @@ -313,7 +313,7 @@ async def test_json_export_with_telemetry_data( assert expected_file.exists() with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) # Verify telemetry_data exists assert "telemetry_data" in data @@ -357,7 +357,7 @@ async def test_json_export_without_telemetry_data( assert expected_file.exists() with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) # telemetry_data should not be present or be null assert "telemetry_data" not in data or data.get("telemetry_data") is None @@ -383,7 +383,7 @@ async def test_json_export_telemetry_structure( expected_file = output_dir / OutputDefaults.PROFILE_EXPORT_AIPERF_JSON_FILE with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) endpoints = data["telemetry_data"]["endpoints"] # Get first GPU from first endpoint @@ -466,7 +466,7 @@ async def test_json_export_telemetry_exception_handling( assert expected_file.exists() with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) # Should still have telemetry structure even if metrics fail assert "telemetry_data" in data @@ -522,7 +522,7 @@ async def 
test_json_export_telemetry_with_none_values( expected_file = output_dir / OutputDefaults.PROFILE_EXPORT_AIPERF_JSON_FILE with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) # Should handle None values gracefully assert "telemetry_data" in data @@ -559,7 +559,7 @@ async def test_json_export_telemetry_empty_hierarchy( expected_file = output_dir / OutputDefaults.PROFILE_EXPORT_AIPERF_JSON_FILE with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) # Should have telemetry_data section but empty assert "telemetry_data" in data @@ -618,7 +618,7 @@ async def test_json_export_telemetry_endpoint_normalization( expected_file = output_dir / OutputDefaults.PROFILE_EXPORT_AIPERF_JSON_FILE with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) endpoints = data["telemetry_data"]["endpoints"] # Check that endpoint was normalized (removed http:// and /metrics) @@ -701,7 +701,7 @@ async def test_json_export_telemetry_multi_endpoint( expected_file = output_dir / OutputDefaults.PROFILE_EXPORT_AIPERF_JSON_FILE with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) endpoints = data["telemetry_data"]["endpoints"] # Should have both endpoints @@ -765,7 +765,7 @@ async def test_json_export_with_hostname_metadata( expected_file = output_dir / OutputDefaults.PROFILE_EXPORT_AIPERF_JSON_FILE with open(expected_file) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) endpoints = data["telemetry_data"]["endpoints"] gpu_summary = endpoints["localhost:9400"]["gpus"]["gpu_0"] diff --git a/tests/unit/exporters/test_timeslice_metrics_json_exporter.py b/tests/unit/exporters/test_timeslice_metrics_json_exporter.py index 8c435b688..ada1a1876 100644 --- a/tests/unit/exporters/test_timeslice_metrics_json_exporter.py +++ b/tests/unit/exporters/test_timeslice_metrics_json_exporter.py @@ -3,11 +3,11 @@ """Tests for 
TimesliceMetricsJsonExporter.""" -import json import tempfile from pathlib import Path from unittest.mock import patch +import msgspec import pytest from aiperf.common.config import EndpointConfig, ServiceConfig, UserConfig @@ -246,7 +246,7 @@ def mock_convert(metrics, reg): ): content = exporter._generate_content() - data = json.loads(content) + data = msgspec.json.decode(content) assert "timeslices" in data assert "input_config" in data @@ -292,7 +292,7 @@ def mock_convert(metrics, reg): ): content = exporter._generate_content() - data = json.loads(content) + data = msgspec.json.decode(content) indices = [ts["timeslice_index"] for ts in data["timeslices"]] assert indices == [0, 1, 2] @@ -349,7 +349,7 @@ def mock_convert(metrics, reg): ): content = exporter._generate_content() - data = json.loads(content) + data = msgspec.json.decode(content) timeslice_0 = data["timeslices"][0] assert "time_to_first_token" in timeslice_0 @@ -403,7 +403,7 @@ def mock_convert(metrics, reg): ): content = exporter._generate_content() - data = json.loads(content) + data = msgspec.json.decode(content) metric_data = data["timeslices"][0]["metric"] assert "unit" in metric_data @@ -457,7 +457,7 @@ def mock_convert(metrics, reg): ): content = exporter._generate_content() - data = json.loads(content) + data = msgspec.json.decode(content) ts0 = data["timeslices"][0] ts1 = data["timeslices"][1] @@ -549,7 +549,7 @@ def mock_convert(metrics, reg): # Verify it's valid JSON with open(exporter._file_path) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) assert "timeslices" in data assert "input_config" in data @@ -631,6 +631,6 @@ def mock_convert(metrics, reg): await exporter.export() with open(exporter._file_path) as f: - data = json.load(f) + data = msgspec.json.decode(f.read()) assert len(data["timeslices"]) == 50 diff --git a/tests/unit/post_processors/test_raw_record_writer_processor.py b/tests/unit/post_processors/test_raw_record_writer_processor.py index 
51fb43de2..72856c216 100644 --- a/tests/unit/post_processors/test_raw_record_writer_processor.py +++ b/tests/unit/post_processors/test_raw_record_writer_processor.py @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -import orjson +import msgspec import pytest from aiperf.common.config import UserConfig @@ -103,7 +103,7 @@ async def test_process_record_writes_valid_data( lines = processor.output_file.read_text().splitlines() assert len(lines) == 1 - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) record = RawRecordInfo.model_validate(record_dict) assert record.metadata.conversation_id == "conv-123" @@ -130,7 +130,9 @@ async def test_process_record_with_error( await processor.process_record(error_parsed_record, metadata) - record_dict = orjson.loads(processor.output_file.read_text().splitlines()[0]) + record_dict = msgspec.json.decode( + processor.output_file.read_text().splitlines()[0] + ) record = RawRecordInfo.model_validate(record_dict) assert record.metadata.conversation_id == "conv-error" @@ -161,7 +163,7 @@ async def test_process_multiple_records( assert len(lines) == 5 for i, line in enumerate(lines): - record = RawRecordInfo.model_validate(orjson.loads(line)) + record = RawRecordInfo.model_validate(msgspec.json.decode(line)) assert record.metadata.session_num == i assert record.metadata.conversation_id == f"conv-{i}" assert record.metadata.x_request_id == f"req-{i}" @@ -191,7 +193,7 @@ async def test_output_is_valid_jsonl_and_record_structure( line = lines[0] # Verify format: valid JSON and ends with newline - record_dict = orjson.loads(line) + record_dict = msgspec.json.decode(line) assert isinstance(record_dict, dict) # Verify structure diff --git a/tests/unit/post_processors/test_record_export_results_processor.py b/tests/unit/post_processors/test_record_export_results_processor.py index 8593ed582..cdcd84d67 100644 --- 
a/tests/unit/post_processors/test_record_export_results_processor.py +++ b/tests/unit/post_processors/test_record_export_results_processor.py @@ -5,7 +5,7 @@ from pathlib import Path from unittest.mock import Mock, patch -import orjson +import msgspec import pytest from aiperf.common.config import ( @@ -228,7 +228,7 @@ async def test_process_result_writes_valid_data( lines = processor.output_file.read_text().splitlines() assert len(lines) == 1 - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) record = MetricRecordInfo.model_validate(record_dict) assert record.metadata.x_request_id == "test-record-123" assert record.metadata.conversation_id == "conv-456" @@ -338,7 +338,7 @@ async def test_process_result_multiple_messages( assert len(lines) == 5 for line in lines: - record_dict = orjson.loads(line) + record_dict = msgspec.json.decode(line) record = MetricRecordInfo.model_validate(record_dict) assert isinstance(record, MetricRecordInfo) assert record.metadata.x_request_id.startswith("record-") # type: ignore[union-attr] @@ -375,7 +375,7 @@ async def test_output_is_valid_jsonl( for line in lines: if line.strip(): - record_dict = orjson.loads(line) + record_dict = msgspec.json.decode(line) assert isinstance(record_dict, dict) record = MetricRecordInfo.model_validate(record_dict) assert isinstance(record, MetricRecordInfo) @@ -406,7 +406,7 @@ async def test_record_structure_is_complete( lines = processor.output_file.read_text().splitlines() for line in lines: - record_dict = orjson.loads(line) + record_dict = msgspec.json.decode(line) record = MetricRecordInfo.model_validate(record_dict) assert isinstance(record.metadata, MetricRecordMetadata) diff --git a/tests/unit/post_processors/test_telemetry_export_results_processor.py b/tests/unit/post_processors/test_telemetry_export_results_processor.py index c348831fa..dbca3ea11 100644 --- a/tests/unit/post_processors/test_telemetry_export_results_processor.py +++ 
b/tests/unit/post_processors/test_telemetry_export_results_processor.py @@ -5,7 +5,7 @@ from pathlib import Path from unittest.mock import AsyncMock, Mock, patch -import orjson +import msgspec import pytest from aiperf.common.config import ( @@ -228,7 +228,7 @@ async def test_process_telemetry_record_with_complete_data( await processor.process_telemetry_record(sample_telemetry_record) lines = processor.output_file.read_text().splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) record = TelemetryRecord.model_validate(record_dict) assert record.timestamp_ns == 1_000_000_000 @@ -258,7 +258,7 @@ async def test_process_telemetry_record_with_partial_data( await processor.process_telemetry_record(sample_telemetry_record_partial) lines = processor.output_file.read_text().splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) record = TelemetryRecord.model_validate(record_dict) assert record.timestamp_ns == 2_000_000_000 @@ -405,7 +405,7 @@ async def test_multiple_gpus_same_endpoint( lines = processor.output_file.read_text().splitlines() # Verify each GPU is represented - gpu_indices = [orjson.loads(line)["gpu_index"] for line in lines] + gpu_indices = [msgspec.json.decode(line)["gpu_index"] for line in lines] assert gpu_indices == [0, 1, 2, 3] @pytest.mark.asyncio @@ -440,7 +440,7 @@ async def test_multiple_endpoints( await processor.process_telemetry_record(record) lines = processor.output_file.read_text().splitlines() - dcgm_urls = [orjson.loads(line)["dcgm_url"] for line in lines] + dcgm_urls = [msgspec.json.decode(line)["dcgm_url"] for line in lines] assert "http://node0:9401/metrics" in dcgm_urls assert "http://node1:9401/metrics" in dcgm_urls @@ -505,7 +505,7 @@ async def test_output_is_valid_jsonl( for line in lines: if line.strip(): - record_dict = orjson.loads(line) + record_dict = msgspec.json.decode(line) assert isinstance(record_dict, dict) record = 
TelemetryRecord.model_validate(record_dict) assert isinstance(record, TelemetryRecord) @@ -531,7 +531,7 @@ async def test_record_structure_is_complete( lines = processor.output_file.read_text().splitlines() for line in lines: - record_dict = orjson.loads(line) + record_dict = msgspec.json.decode(line) record = TelemetryRecord.model_validate(record_dict) assert isinstance(record.timestamp_ns, int) @@ -560,7 +560,7 @@ async def test_preserves_all_telemetry_fields( await processor.process_telemetry_record(sample_telemetry_record) lines = processor.output_file.read_text().splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) record = TelemetryRecord.model_validate(record_dict) # Check all metadata fields @@ -606,7 +606,7 @@ async def test_handles_none_values( await processor.process_telemetry_record(sample_telemetry_record_partial) lines = processor.output_file.read_text().splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) # Verify None values are not present in the dict assert "pci_bus_id" not in record_dict @@ -642,7 +642,7 @@ async def test_timestamp_precision( await processor.process_telemetry_record(record) lines = processor.output_file.read_text().splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) assert record_dict["timestamp_ns"] == precise_timestamp @@ -665,7 +665,7 @@ async def test_metadata_fields_present( await processor.process_telemetry_record(sample_telemetry_record) lines = processor.output_file.read_text().splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) # Check all required metadata fields assert "timestamp_ns" in record_dict @@ -695,7 +695,7 @@ async def test_hierarchical_identifiers( await processor.process_telemetry_record(sample_telemetry_record) lines = processor.output_file.read_text().splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = 
msgspec.json.decode(lines[0]) # Verify hierarchical keys are present assert record_dict["dcgm_url"] == "http://node1:9401/metrics" @@ -973,7 +973,7 @@ async def test_integration_with_real_files( # Verify parseable lines = content.splitlines() - record_dict = orjson.loads(lines[0]) + record_dict = msgspec.json.decode(lines[0]) record = TelemetryRecord.model_validate(record_dict) assert record.gpu_uuid == sample_telemetry_record.gpu_uuid @@ -1089,7 +1089,7 @@ async def test_interleaved_gpu_records( # Verify records are in order lines = processor.output_file.read_text().splitlines() - timestamps = [orjson.loads(line)["timestamp_ns"] for line in lines] + timestamps = [msgspec.json.decode(line)["timestamp_ns"] for line in lines] assert timestamps == sorted(timestamps) diff --git a/tests/unit/server/test_config.py b/tests/unit/server/test_config.py index b83dce730..7d0358358 100644 --- a/tests/unit/server/test_config.py +++ b/tests/unit/server/test_config.py @@ -108,7 +108,7 @@ def test_serialize_env_value(self, value, expected): def test_serialize_env_value_list(self): result = _serialize_env_value(["a", "b"]) - assert result == '["a", "b"]' + assert result == '["a","b"]' def test_serialize_env_value_dict(self): result = _serialize_env_value({"key": "value"}) diff --git a/tests/unit/transports/test_aiohttp_client.py b/tests/unit/transports/test_aiohttp_client.py index 29bf0a219..914bc3a61 100644 --- a/tests/unit/transports/test_aiohttp_client.py +++ b/tests/unit/transports/test_aiohttp_client.py @@ -3,10 +3,10 @@ """Comprehensive unit tests for aiohttp client components.""" import asyncio -import json from unittest.mock import AsyncMock, Mock, patch import aiohttp +import msgspec import pytest from aiperf.common.enums import SSEEventType, SSEFieldType @@ -323,12 +323,14 @@ async def test_end_to_end_json_request( test_response = {"message": "success", "data": [1, 2, 3]} with patch("aiohttp.ClientSession") as mock_session_class: - mock_response = 
create_mock_response(text_content=json.dumps(test_response)) + mock_response = create_mock_response( + text_content=msgspec.json.encode(test_response).decode("utf-8") + ) setup_mock_session(mock_session_class, mock_response, ["request"]) record = await aiohttp_client.post_request( "http://test.com/api", - json.dumps({"query": "test"}), + msgspec.json.encode({"query": "test"}).decode("utf-8"), {"Content-Type": "application/json"}, ) @@ -352,7 +354,7 @@ async def mock_content_iter(): with patch("time.perf_counter_ns", side_effect=range(123456789, 123456799)): record = await aiohttp_client.post_request( "http://test.com/stream", - json.dumps({"stream": True}), + msgspec.json.encode({"stream": True}).decode("utf-8"), {"Accept": "text/event-stream"}, ) diff --git a/tests/unit/transports/test_aiohttp_transport.py b/tests/unit/transports/test_aiohttp_transport.py index d0ffddd62..3583ee99e 100644 --- a/tests/unit/transports/test_aiohttp_transport.py +++ b/tests/unit/transports/test_aiohttp_transport.py @@ -219,10 +219,10 @@ async def test_send_request_builds_correct_headers( assert headers["Accept"] == "application/json" @pytest.mark.asyncio - async def test_send_request_serializes_payload_with_orjson( + async def test_send_request_serializes_payload_with_msgspec( self, transport, model_endpoint_non_streaming ): - """Test that payload is serialized using orjson.""" + """Test that payload is serialized using msgspec.""" await self._setup_initialized_transport_with_mock(transport) request_info = self._create_request_info(model_endpoint_non_streaming)