-
Notifications
You must be signed in to change notification settings - Fork 233
Add nats-client package #732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
381af5b to
781a9bf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds a new NATS client package with significant performance improvements over the existing nats.aio client. The implementation provides core NATS messaging functionality including publish/subscribe, request/reply, queue groups, and message headers through an asyncio-based Python client.
- Introduces a complete NATS client implementation with high-level API
- Adds comprehensive test coverage for all client features and edge cases
- Includes performance benchmarking tools demonstrating 35x improvement for small messages
Reviewed Changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| nats-client/src/nats/client/init.py | Main client implementation with connection management, messaging, and reconnection logic |
| nats-client/src/nats/client/subscription.py | Subscription class supporting async iteration and context management |
| nats-client/src/nats/client/protocol/message.py | NATS protocol message parsing with support for MSG, HMSG, and control messages |
| nats-client/src/nats/client/protocol/command.py | Command encoding for NATS protocol operations (PUB, SUB, CONNECT, etc.) |
| nats-client/src/nats/client/message.py | Message data structures including Headers, Status, and Message classes |
| nats-client/tests/ | Comprehensive test suite covering client functionality, subscriptions, and protocol handling |
| nats-client/tools/bench.py | Benchmarking tool for performance testing |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
83d0f80 to
44fb982
Compare
781a9bf to
9a8cd71
Compare
|
Move this to be under |
philpennock
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see anything in the SSL context init to handle the server identity for TLS identity verification, when we learnt a reconnect address as an IP from the INFO line?
If we connect with a hostname originally, we validate that hostname in the cert, and if we reconnect to a learnt IP address, we validate that same original hostname as present in the new server cert. Did I just miss the handling of that?
accfd1c to
20963ae
Compare
9a8cd71 to
0094dba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
.github/workflows/test.yml:1
- Git merge conflict markers are present in the CI configuration file. These need to be resolved before merging.
name: test
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated no new comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
16d8621 to
7acfd1f
Compare
wallyqs
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change the import to nats.experimental.aio.client while iterating on the implementation, then can work using that namespace in the other branches with the JetStream changes.
So Also would make all the currently pending PRs painful to merge and also the ones that aren't opened yet (e.g client auth, client tls, jetstream ordered consumer). Not sure what the benefit is of such a rename? |
2e13fe5 to
3cd0ce1
Compare
|
|
||
| for callback in subscription._callbacks: | ||
| try: | ||
| callback(msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
callback runs in place in the read loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, only synchronous callbacks allowed. If user wants to perform an asynchronous task, up to them to spawn one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so if you have two subscriptions, one callback processing will cause head of line blocking on the other subscription no? unless you create a task per msg so that it doesn't?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't await as it introduces a bunch of overhead, will basically stall everything to a halt. If the user chooses to do heavy processing in a callback, then they are responsible for scheduling that in a way that makes sense.
Ideally, you will use async for await, callbacks are for dispatching of messages as they arrive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I understand how this is expected to be used, can you try to implement this example?
import asyncio
import time
import nats
async def main():
nc = await nats.connect("nats://localhost:4222")
async def cb1(msg):
print(time.time_ns(), " [1] --->", msg)
await asyncio.sleep(5)
sub2 = await nc.subscribe("foo", cb=cb1)
async def cb2(msg):
print(time.time_ns(), "[2] --->", msg)
await asyncio.sleep(1)
sub2 = await nc.subscribe("bar", cb=cb2)
async def task1():
while True:
await nc.publish("foo", b'1')
await asyncio.sleep(1)
async def task2():
while True:
await nc.publish("bar", b'2')
await asyncio.sleep(0.5)
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
await asyncio.sleep(120)
await nc.close()
if __name__ == "__main__":
asyncio.run(main())I just see a lot of blocking happening using this model, starting point for example:
import asyncio
import time
from nats.client import connect
async def main():
nc = await connect("nats://localhost:4222")
def cb1(msg):
print(time.time_ns(), " [1] --->", msg)
# can't async await or do i/o since that blocks the event loop
time.sleep(5)
sub2 = await nc.subscribe("foo", callback=cb1)
def cb2(msg):
print(time.time_ns(), "[2] --->", msg)
# e.g. do some work
for i in range(1, 50000000):
pass
sub2 = await nc.subscribe("foo", callback=cb2)
async def task1():
while True:
await nc.publish("foo", b'1')
await asyncio.sleep(1)
async def task2():
while True:
await nc.publish("foo", b'2')
await asyncio.sleep(0.5)
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
await asyncio.sleep(120)
await nc.close()
if __name__ == "__main__":
asyncio.run(main())There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is using sub.messages, same bench script for both clients.
Using sub.next_msg is about 2x slower.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have you tried the nats/benchmark/sub_perf.py in the repo?
I get this for example:
# nats bench pub test --size 64
python3 nats/benchmark/sub_perf.py --servers nats://localhost:4222
Waiting for 100000 messages on [test]...
****************************************************************************************************
Test completed : 337487.6190604489 msgs/sec sent
Received 100000 messages (337487.6190604489 msgs/sec)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same changes just addressing async sub iterator: #752
# nats bench pub test --size 64
uv run nats/benchmark/sub_perf_messages.py --servers nats://localhost:4222
Waiting for 100000 messages on [test]...
****************************************************************************************************
Test completed : 316181.45606272004 msgs/sec sent
Received 100000 messages (316181.45606272004 msgs/sec)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just sub tho? the above benchmark is pub sub, see tools/bench.py, you can run both clients with it.
Tried running with the patch and got 8000 messages per sec (slow consumers triggered).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are you running exactly? I get this with that script:
uv run tools/bench.py --client aio --msgs 1000000 --size 128 --pub --sub
Starting pub/sub benchmark with nats.aio [msgs=1,000,000, size=128 B]
Publisher results:
Test completed: 1,000,000 messages, 128,000,000 bytes, 4.44 seconds
Throughput: 224,990 msgs/sec, 27.46 MB/sec
Latency: (min/avg/max/std) = 0.00/0.00/60.50/0.42 ms
Subscriber results:
Test completed: 1,000,000 messages, 128,000,000 bytes, 4.51 seconds
Throughput: 221,720 msgs/sec, 27.07 MB/sec
Latency: (min/avg/max/std) = 93.17/2306.20/4603.37/1284.93 ms
| logger.exception("Error in subscription callback") | ||
|
|
||
| try: | ||
| await subscription.queue.put(msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same with callback dispatch, isn't this blocking the read loop? I guess no since it is unbound?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With an unbound queue, not an issue. With slow consumer wiring it will eventually be put_nowait.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is client slow consumer handled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently isn't.
| else: | ||
| logger.debug("->> SUB %s %s", subject, sid) | ||
|
|
||
| await self._connection.write(command) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subs are sync writes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, PUB and HPUB are buffered.
PING, PONG, SUB are unbuffered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated 5 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
It can be either @wallyqs, but using versions is a better way to signal stability than changing package and namespace names imo. And again, I still have a lot of work to push up here, not limited to but including auth and mtls. Other pull requests like everything on jetstream that are here now are stuck in rebase hell. |
@caspervonb there doesn't have to be, honestly I'm not finding the rewrite sound yet, although there are good ideas and like the repo structure they could have been backported to what we have in nats-py and instead close some of the opened issues as I mentioned a few times already. |
bc5924c to
777a0e0
Compare
|
Removed performance section from README, let's generate this data from CI instead. |
6eb24dc to
bd8a5f0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 56 out of 57 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Performance BenchmarksI did some further optimization and comparison. Test EnvironmentHardware:
Software:
Test Parameters:
Complete Results Table (Ordered by Subscriber Performance)
Key Findings1. Performance RankingsBy Subscriber Throughput:
2. Message Loss Analysisnats.aio Queue Mode - Critical Issue:
All other configurations: 0% message loss ✅ 3. Runtime ComparisonPyPy vs CPython (nats-client callback):
4. Implementation Comparisonnats-client vs nats.aio (callback mode):
Callback vs Queue (nats-client):
Performance ComparisonCallback Mode (Subscriber Throughput)
Queue Mode (Subscriber Throughput)
* nats.aio queue mode experiences severe message loss (47-96%) Summary
Benchmark Commandscd nats-client
# Individual tests
uv run python tools/bench.py --client client --messages 1000000 --size 128 [--callback]
uv run python tools/bench.py --client aio --messages 1000000 --size 128 [--callback] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 56 out of 57 changed files in this pull request and generated 28 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Dismissing this review blocked about naming, as the client is already in a separate nats-client package and will be published independently (likely as nats-core as that's the package name I currently sit on and nats-client).
Since it's not part of the nats.py namespace, the experimental designation and nested import path are unnecessary.
This PR has been open for 60 days and we need to move forward.
Jarema
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering it is a whole new package, not touching or affecting current client, we should merge it.
LGTM.
ee828b2 to
6d6c281
Compare
Signed-off-by: Casper Beyer <[email protected]>
6d6c281 to
22cbc2a
Compare
This adds a standalone nats.client package
Main differences in
nats.clientcompared tonats.aio