Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ except Exception as e:

**Logging**: Lambda for expensive logs: `self.debug(lambda: f"{len(self._x())}")`. Direct string: `self.info("Starting")`.

**JSON**: Always orjson: `orjson.loads(s)`, `orjson.dumps(d)`
**JSON**: Always msgspec: `msgspec.json.decode(s)`, `msgspec.json.encode(d)`

## Mixins & Base Classes

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ dependencies = [
"ffmpeg-python~=0.2.0",
"jinja2~=3.1.5", # NOTE: Versions prior to 3.1.5 have vuln exploits
"jmespath~=1.0.1",
"msgspec~=0.20.0",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify msgspec version compatibility.

The addition of msgspec~=0.20.0 looks good. Ensure this version is compatible with Python 3.10+ and provides the required JSON encoding/decoding APIs used throughout the codebase.


🌐 Web query:

What is the latest stable version of msgspec and does version 0.20.0 support Python 3.10+?

💡 Result:

  • The official msgspec latest stable release is 0.19.0 (published Dec 27, 2024). [1]

  • msgspec 0.19.0 provides wheels for CPython 3.10 (and newer CPython versions listed on the release files), so the official msgspec release line supports Python 3.10+. [1]

  • There is a community fork named msgspec-x which has a 0.20.0 release (Jun 21, 2025). That fork’s 0.20.0 lists support for CPython 3.10+ in its PyPI release metadata. If you meant the official project, 0.20.0 is not an official msgspec release; 0.19.0 is the latest official stable. [2]

Sources:
[1] msgspec on PyPI — release files and metadata.
[2] msgspec-x on PyPI (version 0.20.0).


I need to check the codebase to determine which msgspec package is intended (official or the fork).


🏁 Script executed:

#!/bin/bash
# Search for msgspec imports and usage in codebase
rg -n "msgspec|from msgspec|import msgspec" --type py -C 3

Length of output: 77213


🏁 Script executed:

#!/bin/bash
# Check pyproject.toml for any comments or context about msgspec
cat pyproject.toml | head -50

Length of output: 1241


🏁 Script executed:

#!/bin/bash
# Check if msgspec-x is mentioned anywhere
rg -n "msgspec-x|msgspec_x" --type py --type toml --type txt

Length of output: 0


Fix invalid msgspec version constraint that will cause installation failures.

Official msgspec's latest stable version is 0.19.0, not 0.20.0. The current constraint msgspec~=0.20.0 cannot be satisfied by the official package, causing pip install to fail. Version 0.19.0 supports Python 3.10+ as required by your project. Either downgrade to msgspec~=0.19.0 or explicitly specify the msgspec-x fork if version 0.20.0 is intentional.

🤖 Prompt for AI Agents
In pyproject.toml around line 33, the dependency constraint "msgspec~=0.20.0" is
invalid (official latest is 0.19.0) and will break installation; update the
entry to "msgspec~=0.19.0" to match the published package, or if you
intentionally require the 0.20.0 fork, replace the dependency with the correct
fork package name and version (e.g., specify the msgspec-x fork and its
compatible version) so pip can resolve and install it successfully.

"numpy~=1.26.4",
"orjson~=3.10.18",
"pillow~=11.1.0",
"prometheus_client~=0.23.1",
"psutil~=7.0.0",
Expand Down
4 changes: 2 additions & 2 deletions src/aiperf/common/config/config_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import Any

import orjson
import msgspec

from aiperf.common.enums.service_enums import ServiceType
from aiperf.common.utils import load_json_str
Expand Down Expand Up @@ -151,7 +151,7 @@ def parse_str_or_dict_as_tuple_list(input: Any | None) -> list[tuple[str, Any]]
if input.startswith("{"):
try:
return [(key, value) for key, value in load_json_str(input).items()]
except orjson.JSONDecodeError as e:
except msgspec.DecodeError as e:
raise ValueError(
f"User Config: {input} - must be a valid JSON string"
) from e
Expand Down
4 changes: 2 additions & 2 deletions src/aiperf/common/config/user_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import Annotated, Any

from orjson import JSONDecodeError
import msgspec
from pydantic import BeforeValidator, Field, model_validator
from typing_extensions import Self

Expand Down Expand Up @@ -141,7 +141,7 @@ def _should_use_fixed_schedule_for_mooncake_trace(self) -> bool:
try:
data = load_json_str(line)
return "timestamp" in data and data["timestamp"] is not None
except (JSONDecodeError, KeyError):
except (msgspec.DecodeError, KeyError):
continue
except (OSError, FileNotFoundError):
_logger.warning(
Expand Down
10 changes: 5 additions & 5 deletions src/aiperf/common/messages/base_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from typing import ClassVar

import orjson
import msgspec
from pydantic import Field

from aiperf.common.enums.message_enums import MessageType
Expand Down Expand Up @@ -44,10 +44,10 @@ def __str__(self) -> str:
return self.model_dump_json(exclude_none=True)

def to_json_bytes(self) -> bytes:
"""Serialize message to JSON bytes using orjson for optimal performance.
"""Serialize message to JSON bytes using msgspec for optimal performance.

This method uses orjson for high-performance serialization (6x faster for
large records >20KB). It automatically excludes None fields to minimize
This method uses msgspec for high-performance serialization (ultra-fast
serialization for large records). It automatically excludes None fields to minimize
message size.

Returns:
Expand All @@ -57,7 +57,7 @@ def to_json_bytes(self) -> bytes:
Prefer this method over model_dump_json() for ZMQ message passing
and other high-throughput scenarios.
"""
return orjson.dumps(self.model_dump(exclude_none=True, mode="json"))
return msgspec.json.encode(self.model_dump(exclude_none=True, mode="json"))


class RequiresRequestNSMixin(Message):
Expand Down
16 changes: 9 additions & 7 deletions src/aiperf/common/mixins/buffered_jsonl_writer_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Generic

import aiofiles
import orjson
import msgspec

from aiperf.common.environment import Environment
from aiperf.common.hooks import on_init, on_stop
Expand Down Expand Up @@ -57,28 +57,30 @@ def __init__(
async def _open_file(self) -> None:
"""Open the file handle for writing in binary mode (called automatically on initialization)."""
async with self._file_lock:
# Binary mode for optimal performance with orjson
# Binary mode for optimal performance with msgspec
self._file_handle = await aiofiles.open(self.output_file, mode="wb")

async def buffered_write(self, record: BaseModelT) -> None:
"""Write a Pydantic model to the buffer with automatic flushing.

This method serializes the provided Pydantic model to JSON bytes using orjson
This method serializes the provided Pydantic model to JSON bytes using msgspec
and adds it to the internal buffer. If the buffer reaches the configured batch
size, it automatically flushes the buffer to disk.

Uses binary mode with orjson for optimal performance:
- 6x faster for large records (>20KB)
Uses binary mode with msgspec for optimal performance:
- Ultra-fast serialization for large records
- No encode/decode overhead
- Efficient for all record sizes

Args:
record: A Pydantic BaseModel instance to write
"""
try:
# Serialize to bytes using orjson (faster for large records)
# Serialize to bytes using msgspec (faster for large records)
# Use exclude_none=True to omit None fields (smaller output)
json_bytes = orjson.dumps(record.model_dump(exclude_none=True, mode="json"))
json_bytes = msgspec.json.encode(
record.model_dump(exclude_none=True, mode="json")
)

buffer_to_flush = None
async with self._buffer_lock:
Expand Down
6 changes: 3 additions & 3 deletions src/aiperf/common/models/record_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from functools import cached_property
from typing import Any, AnyStr

import orjson
import msgspec
from pydantic import (
Field,
RootModel,
Expand Down Expand Up @@ -250,7 +250,7 @@ def get_json(self) -> JsonObject | None:
if not self.text:
return None
return load_json_str(self.text)
except orjson.JSONDecodeError:
except msgspec.DecodeError:
return None


Expand Down Expand Up @@ -347,7 +347,7 @@ def get_json(self) -> JsonObject | None:
if data_content in ("", None, "[DONE]"):
return None
return load_json_str(data_content)
except orjson.JSONDecodeError:
except msgspec.DecodeError:
return None


Expand Down
4 changes: 2 additions & 2 deletions src/aiperf/common/models/sequence_distribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import re
from dataclasses import dataclass

import msgspec
import numpy as np
import orjson

from aiperf.common import random_generator as rng
from aiperf.common.aiperf_logger import AIPerfLogger
Expand Down Expand Up @@ -362,7 +362,7 @@ def _parse_pairs_from_json(cls, json_str: str) -> list[SequenceLengthPair]:
"""Parse JSON format and extract pairs: {"pairs": [{"isl": 256, "isl_stddev": 10, "osl": 128, "osl_stddev": 5, "prob": 40}, ...]}"""
try:
data = load_json_str(json_str)
except orjson.JSONDecodeError as e:
except msgspec.DecodeError as e:
raise ValueError(f"Invalid JSON format: {e}") from e

if "pairs" not in data:
Expand Down
10 changes: 4 additions & 6 deletions src/aiperf/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections.abc import Callable
from typing import Any

import orjson
import msgspec
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Fix bytes/str mixing in load_json_str error logging and update JSON backend guidance

When json_str is bytes and decoding fails, the except block does:

snippet = json_str[:200] + ("..." if len(json_str) > 200 else "")

For bytes input this becomes bytes + str, which raises TypeError inside the except handler and masks the original msgspec.DecodeError. That can surprise callers that expect msgspec.DecodeError on parse failure.

You can keep the msgspec backend but make the snippet construction bytes‑safe like this:

 def load_json_str(
     json_str: str | bytes, func: Callable = lambda x: x
 ) -> dict[str, Any]:
@@
-    try:
-        # Note: msgspec is strict on UTF-8 conformance and provides fast JSON parsing
-        return func(msgspec.json.decode(json_str))
-    except msgspec.DecodeError as e:
-        snippet = json_str[:200] + ("..." if len(json_str) > 200 else "")
-        _logger.exception(f"Failed to parse JSON string: '{snippet}' - {e!r}")
-        raise
+    try:
+        # Note: msgspec is strict on UTF-8 conformance and provides fast JSON parsing
+        return func(msgspec.json.decode(json_str))
+    except msgspec.DecodeError as e:
+        if isinstance(json_str, bytes):
+            snippet = json_str[:200].decode("utf-8", errors="replace")
+        else:
+            snippet = json_str[:200]
+        if len(json_str) > 200:
+            snippet += "..."
+        _logger.exception(f"Failed to parse JSON string: '{snippet}' - {e!r}")
+        raise

Separately, the current CLAUDE/guidelines still say “Always use orjson for JSON operations”, but this helper (and the PR as a whole) intentionally standardizes on msgspec. It would be good to update those docs to avoid future confusion; doing so keeps the documented guidance in sync with the new implementation.

Also applies to: 79-99


🏁 Script executed:

#!/bin/bash
# Inspect load_json_str implementation to verify bytes/str mixing issue
cat src/aiperf/common/utils.py | sed -n '1,120p'

Length of output: 4041


🏁 Script executed:

#!/bin/bash
# Check CLAUDE.md for orjson guidance that may be outdated
fd -e md --type f | head -20 | xargs grep -l "orjson" 2>/dev/null || echo "No orjson references found in md files"

Length of output: 22


🏁 Script executed:

#!/bin/bash
# Verify msgspec API: check if msgspec.json.decode and msgspec.DecodeError are valid
python3 << 'EOF'
import msgspec
# Check if msgspec.json.decode exists
print("msgspec.json.decode exists:", hasattr(msgspec.json, 'decode'))
# Check if msgspec.DecodeError exists
print("msgspec.DecodeError exists:", hasattr(msgspec, 'DecodeError'))
# Test bytes + str behavior
test_bytes = b"test"[:3]
print(f"Type of bytes[:3]: {type(test_bytes)}")
try:
    result = test_bytes + "..."
except TypeError as e:
    print(f"bytes + str raises TypeError: {e}")
EOF

Length of output: 119




🏁 Script executed:

#!/bin/bash
# Check ATTRIBUTIONS-Python.md and other docs for orjson and messaging guidance
grep -r "orjson\|msgspec" --include="*.md" .

Length of output: 375


🏁 Script executed:

#!/bin/bash
# Look for coding guidelines or CLAUDE files mentioning JSON
fd -e md --type f -name "*CLAUDE*" -o -name "*guideline*" -o -name "*coding*" | xargs cat 2>/dev/null | head -100

Length of output: 230


🌐 Web query:

msgspec.json.decode and msgspec.DecodeError Python API documentation

💡 Result:

Here’s the short API info and examples for msgspec.json.decode and the DecodeError/ValidationError behavior.

  • msgspec.json.decode(buf, *, type: Optional[type] = None, strict: bool = True, dec_hook: Optional[callable] = None) -> Any

    • buf: bytes-like or str containing the JSON to decode.
    • type: optional Python type (annotation form) to decode/validate into; if omitted decodes to default JSON->Python types.
    • strict: when False, allows wider coercions from strings to non-string types.
    • dec_hook: optional callback dec_hook(type, obj) -> Any for custom-type construction.
    • Returns the deserialized Python object. [1][2]
  • Errors:

    • msgspec.DecodeError / msgspec.ValidationError (raised on invalid JSON or when decoded data doesn’t match the requested type). The library raises a DecodeError (or ValidationError for type-validation failures) with a message describing the problem and a JSON path (e.g. "$.groups[0]"). Example: decoding b'{"name":"bob","groups":[123]}' as a type expecting str in groups yields an error like "Expected str, got int - at $.groups[0]." [2][3]
  • Example

    • encode: msg = msgspec.json.encode(obj)
    • decode without type: msgspec.json.decode(msg) -> dict/list/...
    • decode with type: msgspec.json.decode(msg, type=User) -> User (or raises ValidationError). [2][3]

Sources:

  • msgspec API docs (json.decode parameters & description). [1]
  • msgspec usage and examples (encoding/decoding, typed decoding, error examples). [2][3]

References
[1] msgspec API docs — JSON (decode).
[2] msgspec Usage / Examples (encode/decode, typed decoding).
[3] msgspec GitHub / README (examples showing DecodeError/ValidationError messages).


Fix bytes/str mixing in load_json_str error logging

When json_str is bytes and decoding fails, the except block does:

snippet = json_str[:200] + ("..." if len(json_str) > 200 else "")

For bytes input this becomes bytes + str, which raises TypeError inside the except handler and masks the original msgspec.DecodeError. That can surprise callers that expect msgspec.DecodeError on parse failure.

Fix by normalizing snippet to str:

 def load_json_str(
     json_str: str | bytes, func: Callable = lambda x: x
 ) -> dict[str, Any]:
@@
-    try:
-        # Note: msgspec is strict on UTF-8 conformance and provides fast JSON parsing
-        return func(msgspec.json.decode(json_str))
-    except msgspec.DecodeError as e:
-        snippet = json_str[:200] + ("..." if len(json_str) > 200 else "")
-        _logger.exception(f"Failed to parse JSON string: '{snippet}' - {e!r}")
-        raise
+    try:
+        # Note: msgspec is strict on UTF-8 conformance and provides fast JSON parsing
+        return func(msgspec.json.decode(json_str))
+    except msgspec.DecodeError as e:
+        if isinstance(json_str, bytes):
+            snippet = json_str[:200].decode("utf-8", errors="replace")
+        else:
+            snippet = json_str[:200]
+        if len(json_str) > 200:
+            snippet += "..."
+        _logger.exception(f"Failed to parse JSON string: '{snippet}' - {e!r}")
+        raise


from aiperf.common import aiperf_logger
from aiperf.common.aiperf_logger import AIPerfLogger
Expand Down Expand Up @@ -90,11 +90,9 @@ def load_json_str(
run validation checks on the object. Defaults to identity function.
"""
try:
# Note: orjson may not parse JSON the same way as Python's standard json library,
# notably being stricter on UTF-8 conformance.
# Refer to https://github.com/ijl/orjson?tab=readme-ov-file#str for details.
return func(orjson.loads(json_str))
except orjson.JSONDecodeError as e:
# Note: msgspec is strict on UTF-8 conformance and provides fast JSON parsing
return func(msgspec.json.decode(json_str))
except msgspec.DecodeError as e:
snippet = json_str[:200] + ("..." if len(json_str) > 200 else "")
_logger.exception(f"Failed to parse JSON string: '{snippet}' - {e!r}")
raise
Expand Down
6 changes: 3 additions & 3 deletions src/aiperf/endpoints/template_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import jinja2
import jmespath
import orjson
import msgspec

from aiperf.common.decorators import implements_protocol
from aiperf.common.enums import EndpointType
Expand Down Expand Up @@ -145,8 +145,8 @@ def format_payload(self, request_info: RequestInfo) -> dict[str, Any]:
rendered = self._template.render(**template_vars)

try:
payload = orjson.loads(rendered)
except orjson.JSONDecodeError as e:
payload = msgspec.json.decode(rendered)
except msgspec.DecodeError as e:
self.error(f"Template did not render valid JSON: {rendered} - {e!r}")
raise ValueError(
f"Template did not render valid JSON {e!r}: {rendered[:100]}"
Expand Down
6 changes: 3 additions & 3 deletions src/aiperf/transports/aiohttp_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections.abc import Mapping
from typing import Any

import orjson
import msgspec

from aiperf.common.enums import TransportType
from aiperf.common.exceptions import NotInitializedError
Expand Down Expand Up @@ -148,8 +148,8 @@ async def send_request(
url = self.build_url(request_info)
headers = self.build_headers(request_info)

# Serialize with orjson for performance
json_str = orjson.dumps(payload).decode("utf-8")
# Serialize with msgspec for performance
json_str = msgspec.json.encode(payload).decode("utf-8")
record = await self.aiohttp_client.post_request(url, json_str, headers)
record.request_headers = headers
except Exception as e:
Expand Down
4 changes: 2 additions & 2 deletions tests/aiperf_mock_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
"""Mock server configuration."""

import json
import logging
import os
from typing import Annotated, Any, Literal

import msgspec
from cyclopts import Parameter
from pydantic import Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
Expand Down Expand Up @@ -153,5 +153,5 @@ def _get_env_key(config_key: str) -> str:
def _serialize_env_value(value: Any) -> str:
"""Serialize value for environment variable storage."""
if isinstance(value, list | dict):
return json.dumps(value)
return msgspec.json.encode(value).decode("utf-8")
return str(value)
4 changes: 2 additions & 2 deletions tests/integration/test_gpu_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import platform

import orjson
import msgspec
import pytest

from aiperf.common.models.telemetry_models import TelemetryRecord
Expand Down Expand Up @@ -98,7 +98,7 @@ async def test_gpu_telemetry_export(

# Validate each line is valid JSON and can be parsed as TelemetryRecord
for line in lines:
record_dict = orjson.loads(line)
record_dict = msgspec.json.decode(line)
record = TelemetryRecord.model_validate(record_dict)

# Verify required fields are present
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_random_generator_canary.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
that any changes to the codebase don't silently break determinism.
"""

import json
from pathlib import Path

import msgspec
import pytest

from aiperf.common.utils import load_json_str
Expand Down Expand Up @@ -115,5 +115,5 @@ def _save_reference(self, data: dict) -> None:
data: Inputs data to save as reference
"""
self.REFERENCE_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(self.REFERENCE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
with open(self.REFERENCE_FILE, "wb") as f:
f.write(msgspec.json.format(msgspec.json.encode(data), indent=2))
7 changes: 3 additions & 4 deletions tests/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
"""Utility functions for integration tests."""

import base64
import json
import subprocess
from pathlib import Path

import orjson
import msgspec

from aiperf.common.aiperf_logger import AIPerfLogger
from tests.integration.models import VideoDetails
Expand All @@ -34,7 +33,7 @@ def create_rankings_dataset(tmp_path: Path, num_entries: int) -> Path:
{"name": "passages", "contents": [f"AI passage {i}"]},
]
}
f.write(orjson.dumps(entry).decode("utf-8") + "\n")
f.write(msgspec.json.encode(entry).decode("utf-8") + "\n")
return dataset_path


Expand Down Expand Up @@ -62,7 +61,7 @@ def extract_base64_video_details(base64_data: str) -> VideoDetails:
]
result = subprocess.run(cmd, input=video_bytes, capture_output=True, check=True)

probe_data = json.loads(result.stdout)
probe_data = msgspec.json.decode(result.stdout)
format_info = probe_data["format"]
video_stream = next(s for s in probe_data["streams"] if s["codec_type"] == "video")

Expand Down
Loading