Commit 8626364

Add obj get perf bench
Signed-off-by: Waldemar Quevedo <[email protected]>
1 parent 2e4432e commit 8626364

File tree

1 file changed: +249 additions, −0 deletions


nats/benchmark/obj_fetch_perf.py

Lines changed: 249 additions & 0 deletions
@@ -0,0 +1,249 @@
import argparse
import asyncio
import os
import sys
import time

import nats

try:
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

DEFAULT_NUM_FETCHES = 10
DEFAULT_TIMEOUT = 30
DEFAULT_BUCKET = ""
DEFAULT_OBJECT = ""


class ProgressFileWrapper:
    """
    A file wrapper that shows download progress as data is written.
    """

    def __init__(self, file_obj, total_size: int, object_name: str):
        self.file = file_obj
        self.total_size = total_size
        self.object_name = object_name
        self.bytes_written = 0
        self.last_progress = -1
        self.start_time = time.time()

    def write(self, data):
        """Write data to file and update progress."""
        result = self.file.write(data)
        self.bytes_written += len(data)
        self._update_progress()
        return result

    def _update_progress(self):
        """Update progress display."""
        if self.total_size <= 0:
            return

        progress = int((self.bytes_written / self.total_size) * 100)

        # Only update every 5% to avoid too much output
        if progress >= self.last_progress + 5:
            elapsed = time.time() - self.start_time
            if elapsed > 0:
                speed_mbps = (self.bytes_written / (1024 * 1024)) / elapsed
                mb_written = self.bytes_written / (1024 * 1024)
                mb_total = self.total_size / (1024 * 1024)

                # Clear the current line and show progress
                print(
                    f"\r {self.object_name}: {progress:3d}% ({mb_written:.1f}/{mb_total:.1f} MB) @ {speed_mbps:.1f} MB/s",
                    end="",
                    flush=True,
                )
                self.last_progress = progress

    def __getattr__(self, name):
        """Delegate other attributes to the wrapped file."""
        return getattr(self.file, name)


def show_usage():
    message = """
Usage: obj_fetch_perf [options]

options:
    -n COUNT            Number of fetches to perform (default: 10)
    -b BUCKET           Object store bucket name
    -o OBJECT           Object name to fetch
    -t TIMEOUT          Timeout per fetch in seconds (default: 30)
    -f FILE             Write to file (streaming mode, memory efficient)
    --overwrite         Overwrite output file if it exists
    --servers SERVERS   NATS server URLs (default: nats://demo.nats.io:4222)
    """
    print(message)


def show_usage_and_die():
    show_usage()
    sys.exit(1)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--count", default=DEFAULT_NUM_FETCHES, type=int)
    parser.add_argument("-b", "--bucket", default=DEFAULT_BUCKET)
    parser.add_argument("-o", "--object", default=DEFAULT_OBJECT)
    parser.add_argument("-t", "--timeout", default=DEFAULT_TIMEOUT, type=int)
    parser.add_argument("-f", "--file", help="Write to file (streaming mode)")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite output file if it exists")
    parser.add_argument("--servers", default=[], action="append")
    args = parser.parse_args()

    servers = args.servers
    if len(args.servers) < 1:
        servers = ["nats://demo.nats.io:4222"]

    print(f"Connecting to NATS servers: {servers}")

    # Connect to NATS with JetStream
    try:
        nc = await nats.connect(servers, pending_size=1024 * 1024)
        js = nc.jetstream()
    except Exception as e:
        sys.stderr.write(f"ERROR: Failed to connect to NATS: {e}\n")
        show_usage_and_die()

    # Get object store
    try:
        obs = await js.object_store(bucket=args.bucket)
        print(f"Connected to object store bucket: {args.bucket}")
    except Exception as e:
        sys.stderr.write(f"ERROR: Failed to access object store bucket '{args.bucket}': {e}\n")
        await nc.close()
        sys.exit(1)

    # Get object info first to verify it exists and show stats
    try:
        info = await obs.get_info(args.object)
        size_mb = info.size / (1024 * 1024)
        print(f"Object: {args.object}")
        print(f"Size: {info.size} bytes ({size_mb:.2f} MB)")
        print(f"Chunks: {info.chunks}")
        print(f"Description: {info.description}")
        print()
    except Exception as e:
        sys.stderr.write(f"ERROR: Failed to get object info for '{args.object}': {e}\n")
        await nc.close()
        sys.exit(1)

    # Handle file output setup
    if args.file:
        if os.path.exists(args.file) and not args.overwrite:
            sys.stderr.write(f"ERROR: File '{args.file}' already exists. Use --overwrite to replace it.\n")
            await nc.close()
            sys.exit(1)

        # For multiple fetches with file output, append a counter
        if args.count > 1:
            base, ext = os.path.splitext(args.file)
            print(f"Multiple fetches with file output - files will be named: {base}_1{ext}, {base}_2{ext}, etc.")
        else:
            print(f"Streaming output to file: {args.file}")
        print()

    # Start the benchmark
    print(f"Starting benchmark: fetching '{args.object}' {args.count} times")
    if args.file:
        print("Progress (streaming to file):")
    else:
        print("Progress: ", end="", flush=True)

    start = time.time()
    total_bytes = 0
    successful_fetches = 0
    failed_fetches = 0

    for i in range(args.count):
        try:
            # Determine output file for this fetch
            current_file = None
            if args.file:
                if args.count > 1:
                    base, ext = os.path.splitext(args.file)
                    current_file = f"{base}_{i + 1}{ext}"
                else:
                    current_file = args.file

            # Fetch the object
            if current_file:
                # Stream to file with progress tracking
                with open(current_file, "wb") as f:
                    # Wrap the file with progress tracker
                    progress_wrapper = ProgressFileWrapper(f, info.size, args.object)
                    result = await asyncio.wait_for(
                        obs.get(args.object, writeinto=progress_wrapper), timeout=args.timeout
                    )
                # Get file size for stats
                fetch_bytes = os.path.getsize(current_file)
                # Ensure we show 100% completion
                if progress_wrapper.bytes_written > 0:
                    print(
                        f"\r 📥 {args.object}: 100% ({fetch_bytes / (1024 * 1024):.1f}/{info.size / (1024 * 1024):.1f} MB) ✓"
                    )
            else:
                # Load into memory
                result = await asyncio.wait_for(obs.get(args.object), timeout=args.timeout)
                fetch_bytes = len(result.data)

            total_bytes += fetch_bytes
            successful_fetches += 1

            # Show simple progress for in-memory mode
            if not current_file:
                print("#", end="", flush=True)

        except asyncio.TimeoutError:
            failed_fetches += 1
            if args.file:
                print(f"\r{args.object}: Timeout after {args.timeout}s")
            else:
                print("T", end="", flush=True)  # T for timeout
        except Exception as e:
            failed_fetches += 1
            if args.file:
                print(f"\r{args.object}: Error - {str(e)[:50]}")
            else:
                print("E", end="", flush=True)  # E for error
            if i == 0:  # Show first error for debugging
                sys.stderr.write(f"\nFirst fetch error: {e}\n")

        # Small pause between fetches
        await asyncio.sleep(0.01)

    elapsed = time.time() - start

    print("\n\nBenchmark Results:")
    print("=================")
    if args.file:
        print("Mode: Streaming to file(s) (memory efficient)")
    else:
        print("Mode: In-memory loading")
    print(f"Total time: {elapsed:.2f} seconds")
    print(f"Successful fetches: {successful_fetches}/{args.count}")
    print(f"Failed fetches: {failed_fetches}")

    if successful_fetches > 0:
        avg_time = elapsed / successful_fetches
        mbytes_per_sec = (total_bytes / elapsed) / (1024 * 1024)
        fetches_per_sec = successful_fetches / elapsed

        print(f"Average fetch time: {avg_time:.3f} seconds")
        print(f"Fetches per second: {fetches_per_sec:.2f}")
        print(f"Throughput: {mbytes_per_sec:.2f} MB/sec")
        print(f"Total data transferred: {total_bytes / (1024 * 1024):.2f} MB")

    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())
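
For local testing, the bucket has to exist and contain the object before the benchmark can fetch it. The snippet below is a seeding sketch that is not part of this commit; it assumes a local nats-server running with JetStream enabled and uses the nats-py object store API (create_object_store / put), with "bench" and "payload" as placeholder names:

    import asyncio
    import os

    import nats


    async def seed():
        # Connect to a local JetStream-enabled server; adjust the URL as needed.
        nc = await nats.connect("nats://127.0.0.1:4222")
        js = nc.jetstream()

        # Create the bucket and upload an 8 MB test object for the benchmark to fetch.
        obs = await js.create_object_store(bucket="bench")
        await obs.put("payload", os.urandom(8 * 1024 * 1024))

        await nc.close()


    asyncio.run(seed())

With the bucket seeded, the benchmark added in this commit would be invoked along these lines, using the flags defined in its argument parser:

    python nats/benchmark/obj_fetch_perf.py -b bench -o payload -n 10 -f out.bin --overwrite --servers nats://127.0.0.1:4222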
