Skip to content

Commit d25fab3

Browse files
committed
Merge branch 'subscribe-bind'
2 parents a2f7f88 + d644c58 commit d25fab3

File tree

2 files changed

+77
-11
lines changed

2 files changed

+77
-11
lines changed

nats/js/client.py

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,16 @@ async def subscribe(
128128
durable: Optional[str] = None,
129129
stream: Optional[str] = None,
130130
config: Optional[api.ConsumerConfig] = None,
131-
manual_ack: Optional[bool] = False,
132-
ordered_consumer: Optional[bool] = False,
131+
manual_ack: bool = False,
132+
ordered_consumer: bool = False,
133133
idle_heartbeat: Optional[float] = None,
134-
flow_control: Optional[bool] = False,
134+
flow_control: bool = False,
135135
) -> Subscription:
136-
"""
137-
subscribe returns a `Subscription` that is bound to a push based consumer.
136+
"""Create consumer if needed and push-subscribe to it.
137+
138+
1. Check if consumer exists.
139+
2. Creates consumer if needed.
140+
3. Calls `subscribe_bind`.
138141
139142
:param subject: Subject from a stream from JetStream.
140143
:param queue: Deliver group name from a set a of queue subscribers.
@@ -271,17 +274,36 @@ async def cb(msg):
271274
consumer_info = await self._jsm.add_consumer(stream, config=config)
272275
consumer = consumer_info.name
273276

277+
if consumer is None:
278+
raise TypeError("cannot detect consumer")
279+
if config is None:
280+
raise TypeError("config is required for existing durable consumer")
281+
return await self.subscribe_bind(
282+
cb=cb,
283+
stream=stream,
284+
config=config,
285+
manual_ack=manual_ack,
286+
ordered_consumer=ordered_consumer,
287+
consumer=consumer,
288+
)
289+
290+
async def subscribe_bind(
291+
self,
292+
stream: str,
293+
config: api.ConsumerConfig,
294+
consumer: str,
295+
cb: Optional[Callback] = None,
296+
manual_ack: bool = False,
297+
ordered_consumer: bool = False,
298+
) -> Subscription:
299+
"""Push-subscribe to an existing consumer.
300+
"""
274301
# By default, async subscribers wrap the original callback and
275302
# auto ack the messages as they are delivered.
276303
if cb and not manual_ack:
277304
cb = self._auto_ack_callback(cb)
278-
279-
# TODO (@orsinium): too many assumptions, refactor the code above to ensure they hold true.
280-
assert config is not None
281305
if config.deliver_subject is None:
282-
raise TypeError("config.deliver_subject is required")
283-
if consumer is None:
284-
raise TypeError("cannot detect consumer")
306+
config.deliver_subject = self._nc.new_inbox()
285307
sub = await self._nc.subscribe(
286308
subject=config.deliver_subject,
287309
queue=config.deliver_group or "",

tests/test_js.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,50 @@ async def test_ephemeral_subscribe(self):
866866
assert len(info2.name) > 0
867867
assert info1.name != info2.name
868868

869+
@async_test
870+
async def test_subscribe_bind(self):
871+
nc = await nats.connect()
872+
js = nc.jetstream()
873+
874+
stream_name = "hello-stream"
875+
subject_name = "hello-subject"
876+
consumer_name = "alice"
877+
await js.add_stream(name=stream_name, subjects=[subject_name])
878+
879+
# Create the consumer and assign a deliver subject which
880+
# will then be picked up on bind.
881+
inbox = nc.new_inbox()
882+
config = nats.js.api.ConsumerConfig(deliver_subject=inbox)
883+
consumer_info = await js.add_consumer(
884+
stream=stream_name,
885+
config=config,
886+
durable_name=consumer_name,
887+
)
888+
assert consumer_info.stream_name == stream_name
889+
assert consumer_info.name == consumer_name
890+
assert consumer_info.config.durable_name == consumer_name
891+
892+
# Subscribe using the deliver subject that was chosen before.
893+
sub = await js.subscribe_bind(
894+
stream=consumer_info.stream_name,
895+
consumer=consumer_info.name,
896+
config=consumer_info.config,
897+
)
898+
for i in range(10):
899+
await js.publish(subject_name, f'Hello World {i}'.encode())
900+
901+
msgs = []
902+
for i in range(0, 10):
903+
msg = await sub.next_msg()
904+
msgs.append(msg)
905+
await msg.ack()
906+
assert len(msgs) == 10
907+
assert sub.pending_msgs == 0
908+
909+
info = await sub.consumer_info()
910+
assert info.num_ack_pending == 0
911+
assert info.num_pending == 0
912+
869913

870914
class AckPolicyTest(SingleJetStreamServerTestCase):
871915

0 commit comments

Comments
 (0)