Skip to content

Conversation

@ajcasagrande
Copy link
Contributor

@ajcasagrande ajcasagrande commented Nov 25, 2025

This is for the upcoming Sticky Credit Router. It will allow us to send credits to specific workers.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added streaming ROUTER and DEALER client types for bidirectional ZeroMQ communication with identity-based message routing and asynchronous message handlers.
  • Tests

    • Added comprehensive test coverage for streaming clients, including initialization, message sending/receiving, lifecycle management, error handling, and edge cases.

✏️ Tip: You can customize this high-level summary in your review settings.

@github-actions
Copy link

github-actions bot commented Nov 25, 2025

Try out this PR

Quick install:

pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@9e00c2694a7ecfdf1bd2d7a4c91d3ead7725f490

Recommended with virtual environment (using uv):

uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@9e00c2694a7ecfdf1bd2d7a4c91d3ead7725f490

Last updated for commit: 9e00c26Browse code

@codecov
Copy link

codecov bot commented Nov 25, 2025

Codecov Report

❌ Patch coverage is 92.66667% with 11 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/aiperf/zmq/streaming_router_client.py 92.42% 0 Missing and 5 partials ⚠️
src/aiperf/zmq/streaming_dealer_client.py 93.22% 1 Missing and 3 partials ⚠️
src/aiperf/common/base_comms.py 60.00% 2 Missing ⚠️

📢 Thoughts on this report? Let us know!

@coderabbitai
Copy link

coderabbitai bot commented Nov 25, 2025

Walkthrough

The PR introduces bidirectional streaming support for ZMQ ROUTER and DEALER socket patterns. It adds two new client types (STREAMING_ROUTER, STREAMING_DEALER) with corresponding protocol definitions and implementations. The base socket initialization is refactored to handle IDENTITY socket options early. Comprehensive test coverage validates the new functionality across initialization, message handling, lifecycle management, and error scenarios.

Changes

Cohort / File(s) Summary
Protocol & Type Definitions
src/aiperf/common/enums/communication_enums.py, src/aiperf/common/protocols.py
Added CommClientType enum members STREAMING_ROUTER and STREAMING_DEALER; introduced StreamingRouterClientProtocol and StreamingDealerClientProtocol with register_receiver and message sending methods; extended CommunicationProtocol with factory methods for streaming clients.
Factory Methods
src/aiperf/common/base_comms.py
Added create_streaming_router_client and create_streaming_dealer_client factory methods to BaseCommunication, delegating to create_client with appropriate CommClientType and returning protocol-typed clients.
Streaming Client Implementations
src/aiperf/zmq/streaming_dealer_client.py, src/aiperf/zmq/streaming_router_client.py
Implemented ZMQStreamingDealerClient and ZMQStreamingRouterClient with DEALER and ROUTER sockets respectively, including message registration/sending, background receiver tasks, and lifecycle management.
Package Exports
src/aiperf/zmq/__init__.py
Added imports and exports of ZMQStreamingDealerClient and ZMQStreamingRouterClient to public API.
Base Client Refactoring
src/aiperf/zmq/zmq_base_client.py
Moved socket creation from __init__ to _initialize_socket; added early IDENTITY socket option application before bind/connect, with removal from socket_ops after setting.
Test Infrastructure
tests/unit/zmq/conftest.py
Added fixtures for streaming_router_test_helper and streaming_dealer_test_helper; introduced multiple_identities and special_identity parameterized fixtures.
Streaming Client Tests
tests/unit/zmq/test_streaming_dealer_client.py, tests/unit/zmq/test_streaming_router_client.py
Added comprehensive test suites covering initialization, receiver registration, message sending/receiving, lifecycle management, error handling, and edge cases for both streaming clients.
Base Client Tests
tests/unit/zmq/test_zmq_base_client.py
Renamed initialization test; added TestBaseZMQClientIdentity class with extensive tests for IDENTITY socket option ordering and application.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Key areas requiring attention during review:

  • IDENTITY socket option handling: Verify the early application of IDENTITY before bind/connect in zmq_base_client.py does not break existing socket types or configurations
  • Message serialization/deserialization: Inspect JSON encoding/decoding paths in both streaming clients for consistency and error handling
  • Background task lifecycle: Validate that background receiver tasks start immediately, handle cancellation gracefully, and clean up properly on stop
  • Receiver registration semantics: Confirm that single-registration enforcement is consistent across both ROUTER and DEALER client implementations
  • Fixture redefinitions: Note duplicate fixture definitions in conftest.py that may cause unexpected behavior—verify intentionality

Poem

🐰 Streaming sockets, DEALER and ROUTER,
Now dance through the ZMQ route-er!
With identity carved deep,
And promises kept,
Async messages flow—no longer a doubter! 🚀

Pre-merge checks

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 71.05% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately describes the main change: adding bi-directional streaming DEALER and ROUTER ZMQ clients across multiple files including protocols, implementations, and comprehensive tests.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
tests/unit/zmq/test_streaming_router_client.py (1)

322-371: The streaming_router_test_helper fixture is unused in test_full_lifecycle.

At line 340, the streaming_router_test_helper fixture is received but never used. The test manually instantiates ZMQStreamingRouterClient which is appropriate for testing the full manual lifecycle, so the fixture parameter can be removed.

     @pytest.mark.asyncio
-    async def test_full_lifecycle(self, streaming_router_test_helper):
+    async def test_full_lifecycle(self, mock_zmq_context):
         """Test full client lifecycle: initialize -> start -> stop."""
         client = ZMQStreamingRouterClient(address="tcp://*:5555", bind=True)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 563d9d8 and 12f9291.

📒 Files selected for processing (11)
  • src/aiperf/common/base_comms.py (2 hunks)
  • src/aiperf/common/enums/communication_enums.py (1 hunks)
  • src/aiperf/common/protocols.py (2 hunks)
  • src/aiperf/zmq/__init__.py (2 hunks)
  • src/aiperf/zmq/streaming_dealer_client.py (1 hunks)
  • src/aiperf/zmq/streaming_router_client.py (1 hunks)
  • src/aiperf/zmq/zmq_base_client.py (2 hunks)
  • tests/unit/zmq/conftest.py (2 hunks)
  • tests/unit/zmq/test_streaming_dealer_client.py (1 hunks)
  • tests/unit/zmq/test_streaming_router_client.py (1 hunks)
  • tests/unit/zmq/test_zmq_base_client.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Use async/await for all I/O operations; never use time.sleep() or blocking calls
Always use orjson for JSON operations: orjson.loads(s) and orjson.dumps(d)
All functions must have type hints on parameters and return types
Use Python 3.10+ union syntax (|) instead of typing.Union; use match/case for pattern matching; use @DataClass(slots=True)

Files:

  • src/aiperf/zmq/__init__.py
  • src/aiperf/common/enums/communication_enums.py
  • tests/unit/zmq/test_streaming_dealer_client.py
  • tests/unit/zmq/conftest.py
  • src/aiperf/zmq/streaming_dealer_client.py
  • tests/unit/zmq/test_streaming_router_client.py
  • src/aiperf/common/base_comms.py
  • src/aiperf/zmq/zmq_base_client.py
  • src/aiperf/zmq/streaming_router_client.py
  • src/aiperf/common/protocols.py
  • tests/unit/zmq/test_zmq_base_client.py
**/*test*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Test files must use pytest with fixtures, helpers, and @pytest.mark.parametrize; import statements at the top; use # fmt: skip for long parameterize blocks

Files:

  • tests/unit/zmq/test_streaming_dealer_client.py
  • tests/unit/zmq/conftest.py
  • tests/unit/zmq/test_streaming_router_client.py
  • tests/unit/zmq/test_zmq_base_client.py
**/conftest.py

📄 CodeRabbit inference engine (CLAUDE.md)

Auto-fixtures in tests should mock time, set RNG=42, and reset singletons

Files:

  • tests/unit/zmq/conftest.py
**/{protocols,factories,plugin_enums}.py

📄 CodeRabbit inference engine (CLAUDE.md)

Extensible features must follow Protocol + Factory + Enum pattern: define Protocol in protocols.py, Factory in factories.py, Enum in plugin_enums.py, and register with @Factory.register(Enum.TYPE)

Files:

  • src/aiperf/common/protocols.py
🧠 Learnings (1)
📚 Learning: 2025-11-25T00:08:56.784Z
Learnt from: CR
Repo: ai-dynamo/aiperf PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-11-25T00:08:56.784Z
Learning: Applies to **/conftest.py : Auto-fixtures in tests should mock time, set RNG=42, and reset singletons

Applied to files:

  • tests/unit/zmq/conftest.py
🧬 Code graph analysis (8)
src/aiperf/zmq/__init__.py (2)
src/aiperf/zmq/streaming_dealer_client.py (1)
  • ZMQStreamingDealerClient (23-189)
src/aiperf/zmq/streaming_router_client.py (1)
  • ZMQStreamingRouterClient (23-206)
tests/unit/zmq/test_streaming_dealer_client.py (3)
src/aiperf/common/exceptions.py (1)
  • NotInitializedError (164-165)
src/aiperf/zmq/streaming_dealer_client.py (3)
  • ZMQStreamingDealerClient (23-189)
  • register_receiver (103-117)
  • send (124-149)
src/aiperf/common/messages/base_messages.py (1)
  • to_json_bytes (46-60)
tests/unit/zmq/conftest.py (4)
src/aiperf/zmq/streaming_router_client.py (1)
  • ZMQStreamingRouterClient (23-206)
src/aiperf/common/base_comms.py (1)
  • create_client (40-56)
src/aiperf/common/protocols.py (2)
  • create_client (266-276)
  • request (171-175)
src/aiperf/zmq/streaming_dealer_client.py (1)
  • ZMQStreamingDealerClient (23-189)
tests/unit/zmq/test_streaming_router_client.py (4)
src/aiperf/common/exceptions.py (1)
  • NotInitializedError (164-165)
tests/unit/zmq/conftest.py (26)
  • mock_zmq_context (54-59)
  • streaming_router_test_helper (441-471)
  • sample_message (75-82)
  • create_client (197-234)
  • create_client (253-267)
  • create_client (287-301)
  • create_client (321-331)
  • create_client (351-365)
  • create_client (385-395)
  • create_client (415-435)
  • create_client (455-469)
  • create_client (489-506)
  • multiple_identities (571-573)
  • special_identity (586-588)
  • create_callback_tracker (552-567)
  • callback (560-563)
  • setup_mock_socket (130-194)
  • setup_mock_socket (249-250)
  • setup_mock_socket (283-284)
  • setup_mock_socket (317-318)
  • setup_mock_socket (347-348)
  • setup_mock_socket (381-382)
  • setup_mock_socket (411-412)
  • setup_mock_socket (451-452)
  • setup_mock_socket (485-486)
  • wait_for_background_task (112-120)
src/aiperf/common/base_comms.py (1)
  • create_client (40-56)
src/aiperf/common/messages/base_messages.py (1)
  • to_json_bytes (46-60)
src/aiperf/common/base_comms.py (2)
src/aiperf/common/protocols.py (5)
  • StreamingDealerClientProtocol (212-234)
  • StreamingRouterClientProtocol (185-208)
  • create_streaming_router_client (339-347)
  • create_client (266-276)
  • create_streaming_dealer_client (349-358)
src/aiperf/common/enums/communication_enums.py (1)
  • CommClientType (12-20)
src/aiperf/zmq/streaming_router_client.py (3)
src/aiperf/common/protocols.py (5)
  • StreamingRouterClientProtocol (185-208)
  • register_receiver (188-198)
  • register_receiver (215-225)
  • send_to (200-208)
  • exception (74-74)
src/aiperf/zmq/zmq_base_client.py (1)
  • _check_initialized (57-62)
src/aiperf/common/messages/base_messages.py (1)
  • to_json_bytes (46-60)
src/aiperf/common/protocols.py (3)
src/aiperf/zmq/streaming_dealer_client.py (2)
  • register_receiver (103-117)
  • send (124-149)
src/aiperf/zmq/streaming_router_client.py (2)
  • register_receiver (103-118)
  • send_to (125-154)
src/aiperf/common/base_comms.py (2)
  • create_streaming_router_client (131-142)
  • create_streaming_dealer_client (144-161)
tests/unit/zmq/test_zmq_base_client.py (3)
tests/unit/zmq/conftest.py (1)
  • mock_zmq_socket (23-50)
src/aiperf/zmq/zmq_base_client.py (1)
  • BaseZMQClient (18-146)
src/aiperf/common/protocols.py (1)
  • initialize (118-118)
🪛 Ruff (0.14.5)
tests/unit/zmq/test_streaming_dealer_client.py

21-21: Unused method argument: mock_zmq_context

(ARG002)


44-44: Unused method argument: mock_zmq_context

(ARG002)


57-57: Unused method argument: mock_zmq_context

(ARG002)


70-70: Unused method argument: mock_zmq_context

(ARG002)


85-85: Unused method argument: mock_zmq_context

(ARG002)


101-101: Unused method argument: mock_zmq_context

(ARG002)


118-118: Unused method argument: mock_zmq_context

(ARG002)


173-173: Unused method argument: streaming_dealer_test_helper

(ARG002)


443-443: Unused method argument: mock_zmq_context

(ARG002)


454-454: Unused method argument: mock_zmq_context

(ARG002)

src/aiperf/zmq/streaming_dealer_client.py

113-113: Avoid specifying long messages outside the exception class

(TRY003)


138-140: Avoid specifying long messages outside the exception class

(TRY003)


180-180: Do not catch blind exception: Exception

(BLE001)

tests/unit/zmq/test_streaming_router_client.py

21-21: Unused method argument: mock_zmq_context

(ARG002)


38-38: Unused method argument: mock_zmq_context

(ARG002)


45-45: Unused method argument: mock_zmq_context

(ARG002)


61-61: Unused method argument: mock_zmq_context

(ARG002)


73-73: Unused method argument: mock_zmq_context

(ARG002)


124-124: Unused method argument: mock_zmq_context

(ARG002)


340-340: Unused method argument: streaming_router_test_helper

(ARG002)

src/aiperf/zmq/streaming_router_client.py

116-116: Avoid specifying long messages outside the exception class

(TRY003)


140-142: Avoid specifying long messages outside the exception class

(TRY003)


198-198: Do not catch blind exception: Exception

(BLE001)

tests/unit/zmq/test_zmq_base_client.py

290-290: Unused method argument: mock_zmq_context

(ARG002)


319-319: Unused method argument: mock_zmq_socket

(ARG002)


319-319: Unused method argument: mock_zmq_context

(ARG002)


361-361: Unused method argument: mock_zmq_context

(ARG002)


380-380: Unused method argument: mock_zmq_context

(ARG002)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: build (ubuntu-latest, 3.13)
  • GitHub Check: build (ubuntu-latest, 3.12)
  • GitHub Check: build (macos-latest, 3.11)
  • GitHub Check: build (macos-latest, 3.12)
  • GitHub Check: build (ubuntu-latest, 3.10)
  • GitHub Check: build (ubuntu-latest, 3.11)
  • GitHub Check: build (macos-latest, 3.13)
  • GitHub Check: build (macos-latest, 3.10)
  • GitHub Check: integration-tests (ubuntu-latest, 3.12)
  • GitHub Check: integration-tests (ubuntu-latest, 3.10)
  • GitHub Check: integration-tests (ubuntu-latest, 3.11)
  • GitHub Check: integration-tests (ubuntu-latest, 3.13)
🔇 Additional comments (35)
src/aiperf/zmq/__init__.py (1)

26-31: LGTM!

The new streaming client imports and exports are correctly added and maintain the existing alphabetical ordering convention for the auto-generated __all__ list.

Also applies to: 80-81

src/aiperf/common/enums/communication_enums.py (1)

19-20: LGTM!

The new enum members follow the existing naming conventions and integrate properly with the factory methods in base_comms.py.

src/aiperf/zmq/zmq_base_client.py (2)

46-46: LGTM!

Deferring socket creation to _initialize_socket is a good practice - it ensures resources are only allocated when the client is explicitly initialized.


85-92: Verification complete: No existing callers pass reused socket_ops with zmq.IDENTITY.

All production code either creates inline dicts or generates new dicts before passing to socket clients. The streaming dealer client protects callers by constructing {**(socket_ops or {}), zmq.IDENTITY: identity.encode()}. Proxies reuse socket_ops across multiple client instantiations but currently never include zmq.IDENTITY in that dict. The early IDENTITY setting before bind/connect is correct per ZMQ requirements.

While the current implementation lacks defensive copying (creating a latent design issue for future code), it poses no risk to existing callers and functions correctly for the streaming dealer pattern.

src/aiperf/common/base_comms.py (2)

17-18: LGTM!

The new protocol imports align with the factory methods added below.


131-142: LGTM!

The factory method follows the existing pattern and correctly defaults bind=True for ROUTER sockets, which is the typical use case.

tests/unit/zmq/conftest.py (3)

440-471: LGTM!

The streaming_router_test_helper fixture follows the established pattern and correctly wraps ZMQStreamingRouterClient with lifecycle management.


474-508: LGTM!

The streaming_dealer_test_helper correctly passes the identity parameter through client_kwargs, matching the ZMQStreamingDealerClient constructor requirements.


570-587: LGTM!

The identity fixtures provide good test coverage for common worker naming patterns and edge cases with special characters. The # fmt: skip comment correctly suppresses formatter changes per coding guidelines.

tests/unit/zmq/test_streaming_router_client.py (6)

1-16: LGTM!

Imports are well-organized with standard library, third-party, and local imports properly separated. The test module provides comprehensive coverage for the new streaming router client.


18-55: LGTM!

Initialization tests properly verify socket type, receiver state, address/bind handling, and custom socket options. The parameterized test with # fmt: skip follows coding guidelines.

Note: The static analysis warnings about unused mock_zmq_context are false positives—it's an autouse fixture from conftest.py that patches the ZMQ context globally.


57-88: LGTM!

Good test coverage for receiver registration including the error case when attempting to register a second handler.


90-165: LGTM!

Comprehensive send_to tests covering:

  • Routing envelope construction
  • Multiple identity sends
  • Not-initialized error handling
  • Type validation for Message instances
  • Send failure handling
  • Special character identity encoding

167-320: LGTM!

The receiver background task tests are well-designed with proper handling of async patterns:

  • Uses asyncio.Future() to block after first message (prevents busy loops)
  • Tests handler invocation, missing handler warnings, exception handling
  • Covers zmq.Again, RuntimeError, and CancelledError scenarios
  • Tests edge case of empty routing envelope

374-392: LGTM!

Good edge case test verifying concurrent sends work correctly with asyncio.gather.

src/aiperf/common/protocols.py (3)

184-208: Well-structured streaming ROUTER protocol definition.

The StreamingRouterClientProtocol correctly defines the bidirectional streaming interface with register_receiver for incoming message handling and send_to for identity-based routing. The handler signature properly includes both identity: str and message: MessageT parameters, aligning with the ROUTER socket's envelope-based messaging pattern.


211-234: Clean streaming DEALER protocol definition.

The StreamingDealerClientProtocol appropriately simplifies the handler signature to just message: MessageT since DEALER sockets don't need to track sender identity. The send method (vs send_to) correctly reflects that DEALER sends to its connected ROUTER without specifying a destination.


339-358: Factory methods correctly integrate with the protocol pattern.

The factory methods properly follow the existing pattern in CommunicationProtocol. The create_streaming_dealer_client correctly includes the identity parameter which is essential for DEALER socket identification. Default bind values (True for ROUTER, False for DEALER) align with typical ZMQ usage patterns.

tests/unit/zmq/test_streaming_dealer_client.py (6)

18-95: Comprehensive initialization tests with good parametrization.

The initialization tests properly validate socket type, identity handling, and socket options. The unused mock_zmq_context fixture arguments flagged by static analysis are intentional — they ensure the ZMQ context is mocked via the autouse fixture mechanism before client instantiation.


97-136: Receiver registration tests correctly validate single-handler constraint.

Tests properly verify both successful registration and the prevention of duplicate handler registration. This aligns with the implementation that raises ValueError when a handler is already registered.


139-202: Send method tests cover key validation scenarios.

Good coverage of: successful send, multiple messages, uninitialized socket error, type validation, and send failure propagation. The NotInitializedError test correctly sets socket = None to simulate the uninitialized state.


205-314: Background receiver tests validate async behavior correctly.

The tests properly exercise the receiver task's lifecycle, message dispatching, exception handling (zmq.Again and generic errors), and graceful cancellation. The use of asyncio.Future() to block after controlled iterations is an effective pattern for testing background loops.


401-436: Multiple message receiver test validates streaming throughput.

The test correctly simulates receiving multiple messages in sequence and verifies all are dispatched to the handler. Using an asyncio.Event to signal completion avoids flaky timing-based assertions.


355-365: No issues found—test expectation is correct and verified.

The test correctly expects asyncio.CancelledError with message "Socket was stopped". This is confirmed by the _check_initialized() implementation in src/aiperf/zmq/zmq_base_client.py (line 60), which raises exactly this exception when self.stop_requested is true. The send() method calls this check at line 135 of streaming_dealer_client.py, ensuring the test behavior aligns with the actual implementation.

tests/unit/zmq/test_zmq_base_client.py (4)

33-45: Test correctly updated to reflect deferred socket creation.

The renamed test test_init_stores_correct_params and new assertions (socket is None, socket.assert_not_called()) accurately reflect the architectural change where socket creation is deferred to initialize() rather than happening in __init__.


275-315: Thorough verification of IDENTITY option ordering.

The test_identity_set_before_bind_or_connect test correctly verifies the critical requirement that IDENTITY must be set before bind/connect. The method call index comparison (identity_idx < action_idx) is a robust way to validate ordering.


317-332: Important test for socket_ops mutation behavior.

This test validates that IDENTITY is removed from socket_ops after being applied, preventing duplicate application. The assertion that other options (like zmq.IMMEDIATE) remain in socket_ops confirms the selective removal is working correctly.


373-392: Good coverage of identity format variations.

Testing various identity formats including max-length (255 bytes) ensures the implementation handles edge cases. The parametrization approach keeps tests concise while providing comprehensive coverage.

src/aiperf/zmq/streaming_dealer_client.py (3)

73-101: Clean constructor with proper identity handling.

The constructor correctly encodes the identity and passes it via socket_ops. Setting client_id=identity ensures consistent identification across the client lifecycle.


103-117: Single-handler constraint is appropriately enforced.

The register_receiver method correctly prevents multiple handler registrations, which avoids ambiguity in message dispatch.


124-149: Send method correctly validates and transmits messages.

The method properly checks initialization, validates message type, and uses single-frame send (appropriate for DEALER sockets). Error logging before re-raising provides good observability.

src/aiperf/zmq/streaming_router_client.py (4)

84-101: Constructor correctly initializes ROUTER socket.

The constructor properly sets up the ROUTER socket with default bind=True, which is the typical pattern for ROUTER sockets that accept connections from multiple DEALER clients.


103-118: Handler registration follows consistent pattern.

The single-handler constraint matches the DEALER client implementation, maintaining consistency across the streaming client family.


125-154: send_to correctly implements ROUTER envelope pattern.

The method properly encodes the identity to bytes and uses send_multipart with the routing envelope. The tuple unpacking [*routing_envelope, message.to_json_bytes()] is a clean way to construct the multipart message.


172-180: Routing envelope extraction handles edge cases.

The fallback to (b"",) when len(data) <= 1 provides defensive handling for malformed messages, though in practice DEALER→ROUTER messages should always include identity.

@ajcasagrande ajcasagrande force-pushed the ajc/zmq-streaming-dealer-router branch from 12f9291 to 38ab01b Compare November 25, 2025 15:50
Signed-off-by: Anthony Casagrande <[email protected]>
@ajcasagrande ajcasagrande merged commit 42c6829 into main Nov 25, 2025
18 checks passed
@ajcasagrande ajcasagrande deleted the ajc/zmq-streaming-dealer-router branch November 25, 2025 18:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants