-
Notifications
You must be signed in to change notification settings - Fork 11
feat: add bi-directional streaming dealer and router zmq clients #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Try out this PRQuick install: pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@9e00c2694a7ecfdf1bd2d7a4c91d3ead7725f490Recommended with virtual environment (using uv): uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@9e00c2694a7ecfdf1bd2d7a4c91d3ead7725f490Last updated for commit: |
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
WalkthroughThe PR introduces bidirectional streaming support for ZMQ ROUTER and DEALER socket patterns. It adds two new client types (STREAMING_ROUTER, STREAMING_DEALER) with corresponding protocol definitions and implementations. The base socket initialization is refactored to handle IDENTITY socket options early. Comprehensive test coverage validates the new functionality across initialization, message handling, lifecycle management, and error scenarios. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Key areas requiring attention during review:
Poem
Pre-merge checks❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (1)
tests/unit/zmq/test_streaming_router_client.py (1)
322-371: Thestreaming_router_test_helperfixture is unused intest_full_lifecycle.At line 340, the
streaming_router_test_helperfixture is received but never used. The test manually instantiatesZMQStreamingRouterClientwhich is appropriate for testing the full manual lifecycle, so the fixture parameter can be removed.@pytest.mark.asyncio - async def test_full_lifecycle(self, streaming_router_test_helper): + async def test_full_lifecycle(self, mock_zmq_context): """Test full client lifecycle: initialize -> start -> stop.""" client = ZMQStreamingRouterClient(address="tcp://*:5555", bind=True)
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
src/aiperf/common/base_comms.py(2 hunks)src/aiperf/common/enums/communication_enums.py(1 hunks)src/aiperf/common/protocols.py(2 hunks)src/aiperf/zmq/__init__.py(2 hunks)src/aiperf/zmq/streaming_dealer_client.py(1 hunks)src/aiperf/zmq/streaming_router_client.py(1 hunks)src/aiperf/zmq/zmq_base_client.py(2 hunks)tests/unit/zmq/conftest.py(2 hunks)tests/unit/zmq/test_streaming_dealer_client.py(1 hunks)tests/unit/zmq/test_streaming_router_client.py(1 hunks)tests/unit/zmq/test_zmq_base_client.py(2 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Use async/await for all I/O operations; never use time.sleep() or blocking calls
Always use orjson for JSON operations: orjson.loads(s) and orjson.dumps(d)
All functions must have type hints on parameters and return types
Use Python 3.10+ union syntax (|) instead of typing.Union; use match/case for pattern matching; use @DataClass(slots=True)
Files:
src/aiperf/zmq/__init__.pysrc/aiperf/common/enums/communication_enums.pytests/unit/zmq/test_streaming_dealer_client.pytests/unit/zmq/conftest.pysrc/aiperf/zmq/streaming_dealer_client.pytests/unit/zmq/test_streaming_router_client.pysrc/aiperf/common/base_comms.pysrc/aiperf/zmq/zmq_base_client.pysrc/aiperf/zmq/streaming_router_client.pysrc/aiperf/common/protocols.pytests/unit/zmq/test_zmq_base_client.py
**/*test*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Test files must use pytest with fixtures, helpers, and @pytest.mark.parametrize; import statements at the top; use # fmt: skip for long parameterize blocks
Files:
tests/unit/zmq/test_streaming_dealer_client.pytests/unit/zmq/conftest.pytests/unit/zmq/test_streaming_router_client.pytests/unit/zmq/test_zmq_base_client.py
**/conftest.py
📄 CodeRabbit inference engine (CLAUDE.md)
Auto-fixtures in tests should mock time, set RNG=42, and reset singletons
Files:
tests/unit/zmq/conftest.py
**/{protocols,factories,plugin_enums}.py
📄 CodeRabbit inference engine (CLAUDE.md)
Extensible features must follow Protocol + Factory + Enum pattern: define Protocol in protocols.py, Factory in factories.py, Enum in plugin_enums.py, and register with @Factory.register(Enum.TYPE)
Files:
src/aiperf/common/protocols.py
🧠 Learnings (1)
📚 Learning: 2025-11-25T00:08:56.784Z
Learnt from: CR
Repo: ai-dynamo/aiperf PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-11-25T00:08:56.784Z
Learning: Applies to **/conftest.py : Auto-fixtures in tests should mock time, set RNG=42, and reset singletons
Applied to files:
tests/unit/zmq/conftest.py
🧬 Code graph analysis (8)
src/aiperf/zmq/__init__.py (2)
src/aiperf/zmq/streaming_dealer_client.py (1)
ZMQStreamingDealerClient(23-189)src/aiperf/zmq/streaming_router_client.py (1)
ZMQStreamingRouterClient(23-206)
tests/unit/zmq/test_streaming_dealer_client.py (3)
src/aiperf/common/exceptions.py (1)
NotInitializedError(164-165)src/aiperf/zmq/streaming_dealer_client.py (3)
ZMQStreamingDealerClient(23-189)register_receiver(103-117)send(124-149)src/aiperf/common/messages/base_messages.py (1)
to_json_bytes(46-60)
tests/unit/zmq/conftest.py (4)
src/aiperf/zmq/streaming_router_client.py (1)
ZMQStreamingRouterClient(23-206)src/aiperf/common/base_comms.py (1)
create_client(40-56)src/aiperf/common/protocols.py (2)
create_client(266-276)request(171-175)src/aiperf/zmq/streaming_dealer_client.py (1)
ZMQStreamingDealerClient(23-189)
tests/unit/zmq/test_streaming_router_client.py (4)
src/aiperf/common/exceptions.py (1)
NotInitializedError(164-165)tests/unit/zmq/conftest.py (26)
mock_zmq_context(54-59)streaming_router_test_helper(441-471)sample_message(75-82)create_client(197-234)create_client(253-267)create_client(287-301)create_client(321-331)create_client(351-365)create_client(385-395)create_client(415-435)create_client(455-469)create_client(489-506)multiple_identities(571-573)special_identity(586-588)create_callback_tracker(552-567)callback(560-563)setup_mock_socket(130-194)setup_mock_socket(249-250)setup_mock_socket(283-284)setup_mock_socket(317-318)setup_mock_socket(347-348)setup_mock_socket(381-382)setup_mock_socket(411-412)setup_mock_socket(451-452)setup_mock_socket(485-486)wait_for_background_task(112-120)src/aiperf/common/base_comms.py (1)
create_client(40-56)src/aiperf/common/messages/base_messages.py (1)
to_json_bytes(46-60)
src/aiperf/common/base_comms.py (2)
src/aiperf/common/protocols.py (5)
StreamingDealerClientProtocol(212-234)StreamingRouterClientProtocol(185-208)create_streaming_router_client(339-347)create_client(266-276)create_streaming_dealer_client(349-358)src/aiperf/common/enums/communication_enums.py (1)
CommClientType(12-20)
src/aiperf/zmq/streaming_router_client.py (3)
src/aiperf/common/protocols.py (5)
StreamingRouterClientProtocol(185-208)register_receiver(188-198)register_receiver(215-225)send_to(200-208)exception(74-74)src/aiperf/zmq/zmq_base_client.py (1)
_check_initialized(57-62)src/aiperf/common/messages/base_messages.py (1)
to_json_bytes(46-60)
src/aiperf/common/protocols.py (3)
src/aiperf/zmq/streaming_dealer_client.py (2)
register_receiver(103-117)send(124-149)src/aiperf/zmq/streaming_router_client.py (2)
register_receiver(103-118)send_to(125-154)src/aiperf/common/base_comms.py (2)
create_streaming_router_client(131-142)create_streaming_dealer_client(144-161)
tests/unit/zmq/test_zmq_base_client.py (3)
tests/unit/zmq/conftest.py (1)
mock_zmq_socket(23-50)src/aiperf/zmq/zmq_base_client.py (1)
BaseZMQClient(18-146)src/aiperf/common/protocols.py (1)
initialize(118-118)
🪛 Ruff (0.14.5)
tests/unit/zmq/test_streaming_dealer_client.py
21-21: Unused method argument: mock_zmq_context
(ARG002)
44-44: Unused method argument: mock_zmq_context
(ARG002)
57-57: Unused method argument: mock_zmq_context
(ARG002)
70-70: Unused method argument: mock_zmq_context
(ARG002)
85-85: Unused method argument: mock_zmq_context
(ARG002)
101-101: Unused method argument: mock_zmq_context
(ARG002)
118-118: Unused method argument: mock_zmq_context
(ARG002)
173-173: Unused method argument: streaming_dealer_test_helper
(ARG002)
443-443: Unused method argument: mock_zmq_context
(ARG002)
454-454: Unused method argument: mock_zmq_context
(ARG002)
src/aiperf/zmq/streaming_dealer_client.py
113-113: Avoid specifying long messages outside the exception class
(TRY003)
138-140: Avoid specifying long messages outside the exception class
(TRY003)
180-180: Do not catch blind exception: Exception
(BLE001)
tests/unit/zmq/test_streaming_router_client.py
21-21: Unused method argument: mock_zmq_context
(ARG002)
38-38: Unused method argument: mock_zmq_context
(ARG002)
45-45: Unused method argument: mock_zmq_context
(ARG002)
61-61: Unused method argument: mock_zmq_context
(ARG002)
73-73: Unused method argument: mock_zmq_context
(ARG002)
124-124: Unused method argument: mock_zmq_context
(ARG002)
340-340: Unused method argument: streaming_router_test_helper
(ARG002)
src/aiperf/zmq/streaming_router_client.py
116-116: Avoid specifying long messages outside the exception class
(TRY003)
140-142: Avoid specifying long messages outside the exception class
(TRY003)
198-198: Do not catch blind exception: Exception
(BLE001)
tests/unit/zmq/test_zmq_base_client.py
290-290: Unused method argument: mock_zmq_context
(ARG002)
319-319: Unused method argument: mock_zmq_socket
(ARG002)
319-319: Unused method argument: mock_zmq_context
(ARG002)
361-361: Unused method argument: mock_zmq_context
(ARG002)
380-380: Unused method argument: mock_zmq_context
(ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: build (ubuntu-latest, 3.13)
- GitHub Check: build (ubuntu-latest, 3.12)
- GitHub Check: build (macos-latest, 3.11)
- GitHub Check: build (macos-latest, 3.12)
- GitHub Check: build (ubuntu-latest, 3.10)
- GitHub Check: build (ubuntu-latest, 3.11)
- GitHub Check: build (macos-latest, 3.13)
- GitHub Check: build (macos-latest, 3.10)
- GitHub Check: integration-tests (ubuntu-latest, 3.12)
- GitHub Check: integration-tests (ubuntu-latest, 3.10)
- GitHub Check: integration-tests (ubuntu-latest, 3.11)
- GitHub Check: integration-tests (ubuntu-latest, 3.13)
🔇 Additional comments (35)
src/aiperf/zmq/__init__.py (1)
26-31: LGTM!The new streaming client imports and exports are correctly added and maintain the existing alphabetical ordering convention for the auto-generated
__all__list.Also applies to: 80-81
src/aiperf/common/enums/communication_enums.py (1)
19-20: LGTM!The new enum members follow the existing naming conventions and integrate properly with the factory methods in
base_comms.py.src/aiperf/zmq/zmq_base_client.py (2)
46-46: LGTM!Deferring socket creation to
_initialize_socketis a good practice - it ensures resources are only allocated when the client is explicitly initialized.
85-92: Verification complete: No existing callers pass reusedsocket_opswithzmq.IDENTITY.All production code either creates inline dicts or generates new dicts before passing to socket clients. The streaming dealer client protects callers by constructing
{**(socket_ops or {}), zmq.IDENTITY: identity.encode()}. Proxies reusesocket_opsacross multiple client instantiations but currently never includezmq.IDENTITYin that dict. The early IDENTITY setting before bind/connect is correct per ZMQ requirements.While the current implementation lacks defensive copying (creating a latent design issue for future code), it poses no risk to existing callers and functions correctly for the streaming dealer pattern.
src/aiperf/common/base_comms.py (2)
17-18: LGTM!The new protocol imports align with the factory methods added below.
131-142: LGTM!The factory method follows the existing pattern and correctly defaults
bind=Truefor ROUTER sockets, which is the typical use case.tests/unit/zmq/conftest.py (3)
440-471: LGTM!The
streaming_router_test_helperfixture follows the established pattern and correctly wrapsZMQStreamingRouterClientwith lifecycle management.
474-508: LGTM!The
streaming_dealer_test_helpercorrectly passes theidentityparameter throughclient_kwargs, matching theZMQStreamingDealerClientconstructor requirements.
570-587: LGTM!The identity fixtures provide good test coverage for common worker naming patterns and edge cases with special characters. The
# fmt: skipcomment correctly suppresses formatter changes per coding guidelines.tests/unit/zmq/test_streaming_router_client.py (6)
1-16: LGTM!Imports are well-organized with standard library, third-party, and local imports properly separated. The test module provides comprehensive coverage for the new streaming router client.
18-55: LGTM!Initialization tests properly verify socket type, receiver state, address/bind handling, and custom socket options. The parameterized test with
# fmt: skipfollows coding guidelines.Note: The static analysis warnings about unused
mock_zmq_contextare false positives—it's an autouse fixture fromconftest.pythat patches the ZMQ context globally.
57-88: LGTM!Good test coverage for receiver registration including the error case when attempting to register a second handler.
90-165: LGTM!Comprehensive send_to tests covering:
- Routing envelope construction
- Multiple identity sends
- Not-initialized error handling
- Type validation for Message instances
- Send failure handling
- Special character identity encoding
167-320: LGTM!The receiver background task tests are well-designed with proper handling of async patterns:
- Uses
asyncio.Future()to block after first message (prevents busy loops)- Tests handler invocation, missing handler warnings, exception handling
- Covers
zmq.Again,RuntimeError, andCancelledErrorscenarios- Tests edge case of empty routing envelope
374-392: LGTM!Good edge case test verifying concurrent sends work correctly with
asyncio.gather.src/aiperf/common/protocols.py (3)
184-208: Well-structured streaming ROUTER protocol definition.The
StreamingRouterClientProtocolcorrectly defines the bidirectional streaming interface withregister_receiverfor incoming message handling andsend_tofor identity-based routing. The handler signature properly includes bothidentity: strandmessage: MessageTparameters, aligning with the ROUTER socket's envelope-based messaging pattern.
211-234: Clean streaming DEALER protocol definition.The
StreamingDealerClientProtocolappropriately simplifies the handler signature to justmessage: MessageTsince DEALER sockets don't need to track sender identity. Thesendmethod (vssend_to) correctly reflects that DEALER sends to its connected ROUTER without specifying a destination.
339-358: Factory methods correctly integrate with the protocol pattern.The factory methods properly follow the existing pattern in
CommunicationProtocol. Thecreate_streaming_dealer_clientcorrectly includes theidentityparameter which is essential for DEALER socket identification. Defaultbindvalues (Truefor ROUTER,Falsefor DEALER) align with typical ZMQ usage patterns.tests/unit/zmq/test_streaming_dealer_client.py (6)
18-95: Comprehensive initialization tests with good parametrization.The initialization tests properly validate socket type, identity handling, and socket options. The unused
mock_zmq_contextfixture arguments flagged by static analysis are intentional — they ensure the ZMQ context is mocked via the autouse fixture mechanism before client instantiation.
97-136: Receiver registration tests correctly validate single-handler constraint.Tests properly verify both successful registration and the prevention of duplicate handler registration. This aligns with the implementation that raises
ValueErrorwhen a handler is already registered.
139-202: Send method tests cover key validation scenarios.Good coverage of: successful send, multiple messages, uninitialized socket error, type validation, and send failure propagation. The
NotInitializedErrortest correctly setssocket = Noneto simulate the uninitialized state.
205-314: Background receiver tests validate async behavior correctly.The tests properly exercise the receiver task's lifecycle, message dispatching, exception handling (zmq.Again and generic errors), and graceful cancellation. The use of
asyncio.Future()to block after controlled iterations is an effective pattern for testing background loops.
401-436: Multiple message receiver test validates streaming throughput.The test correctly simulates receiving multiple messages in sequence and verifies all are dispatched to the handler. Using an
asyncio.Eventto signal completion avoids flaky timing-based assertions.
355-365: No issues found—test expectation is correct and verified.The test correctly expects
asyncio.CancelledErrorwith message "Socket was stopped". This is confirmed by the_check_initialized()implementation insrc/aiperf/zmq/zmq_base_client.py(line 60), which raises exactly this exception whenself.stop_requestedis true. Thesend()method calls this check at line 135 ofstreaming_dealer_client.py, ensuring the test behavior aligns with the actual implementation.tests/unit/zmq/test_zmq_base_client.py (4)
33-45: Test correctly updated to reflect deferred socket creation.The renamed test
test_init_stores_correct_paramsand new assertions (socket is None,socket.assert_not_called()) accurately reflect the architectural change where socket creation is deferred toinitialize()rather than happening in__init__.
275-315: Thorough verification of IDENTITY option ordering.The
test_identity_set_before_bind_or_connecttest correctly verifies the critical requirement that IDENTITY must be set before bind/connect. The method call index comparison (identity_idx < action_idx) is a robust way to validate ordering.
317-332: Important test for socket_ops mutation behavior.This test validates that IDENTITY is removed from
socket_opsafter being applied, preventing duplicate application. The assertion that other options (likezmq.IMMEDIATE) remain insocket_opsconfirms the selective removal is working correctly.
373-392: Good coverage of identity format variations.Testing various identity formats including max-length (255 bytes) ensures the implementation handles edge cases. The parametrization approach keeps tests concise while providing comprehensive coverage.
src/aiperf/zmq/streaming_dealer_client.py (3)
73-101: Clean constructor with proper identity handling.The constructor correctly encodes the identity and passes it via
socket_ops. Settingclient_id=identityensures consistent identification across the client lifecycle.
103-117: Single-handler constraint is appropriately enforced.The
register_receivermethod correctly prevents multiple handler registrations, which avoids ambiguity in message dispatch.
124-149: Send method correctly validates and transmits messages.The method properly checks initialization, validates message type, and uses single-frame send (appropriate for DEALER sockets). Error logging before re-raising provides good observability.
src/aiperf/zmq/streaming_router_client.py (4)
84-101: Constructor correctly initializes ROUTER socket.The constructor properly sets up the ROUTER socket with default
bind=True, which is the typical pattern for ROUTER sockets that accept connections from multiple DEALER clients.
103-118: Handler registration follows consistent pattern.The single-handler constraint matches the DEALER client implementation, maintaining consistency across the streaming client family.
125-154: send_to correctly implements ROUTER envelope pattern.The method properly encodes the identity to bytes and uses
send_multipartwith the routing envelope. The tuple unpacking[*routing_envelope, message.to_json_bytes()]is a clean way to construct the multipart message.
172-180: Routing envelope extraction handles edge cases.The fallback to
(b"",)whenlen(data) <= 1provides defensive handling for malformed messages, though in practice DEALER→ROUTER messages should always include identity.
12f9291 to
38ab01b
Compare
Signed-off-by: Anthony Casagrande <[email protected]>
This is for the upcoming Sticky Credit Router. It will allow us to send credits to specific workers.
Summary by CodeRabbit
Release Notes
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.