Skip to content

Commit b4b1b3c

Browse files
committed
Add smoke tests for examples
Signed-off-by: Casper Beyer <[email protected]>
1 parent 614060c commit b4b1b3c

File tree

1 file changed

+263
-0
lines changed

1 file changed

+263
-0
lines changed

nats-client/tests/test_examples.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
"""Integration tests for example scripts.
2+
3+
These tests verify that all example scripts work correctly.
4+
"""
5+
6+
import asyncio
7+
import subprocess
8+
import sys
9+
from pathlib import Path
10+
11+
import pytest
12+
from nats.server import Server
13+
14+
15+
@pytest.fixture
16+
def examples_dir() -> Path:
17+
"""Get the examples directory path."""
18+
return Path(__file__).parent.parent / "examples"
19+
20+
21+
@pytest.mark.asyncio
22+
async def test_pub_sub_example(server: Server, examples_dir: Path):
23+
"""Test that nats-pub and nats-sub work together."""
24+
# Start a subscriber in the background
25+
sub_proc = subprocess.Popen(
26+
[
27+
sys.executable,
28+
str(examples_dir / "nats-sub.py"),
29+
"-s",
30+
server.client_url,
31+
"test.subject",
32+
],
33+
stdout=subprocess.PIPE,
34+
stderr=subprocess.PIPE,
35+
text=True,
36+
)
37+
38+
try:
39+
# Give subscriber time to connect
40+
await asyncio.sleep(0.5)
41+
42+
# Publish a message
43+
pub_result = subprocess.run(
44+
[
45+
sys.executable,
46+
str(examples_dir / "nats-pub.py"),
47+
"-s",
48+
server.client_url,
49+
"test.subject",
50+
"Hello from test!",
51+
],
52+
capture_output=True,
53+
text=True,
54+
timeout=5,
55+
)
56+
57+
assert pub_result.returncode == 0
58+
assert "Published [test.subject]" in pub_result.stdout
59+
60+
# Give subscriber time to receive
61+
await asyncio.sleep(0.5)
62+
63+
finally:
64+
sub_proc.terminate()
65+
sub_proc.wait(timeout=2)
66+
67+
68+
@pytest.mark.asyncio
69+
async def test_request_reply_example(server: Server, examples_dir: Path):
70+
"""Test that nats-req and nats-rply work together."""
71+
# Start a replier in the background
72+
rply_proc = subprocess.Popen(
73+
[
74+
sys.executable,
75+
str(examples_dir / "nats-rply.py"),
76+
"-s",
77+
server.client_url,
78+
"test.help",
79+
"I can help!",
80+
],
81+
stdout=subprocess.PIPE,
82+
stderr=subprocess.PIPE,
83+
text=True,
84+
)
85+
86+
try:
87+
# Give replier time to connect
88+
await asyncio.sleep(0.5)
89+
90+
# Send a request
91+
req_result = subprocess.run(
92+
[
93+
sys.executable,
94+
str(examples_dir / "nats-req.py"),
95+
"-s",
96+
server.client_url,
97+
"test.help",
98+
"What is NATS?",
99+
],
100+
capture_output=True,
101+
text=True,
102+
timeout=5,
103+
)
104+
105+
assert req_result.returncode == 0
106+
assert "Published [test.help]" in req_result.stdout
107+
assert "I can help!" in req_result.stdout
108+
109+
finally:
110+
rply_proc.terminate()
111+
rply_proc.wait(timeout=2)
112+
113+
114+
@pytest.mark.asyncio
115+
async def test_echo_example(server: Server, examples_dir: Path):
116+
"""Test that nats-echo works correctly."""
117+
# Start echo service in the background
118+
echo_proc = subprocess.Popen(
119+
[
120+
sys.executable,
121+
str(examples_dir / "nats-echo.py"),
122+
"-s",
123+
server.client_url,
124+
"-id",
125+
"test-echo",
126+
"echo.test",
127+
],
128+
stdout=subprocess.PIPE,
129+
stderr=subprocess.PIPE,
130+
text=True,
131+
)
132+
133+
try:
134+
# Give echo service time to connect
135+
await asyncio.sleep(0.5)
136+
137+
# Test echo functionality
138+
echo_result = subprocess.run(
139+
[
140+
sys.executable,
141+
str(examples_dir / "nats-req.py"),
142+
"-s",
143+
server.client_url,
144+
"echo.test",
145+
"Echo this!",
146+
],
147+
capture_output=True,
148+
text=True,
149+
timeout=5,
150+
)
151+
152+
assert echo_result.returncode == 0
153+
assert "Echo this!" in echo_result.stdout
154+
155+
# Test status endpoint
156+
status_result = subprocess.run(
157+
[
158+
sys.executable,
159+
str(examples_dir / "nats-req.py"),
160+
"-s",
161+
server.client_url,
162+
"echo.test.status",
163+
"",
164+
],
165+
capture_output=True,
166+
text=True,
167+
timeout=5,
168+
)
169+
170+
assert status_result.returncode == 0
171+
assert "test-echo" in status_result.stdout
172+
assert "echo_count" in status_result.stdout
173+
174+
finally:
175+
echo_proc.terminate()
176+
echo_proc.wait(timeout=2)
177+
178+
179+
@pytest.mark.asyncio
180+
async def test_queue_group_example(server: Server, examples_dir: Path):
181+
"""Test that nats-qsub distributes messages across queue members."""
182+
# Start two queue subscribers
183+
qsub1_proc = subprocess.Popen(
184+
[
185+
sys.executable,
186+
str(examples_dir / "nats-qsub.py"),
187+
"-s",
188+
server.client_url,
189+
"test.queue",
190+
"workers",
191+
],
192+
stdout=subprocess.PIPE,
193+
stderr=subprocess.PIPE,
194+
text=True,
195+
)
196+
197+
qsub2_proc = subprocess.Popen(
198+
[
199+
sys.executable,
200+
str(examples_dir / "nats-qsub.py"),
201+
"-s",
202+
server.client_url,
203+
"test.queue",
204+
"workers",
205+
],
206+
stdout=subprocess.PIPE,
207+
stderr=subprocess.PIPE,
208+
text=True,
209+
)
210+
211+
try:
212+
# Give subscribers time to connect
213+
await asyncio.sleep(0.5)
214+
215+
# Publish multiple messages
216+
for i in range(10):
217+
pub_result = subprocess.run(
218+
[
219+
sys.executable,
220+
str(examples_dir / "nats-pub.py"),
221+
"-s",
222+
server.client_url,
223+
"test.queue",
224+
f"Message {i + 1}",
225+
],
226+
capture_output=True,
227+
text=True,
228+
timeout=5,
229+
)
230+
assert pub_result.returncode == 0
231+
232+
# Give subscribers time to receive messages
233+
await asyncio.sleep(1)
234+
235+
finally:
236+
qsub1_proc.terminate()
237+
qsub2_proc.terminate()
238+
qsub1_proc.wait(timeout=2)
239+
qsub2_proc.wait(timeout=2)
240+
241+
242+
@pytest.mark.asyncio
243+
async def test_examples_help_text(examples_dir: Path):
244+
"""Test that all examples have working --help."""
245+
examples = [
246+
"nats-pub.py",
247+
"nats-sub.py",
248+
"nats-qsub.py",
249+
"nats-req.py",
250+
"nats-rply.py",
251+
"nats-echo.py",
252+
]
253+
254+
for example in examples:
255+
result = subprocess.run(
256+
[sys.executable, str(examples_dir / example), "--help"],
257+
capture_output=True,
258+
text=True,
259+
timeout=5,
260+
)
261+
assert result.returncode == 0
262+
assert "usage:" in result.stdout.lower()
263+
assert "--server" in result.stdout or "-s" in result.stdout

0 commit comments

Comments
 (0)