Skip to content

Commit 170ae2c

Browse files
authored
Add per-message TTL support for KV operations (#783)
* Add per-message TTL support for KV operations --------- Signed-off-by: Casper Beyer <[email protected]>
1 parent 9afa315 commit 170ae2c

File tree

3 files changed

+222
-9
lines changed

3 files changed

+222
-9
lines changed

nats/src/nats/js/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,6 +1403,7 @@ async def create_key_value(
14031403
subjects=[f"$KV.{config.bucket}.>"],
14041404
allow_direct=config.direct,
14051405
allow_rollup_hdrs=True,
1406+
allow_msg_ttl=True,
14061407
deny_delete=True,
14071408
discard=api.DiscardPolicy.NEW,
14081409
duplicate_window=duplicate_window,

nats/src/nats/js/kv.py

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -193,23 +193,34 @@ async def put(self, key: str, value: bytes, validate_keys: bool = True) -> int:
193193
"""
194194
put will place the new value for the key into the store
195195
and return the revision number.
196+
197+
Note: This method does not support TTL. Use create() if you need TTL support.
198+
199+
:param key: The key to put
200+
:param value: The value to store
201+
:param validate_keys: Whether to validate the key format
196202
"""
197203
if validate_keys and not _is_key_valid(key):
198204
raise nats.js.errors.InvalidKeyError(key)
199205

200206
pa = await self._js.publish(f"{self._pre}{key}", value)
201207
return pa.seq
202208

203-
async def create(self, key: str, value: bytes, validate_keys: bool = True) -> int:
209+
async def create(self, key: str, value: bytes, validate_keys: bool = True, msg_ttl: Optional[float] = None) -> int:
204210
"""
205211
create will add the key/value pair iff it does not exist.
212+
213+
:param key: The key to create
214+
:param value: The value to store
215+
:param validate_keys: Whether to validate the key format
216+
:param msg_ttl: Optional TTL (time-to-live) in seconds for this specific message
206217
"""
207218
if validate_keys and not _is_key_valid(key):
208219
raise nats.js.errors.InvalidKeyError(key)
209220

210221
pa = None
211222
try:
212-
pa = await self.update(key, value, last=0, validate_keys=validate_keys)
223+
pa = await self.update(key, value, last=0, validate_keys=validate_keys, msg_ttl=msg_ttl)
213224
except nats.js.errors.KeyWrongLastSequenceError as err:
214225
# In case of attempting to recreate an already deleted key,
215226
# the client would get a KeyWrongLastSequenceError. When this happens,
@@ -229,13 +240,25 @@ async def create(self, key: str, value: bytes, validate_keys: bool = True) -> in
229240
# to recreate using the last revision.
230241
raise err
231242
except nats.js.errors.KeyDeletedError as err:
232-
pa = await self.update(key, value, last=err.entry.revision, validate_keys=validate_keys)
243+
pa = await self.update(
244+
key, value, last=err.entry.revision, validate_keys=validate_keys, msg_ttl=msg_ttl
245+
)
233246

234247
return pa
235248

236-
async def update(self, key: str, value: bytes, last: Optional[int] = None, validate_keys: bool = True) -> int:
249+
async def update(
250+
self,
251+
key: str,
252+
value: bytes,
253+
last: Optional[int] = None,
254+
validate_keys: bool = True,
255+
msg_ttl: Optional[float] = None,
256+
) -> int:
237257
"""
238258
update will update the value if the latest revision matches.
259+
260+
Note: TTL parameter is accepted for internal use by create(), but should not be
261+
used directly on update operations per NATS KV semantics.
239262
"""
240263
if validate_keys and not _is_key_valid(key):
241264
raise nats.js.errors.InvalidKeyError(key)
@@ -247,7 +270,7 @@ async def update(self, key: str, value: bytes, last: Optional[int] = None, valid
247270

248271
pa = None
249272
try:
250-
pa = await self._js.publish(f"{self._pre}{key}", value, headers=hdrs)
273+
pa = await self._js.publish(f"{self._pre}{key}", value, headers=hdrs, msg_ttl=msg_ttl)
251274
except nats.js.errors.APIError as err:
252275
# Check for a BadRequest::KeyWrongLastSequenceError error code.
253276
if err.err_code == 10071:
@@ -256,9 +279,16 @@ async def update(self, key: str, value: bytes, last: Optional[int] = None, valid
256279
raise err
257280
return pa.seq
258281

259-
async def delete(self, key: str, last: Optional[int] = None, validate_keys: bool = True) -> bool:
282+
async def delete(
283+
self, key: str, last: Optional[int] = None, validate_keys: bool = True, msg_ttl: Optional[float] = None
284+
) -> bool:
260285
"""
261286
delete will place a delete marker and remove all previous revisions.
287+
288+
:param key: The key to delete
289+
:param last: Expected last revision number (for optimistic concurrency)
290+
:param validate_keys: Whether to validate the key format
291+
:param msg_ttl: Optional TTL (time-to-live) in seconds for the delete marker
262292
"""
263293
if validate_keys and not _is_key_valid(key):
264294
raise nats.js.errors.InvalidKeyError(key)
@@ -269,17 +299,20 @@ async def delete(self, key: str, last: Optional[int] = None, validate_keys: bool
269299
if last and last > 0:
270300
hdrs[api.Header.EXPECTED_LAST_SUBJECT_SEQUENCE] = str(last)
271301

272-
await self._js.publish(f"{self._pre}{key}", headers=hdrs)
302+
await self._js.publish(f"{self._pre}{key}", headers=hdrs, msg_ttl=msg_ttl)
273303
return True
274304

275-
async def purge(self, key: str) -> bool:
305+
async def purge(self, key: str, msg_ttl: Optional[float] = None) -> bool:
276306
"""
277307
purge will remove the key and all revisions.
308+
309+
:param key: The key to purge
310+
:param msg_ttl: Optional TTL (time-to-live) in seconds for the purge marker
278311
"""
279312
hdrs = {}
280313
hdrs[KV_OP] = KV_PURGE
281314
hdrs[api.Header.ROLLUP] = MSG_ROLLUP_SUBJECT
282-
await self._js.publish(f"{self._pre}{key}", headers=hdrs)
315+
await self._js.publish(f"{self._pre}{key}", headers=hdrs, msg_ttl=msg_ttl)
283316
return True
284317

285318
async def purge_deletes(self, olderthan: int = 30 * 60) -> bool:

nats/tests/test_js.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3830,6 +3830,185 @@ async def error_handler(e):
38303830
# Clean up
38313831
await nc.close()
38323832

3833+
@async_test
3834+
async def test_kv_create_with_ttl(self):
3835+
"""Test that create() supports msg_ttl parameter"""
3836+
errors = []
3837+
3838+
async def error_handler(e):
3839+
print("Error:", e, type(e))
3840+
errors.append(e)
3841+
3842+
nc = await nats.connect(error_cb=error_handler)
3843+
3844+
server_version = nc.connected_server_version
3845+
if server_version.major == 2 and server_version.minor < 11:
3846+
pytest.skip("per-message TTL requires nats-server v2.11.0 or later")
3847+
3848+
js = nc.jetstream()
3849+
3850+
# Create a KV bucket
3851+
kv = await js.create_key_value(bucket="TEST_TTL_CREATE", history=5)
3852+
3853+
# Create a key with TTL of 2 seconds
3854+
seq = await kv.create("age", b"30", msg_ttl=2.0)
3855+
assert seq == 1
3856+
3857+
# Key should exist immediately
3858+
entry = await kv.get("age")
3859+
assert entry.key == "age"
3860+
assert entry.value == b"30"
3861+
assert entry.revision == 1
3862+
3863+
# Wait for TTL to expire (2 seconds + buffer)
3864+
await asyncio.sleep(2.5)
3865+
3866+
# Key should be gone after TTL expires
3867+
with pytest.raises(KeyNotFoundError):
3868+
await kv.get("age")
3869+
3870+
await nc.close()
3871+
3872+
@async_test
3873+
async def test_kv_purge_with_ttl(self):
3874+
"""Test that purge() supports msg_ttl parameter for the purge marker"""
3875+
errors = []
3876+
3877+
async def error_handler(e):
3878+
print("Error:", e, type(e))
3879+
errors.append(e)
3880+
3881+
nc = await nats.connect(error_cb=error_handler)
3882+
3883+
server_version = nc.connected_server_version
3884+
if server_version.major == 2 and server_version.minor < 11:
3885+
pytest.skip("per-message TTL requires nats-server v2.11.0 or later")
3886+
3887+
js = nc.jetstream()
3888+
3889+
# Create a KV bucket
3890+
kv = await js.create_key_value(bucket="TEST_TTL_PURGE", history=10)
3891+
3892+
# Put a key
3893+
seq = await kv.put("name", b"alice")
3894+
assert seq == 1
3895+
3896+
# Purge with TTL of 2 seconds on the purge marker
3897+
await kv.purge("name", msg_ttl=2.0)
3898+
3899+
# Key should be purged immediately
3900+
with pytest.raises(KeyNotFoundError):
3901+
await kv.get("name")
3902+
3903+
# The purge marker should exist in the stream
3904+
# We can verify by checking stream info - there should be a message
3905+
status = await kv.status()
3906+
# After purge, there should still be a marker message
3907+
assert status.values >= 1
3908+
3909+
# Wait for the purge marker TTL to expire (2 seconds + buffer)
3910+
await asyncio.sleep(2.5)
3911+
3912+
# The marker itself should now be removed from the stream
3913+
# Note: This behavior depends on server version and configuration
3914+
3915+
await nc.close()
3916+
3917+
@async_test
3918+
async def test_kv_delete_with_ttl(self):
3919+
"""Test that delete() supports msg_ttl parameter for the delete marker"""
3920+
errors = []
3921+
3922+
async def error_handler(e):
3923+
print("Error:", e, type(e))
3924+
errors.append(e)
3925+
3926+
nc = await nats.connect(error_cb=error_handler)
3927+
3928+
server_version = nc.connected_server_version
3929+
if server_version.major == 2 and server_version.minor < 11:
3930+
pytest.skip("per-message TTL requires nats-server v2.11.0 or later")
3931+
3932+
js = nc.jetstream()
3933+
3934+
# Create a KV bucket
3935+
kv = await js.create_key_value(bucket="TEST_TTL_DELETE", history=10)
3936+
3937+
# Put a key
3938+
seq = await kv.put("city", b"paris")
3939+
assert seq == 1
3940+
3941+
# Verify the key exists
3942+
entry = await kv.get("city")
3943+
assert entry.value == b"paris"
3944+
3945+
# Delete with TTL of 2 seconds on the delete marker
3946+
await kv.delete("city", msg_ttl=2.0)
3947+
3948+
# Key should be deleted immediately
3949+
with pytest.raises(KeyNotFoundError):
3950+
await kv.get("city")
3951+
3952+
# The delete marker should exist in the stream
3953+
status = await kv.status()
3954+
# After delete, there should be both the original message and delete marker
3955+
assert status.values >= 1
3956+
3957+
# Wait for the delete marker TTL to expire (2 seconds + buffer)
3958+
await asyncio.sleep(2.5)
3959+
3960+
# The marker itself should now be removed from the stream
3961+
# Note: This behavior depends on server version and configuration
3962+
3963+
await nc.close()
3964+
3965+
@async_test
3966+
async def test_kv_put_no_ttl(self):
3967+
"""Test that put() does NOT support TTL (should not have msg_ttl parameter)"""
3968+
nc = await nats.connect()
3969+
js = nc.jetstream()
3970+
3971+
# Create a KV bucket
3972+
kv = await js.create_key_value(bucket="TEST_NO_TTL_PUT", history=5)
3973+
3974+
# Put should work normally without TTL
3975+
seq = await kv.put("key1", b"value1")
3976+
assert seq == 1
3977+
3978+
# Verify put() method signature doesn't accept msg_ttl
3979+
# This is a compile-time check - if this test compiles, the signature is correct
3980+
import inspect
3981+
3982+
sig = inspect.signature(kv.put)
3983+
params = list(sig.parameters.keys())
3984+
assert "msg_ttl" not in params, "put() should not accept msg_ttl parameter"
3985+
3986+
await nc.close()
3987+
3988+
@async_test
3989+
async def test_kv_update_no_direct_ttl(self):
3990+
"""Test that update() does not expose TTL for direct use"""
3991+
nc = await nats.connect()
3992+
js = nc.jetstream()
3993+
3994+
# Create a KV bucket
3995+
kv = await js.create_key_value(bucket="TEST_NO_TTL_UPDATE", history=5)
3996+
3997+
# Put initial value
3998+
seq = await kv.put("counter", b"1")
3999+
assert seq == 1
4000+
4001+
# Update should work normally
4002+
seq = await kv.update("counter", b"2", last=1)
4003+
assert seq == 2
4004+
4005+
# While update() technically has msg_ttl parameter for internal use by create(),
4006+
# it's documented as not for direct use
4007+
entry = await kv.get("counter")
4008+
assert entry.value == b"2"
4009+
4010+
await nc.close()
4011+
38334012

38344013
class ObjectStoreTest(SingleJetStreamServerTestCase):
38354014
@async_test

0 commit comments

Comments
 (0)