From aea92608118fe7edb7a04a2bd67c05d350ce61dc Mon Sep 17 00:00:00 2001 From: ltoniazzi Date: Fri, 21 Nov 2025 12:54:56 +0000 Subject: [PATCH 1/6] Wrap in executor the sync methods --- .../langgraph_checkpoint_aws/agentcore/saver.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py index ec1b396f..11d72944 100644 --- a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py @@ -8,7 +8,7 @@ from collections.abc import AsyncIterator, Iterator, Sequence from typing import Any, TypeAlias, cast -from langchain_core.runnables import RunnableConfig +from langchain_core.runnables import RunnableConfig, run_in_executor from langgraph.checkpoint.base import ( BaseCheckpointSaver, ChannelVersions, @@ -273,9 +273,9 @@ def delete_thread(self, thread_id: str, actor_id: str = "") -> None: """Delete all checkpoints and writes associated with a thread.""" self.checkpoint_event_client.delete_events(thread_id, actor_id) - # ===== Async methods ( TODO: NOT IMPLEMENTED YET ) ===== + # ===== Async methods ( TODO: Check running sync methods inside executor ) ===== async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None: - return self.get_tuple(config) + return await run_in_executor(None, self.get_tuple, config) async def alist( self, @@ -285,7 +285,7 @@ async def alist( before: RunnableConfig | None = None, limit: int | None = None, ) -> AsyncIterator[CheckpointTuple]: - for item in self.list(config, filter=filter, before=before, limit=limit): + for item in await run_in_executor(None, self.list, config, filter=filter, before=before, limit=limit): yield item async def aput( @@ -295,7 +295,10 @@ async def aput( metadata: CheckpointMetadata, new_versions: ChannelVersions, ) -> RunnableConfig: - return self.put(config, checkpoint, metadata, new_versions) + # return self.put(config, checkpoint, metadata, new_versions) + return await run_in_executor( + None, self.put, config, checkpoint, metadata, new_versions + ) async def aput_writes( self, @@ -304,7 +307,9 @@ async def aput_writes( task_id: str, task_path: str = "", ) -> None: - return self.put_writes(config, writes, task_id, task_path) + return await run_in_executor( + None, self.put_writes, config, writes, task_id, task_path + ) async def adelete_thread(self, thread_id: str, actor_id: str = "") -> None: self.delete_thread(thread_id, actor_id) From 7192149fb56dbac56b10c3b45340aed022b7c5dc Mon Sep 17 00:00:00 2001 From: ltoniazzi Date: Sun, 23 Nov 2025 11:08:41 +0000 Subject: [PATCH 2/6] Add tests --- .../agentcore/saver.py | 8 +- .../tests/unit_tests/agentcore/test_saver.py | 263 ++++++++++++++++++ 2 files changed, 268 insertions(+), 3 deletions(-) diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py index 11d72944..cef74253 100644 --- a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py @@ -273,7 +273,7 @@ def delete_thread(self, thread_id: str, actor_id: str = "") -> None: """Delete all checkpoints and writes associated with a thread.""" self.checkpoint_event_client.delete_events(thread_id, actor_id) - # ===== Async methods ( TODO: Check running sync methods inside executor ) ===== + # ===== Async methods ( Running sync methods inside executor ) ===== async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None: return await run_in_executor(None, self.get_tuple, config) @@ -285,7 +285,9 @@ async def alist( before: RunnableConfig | None = None, limit: int | None = None, ) -> AsyncIterator[CheckpointTuple]: - for item in await run_in_executor(None, self.list, config, filter=filter, before=before, limit=limit): + for item in await run_in_executor( + None, self.list, config, filter=filter, before=before, limit=limit + ): yield item async def aput( @@ -312,7 +314,7 @@ async def aput_writes( ) async def adelete_thread(self, thread_id: str, actor_id: str = "") -> None: - self.delete_thread(thread_id, actor_id) + await run_in_executor(None, self.delete_thread, thread_id, actor_id) return None def get_next_version( diff --git a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py index 1de51219..f4c34a4b 100644 --- a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py +++ b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py @@ -2,7 +2,9 @@ Unit tests for AgentCore Memory Checkpoint Saver. """ +import asyncio import json +import time from unittest.mock import ANY, MagicMock, Mock, patch import pytest @@ -30,6 +32,63 @@ ) from langgraph_checkpoint_aws.agentcore.saver import AgentCoreMemorySaver +# Configure pytest to use anyio for async tests (asyncio backend only) +pytestmark = pytest.mark.anyio + +# Test constants for async testing +N_ASYNC_CALLS = 10 +MOCK_SLEEP_DURATION = 0.5 / N_ASYNC_CALLS +OVERHEAD_RUNNER_TIME = 0.05 +TOTAL_EXPECTED_TIME = MOCK_SLEEP_DURATION + OVERHEAD_RUNNER_TIME + + +# Mock helper functions for async testing +def _create_mock_checkpoint_tuple( + thread_id="test-thread", checkpoint_id="test-checkpoint" +): + """Helper to create a mock checkpoint tuple with configurable IDs.""" + mock_tuple = MagicMock() + mock_tuple.config = { + "configurable": { + "thread_id": thread_id, + "actor_id": "test-actor", + "checkpoint_id": checkpoint_id, + } + } + mock_tuple.checkpoint = {"id": checkpoint_id} + mock_tuple.metadata = {"source": "input", "step": 0} + return mock_tuple + + +def slow_get_tuple(config): # noqa: ARG001 + """Mock get_tuple with artificial delay for testing async concurrency.""" + time.sleep(MOCK_SLEEP_DURATION) + return _create_mock_checkpoint_tuple() + + +def slow_list(config, *, filter=None, before=None, limit=None): # noqa: ARG001 A002 + """Mock list with artificial delay for testing async concurrency.""" + time.sleep(MOCK_SLEEP_DURATION) + return [_create_mock_checkpoint_tuple()] + + +def slow_put(config, checkpoint, metadata, new_versions): # noqa: ARG001 + """Mock put with artificial delay for testing async concurrency.""" + time.sleep(MOCK_SLEEP_DURATION) + return config + + +def slow_put_writes(config, writes, task_id, task_path=""): # noqa: ARG001 + """Mock put_writes with artificial delay for testing async concurrency.""" + time.sleep(MOCK_SLEEP_DURATION) + return + + +def slow_delete_thread(thread_id, actor_id=""): # noqa: ARG001 + """Mock delete_thread with artificial delay for testing async concurrency.""" + time.sleep(MOCK_SLEEP_DURATION) + return + @pytest.fixture def sample_checkpoint_event(): @@ -587,6 +646,210 @@ def test_get_next_version(self, saver): ) assert version.startswith("00000000000000000000000000000011.") + async def test_aget_tuple_calls_sync_method_with_correct_args( + self, saver, runnable_config + ): + """ + Test that aget_tuple calls the sync get_tuple method with correct arguments. + """ + + with patch.object(saver, "get_tuple", side_effect=slow_get_tuple) as mock_get: + result = await saver.aget_tuple(runnable_config) + + # Verify sync method was called with correct arguments + mock_get.assert_called_once_with(runnable_config) + + # Verify result is returned correctly + assert result is not None + + async def test_alist_calls_sync_method_with_correct_args( + self, saver, runnable_config + ): + """Test that alist calls the sync list method with correct arguments.""" + + filter_dict = {"test": "filter"} + before_config = {"before": "config"} + limit_value = 10 + + with patch.object(saver, "list", side_effect=slow_list) as mock_list: + # Collect all items from async iterator + items = [] + async for item in saver.alist( + runnable_config, + filter=filter_dict, + before=before_config, + limit=limit_value, + ): + items.append(item) + + # Verify sync method was called with correct arguments + mock_list.assert_called_once_with( + runnable_config, + filter=filter_dict, + before=before_config, + limit=limit_value, + ) + + async def test_aput_calls_sync_method_with_correct_args( + self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata + ): + """Test that aput calls the sync put method with correct arguments.""" + + new_versions = {"default": "v2"} + + with patch.object(saver, "put", side_effect=slow_put) as mock_put: + result = await saver.aput( + runnable_config, + sample_checkpoint, + sample_checkpoint_metadata, + new_versions, + ) + + # Verify sync method was called with correct arguments + mock_put.assert_called_once_with( + runnable_config, + sample_checkpoint, + sample_checkpoint_metadata, + new_versions, + ) + + # Verify result is returned correctly + assert result == runnable_config + + async def test_aput_writes_calls_sync_method_with_correct_args( + self, saver, runnable_config + ): + """ + Test that aput_writes calls the sync put_writes method with correct arguments. + """ + + writes = [("channel", "value")] + task_id = "test-task" + task_path = "test-path" + + with patch.object( + saver, "put_writes", side_effect=slow_put_writes + ) as mock_put_writes: + result = await saver.aput_writes( + runnable_config, writes, task_id, task_path + ) + + # Verify sync method was called with correct arguments + mock_put_writes.assert_called_once_with( + runnable_config, writes, task_id, task_path + ) + + # Verify result (should be None for put_writes) + assert result is None + + async def test_adelete_thread_calls_sync_method_with_correct_args( + self, saver, runnable_config + ): + """ + Test that adelete_thread calls the sync delete_thread method + with correct arguments + """ + + thread_id = runnable_config["configurable"]["thread_id"] + actor_id = runnable_config["configurable"]["actor_id"] + + with patch.object( + saver, "delete_thread", side_effect=slow_delete_thread + ) as mock_delete: + result = await saver.adelete_thread(thread_id, actor_id) + + # Verify sync method was called with correct arguments + mock_delete.assert_called_once_with(thread_id, actor_id) + + # Verify result (should be None for delete_thread) + assert result is None + + async def test_concurrent_calls_aget_tuple(self, saver, runnable_config): + """Test that concurrent calls are faster than sequential calls.""" + with patch.object(saver, "get_tuple", side_effect=slow_get_tuple): + await self.assert_concurrent_calls_are_faster_than_sequential( + N_ASYNC_CALLS, saver.aget_tuple, runnable_config + ) + + async def test_concurrent_calls_adelete_thread(self, saver, runnable_config): + """Test that concurrent calls are faster than sequential calls.""" + thread_id = runnable_config["configurable"]["thread_id"] + actor_id = runnable_config["configurable"]["actor_id"] + + with patch.object(saver, "delete_thread", side_effect=slow_delete_thread): + await self.assert_concurrent_calls_are_faster_than_sequential( + N_ASYNC_CALLS, saver.adelete_thread, thread_id, actor_id + ) + + async def test_concurrent_calls_aput_writes(self, saver, runnable_config): + """Test that concurrent calls are faster than sequential calls.""" + writes = [("channel", "value")] + task_id = "test-task" + task_path = "test-path" + + with patch.object(saver, "put_writes", side_effect=slow_put_writes): + await self.assert_concurrent_calls_are_faster_than_sequential( + N_ASYNC_CALLS, + saver.aput_writes, + runnable_config, + writes, + task_id, + task_path, + ) + + async def test_concurrent_calls_aput( + self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata + ): + """Test that concurrent calls are faster than sequential calls.""" + new_versions = {"default": "v2"} + + with patch.object(saver, "put", side_effect=slow_put_writes): + await self.assert_concurrent_calls_are_faster_than_sequential( + N_ASYNC_CALLS, + saver.aput, + runnable_config, + sample_checkpoint, + sample_checkpoint_metadata, + new_versions, + ) + + async def test_concurrent_calls_alist(self, saver, runnable_config): + """Test that concurrent calls are faster than sequential calls.""" + filter_dict = {"test": "filter"} + before_config = {"before": "config"} + limit_value = 10 + + with patch.object(saver, "list", side_effect=slow_list): + + async def consume_alist() -> list: + """Helper coroutine to consume the async iterator.""" + items = [] + async for item in saver.alist( + runnable_config, + filter=filter_dict, + before=before_config, + limit=limit_value, + ): + items.append(item) + return items + + await self.assert_concurrent_calls_are_faster_than_sequential( + N_ASYNC_CALLS, consume_alist + ) + + async def assert_concurrent_calls_are_faster_than_sequential( + self, n_async_calls: int, func, *args, **kwargs + ) -> None: + """Helper to run n async tasks concurrently.""" + tasks = [func(*args, **kwargs) for _ in range(n_async_calls)] + start_time = time.time() + await asyncio.gather(*tasks) + concurrent_time = time.time() - start_time + assert concurrent_time < TOTAL_EXPECTED_TIME, ( + f"Concurrent execution took {concurrent_time:.2f}s, " + f"expected < {TOTAL_EXPECTED_TIME}s" + ) + class TestCheckpointerConfig: """Test suite for CheckpointerConfig.""" From aa54e9a870450aaeee670e0be8a99c7627055c9f Mon Sep 17 00:00:00 2001 From: ltoniazzi Date: Sun, 23 Nov 2025 11:58:29 +0000 Subject: [PATCH 3/6] Cleanup tests --- .../agentcore/saver.py | 1 - .../tests/unit_tests/agentcore/test_saver.py | 123 ++++++++++-------- 2 files changed, 66 insertions(+), 58 deletions(-) diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py index cef74253..5f03cf0c 100644 --- a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py @@ -297,7 +297,6 @@ async def aput( metadata: CheckpointMetadata, new_versions: ChannelVersions, ) -> RunnableConfig: - # return self.put(config, checkpoint, metadata, new_versions) return await run_in_executor( None, self.put, config, checkpoint, metadata, new_versions ) diff --git a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py index f4c34a4b..8f46a2a0 100644 --- a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py +++ b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py @@ -41,53 +41,26 @@ OVERHEAD_RUNNER_TIME = 0.05 TOTAL_EXPECTED_TIME = MOCK_SLEEP_DURATION + OVERHEAD_RUNNER_TIME - -# Mock helper functions for async testing -def _create_mock_checkpoint_tuple( - thread_id="test-thread", checkpoint_id="test-checkpoint" -): +@pytest.fixture +def sample_checkpoint_tuple(): """Helper to create a mock checkpoint tuple with configurable IDs.""" - mock_tuple = MagicMock() - mock_tuple.config = { + config = { "configurable": { - "thread_id": thread_id, - "actor_id": "test-actor", - "checkpoint_id": checkpoint_id, + "thread_id": "test_thread_id", + "actor_id": "test_actor", + "checkpoint_id": "test_checkpoint_id", } } - mock_tuple.checkpoint = {"id": checkpoint_id} - mock_tuple.metadata = {"source": "input", "step": 0} - return mock_tuple - - -def slow_get_tuple(config): # noqa: ARG001 - """Mock get_tuple with artificial delay for testing async concurrency.""" - time.sleep(MOCK_SLEEP_DURATION) - return _create_mock_checkpoint_tuple() - - -def slow_list(config, *, filter=None, before=None, limit=None): # noqa: ARG001 A002 - """Mock list with artificial delay for testing async concurrency.""" - time.sleep(MOCK_SLEEP_DURATION) - return [_create_mock_checkpoint_tuple()] - - -def slow_put(config, checkpoint, metadata, new_versions): # noqa: ARG001 - """Mock put with artificial delay for testing async concurrency.""" - time.sleep(MOCK_SLEEP_DURATION) - return config - + checkpoint = {"id": "test_checkpoint_id"} + metadata = {"source": "input", "step": 0} + return CheckpointTuple( + config=config, + checkpoint=checkpoint, + metadata=metadata, + ) -def slow_put_writes(config, writes, task_id, task_path=""): # noqa: ARG001 - """Mock put_writes with artificial delay for testing async concurrency.""" - time.sleep(MOCK_SLEEP_DURATION) - return -def slow_delete_thread(thread_id, actor_id=""): # noqa: ARG001 - """Mock delete_thread with artificial delay for testing async concurrency.""" - time.sleep(MOCK_SLEEP_DURATION) - return @pytest.fixture @@ -203,6 +176,46 @@ def sample_checkpoint_metadata(self): "namespace2": "parent_checkpoint_2", }, ) + + @pytest.fixture + def slow_get_tuple(self, sample_checkpoint_tuple): + """Mock get_tuple with artificial delay for testing async concurrency.""" + def _slow_get_tuple(config): # noqa: ARG001 + time.sleep(MOCK_SLEEP_DURATION) + return sample_checkpoint_tuple + return _slow_get_tuple + + @pytest.fixture + def slow_list(self, sample_checkpoint_tuple): + """Mock list with artificial delay for testing async concurrency.""" + def _slow_list(config, *, filter=None, before=None, limit=None): # noqa: ARG001 A002 + time.sleep(MOCK_SLEEP_DURATION) + return [sample_checkpoint_tuple] + return _slow_list + + @pytest.fixture + def slow_put(self): + """Mock put with artificial delay for testing async concurrency.""" + def _slow_put(config, checkpoint, metadata, new_versions): # noqa: ARG001 + time.sleep(MOCK_SLEEP_DURATION) + return config + return _slow_put + + @pytest.fixture + def slow_put_writes(self): + """Mock put_writes with artificial delay for testing async concurrency.""" + def _slow_put_writes(config, writes, task_id, task_path=""): # noqa: ARG001 + time.sleep(MOCK_SLEEP_DURATION) + return + return _slow_put_writes + + @pytest.fixture + def slow_delete_thread(self): + """Mock delete_thread with artificial delay for testing async concurrency.""" + def _slow_delete_thread(thread_id, actor_id=""): # noqa: ARG001 + time.sleep(MOCK_SLEEP_DURATION) + return + return _slow_delete_thread def test_init_with_default_client(self, memory_id): with patch("boto3.client") as mock_boto3_client: @@ -647,7 +660,7 @@ def test_get_next_version(self, saver): assert version.startswith("00000000000000000000000000000011.") async def test_aget_tuple_calls_sync_method_with_correct_args( - self, saver, runnable_config + self, saver, runnable_config, slow_get_tuple ): """ Test that aget_tuple calls the sync get_tuple method with correct arguments. @@ -659,11 +672,10 @@ async def test_aget_tuple_calls_sync_method_with_correct_args( # Verify sync method was called with correct arguments mock_get.assert_called_once_with(runnable_config) - # Verify result is returned correctly assert result is not None async def test_alist_calls_sync_method_with_correct_args( - self, saver, runnable_config + self, saver, runnable_config, slow_list ): """Test that alist calls the sync list method with correct arguments.""" @@ -689,9 +701,11 @@ async def test_alist_calls_sync_method_with_correct_args( before=before_config, limit=limit_value, ) + assert len(items) == 1 + assert isinstance(items[0], CheckpointTuple) async def test_aput_calls_sync_method_with_correct_args( - self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata + self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata, slow_put ): """Test that aput calls the sync put method with correct arguments.""" @@ -713,11 +727,10 @@ async def test_aput_calls_sync_method_with_correct_args( new_versions, ) - # Verify result is returned correctly assert result == runnable_config async def test_aput_writes_calls_sync_method_with_correct_args( - self, saver, runnable_config + self, saver, runnable_config, slow_put_writes ): """ Test that aput_writes calls the sync put_writes method with correct arguments. @@ -738,12 +751,10 @@ async def test_aput_writes_calls_sync_method_with_correct_args( mock_put_writes.assert_called_once_with( runnable_config, writes, task_id, task_path ) - - # Verify result (should be None for put_writes) assert result is None async def test_adelete_thread_calls_sync_method_with_correct_args( - self, saver, runnable_config + self, saver, runnable_config, slow_delete_thread ): """ Test that adelete_thread calls the sync delete_thread method @@ -760,18 +771,16 @@ async def test_adelete_thread_calls_sync_method_with_correct_args( # Verify sync method was called with correct arguments mock_delete.assert_called_once_with(thread_id, actor_id) - - # Verify result (should be None for delete_thread) assert result is None - async def test_concurrent_calls_aget_tuple(self, saver, runnable_config): + async def test_concurrent_calls_aget_tuple(self, saver, runnable_config, slow_get_tuple): """Test that concurrent calls are faster than sequential calls.""" with patch.object(saver, "get_tuple", side_effect=slow_get_tuple): await self.assert_concurrent_calls_are_faster_than_sequential( N_ASYNC_CALLS, saver.aget_tuple, runnable_config ) - async def test_concurrent_calls_adelete_thread(self, saver, runnable_config): + async def test_concurrent_calls_adelete_thread(self, saver, runnable_config, slow_delete_thread): """Test that concurrent calls are faster than sequential calls.""" thread_id = runnable_config["configurable"]["thread_id"] actor_id = runnable_config["configurable"]["actor_id"] @@ -781,7 +790,7 @@ async def test_concurrent_calls_adelete_thread(self, saver, runnable_config): N_ASYNC_CALLS, saver.adelete_thread, thread_id, actor_id ) - async def test_concurrent_calls_aput_writes(self, saver, runnable_config): + async def test_concurrent_calls_aput_writes(self, saver, runnable_config, slow_put_writes): """Test that concurrent calls are faster than sequential calls.""" writes = [("channel", "value")] task_id = "test-task" @@ -798,12 +807,12 @@ async def test_concurrent_calls_aput_writes(self, saver, runnable_config): ) async def test_concurrent_calls_aput( - self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata + self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata, slow_put ): """Test that concurrent calls are faster than sequential calls.""" new_versions = {"default": "v2"} - with patch.object(saver, "put", side_effect=slow_put_writes): + with patch.object(saver, "put", side_effect=slow_put): await self.assert_concurrent_calls_are_faster_than_sequential( N_ASYNC_CALLS, saver.aput, @@ -813,7 +822,7 @@ async def test_concurrent_calls_aput( new_versions, ) - async def test_concurrent_calls_alist(self, saver, runnable_config): + async def test_concurrent_calls_alist(self, saver, runnable_config, slow_list): """Test that concurrent calls are faster than sequential calls.""" filter_dict = {"test": "filter"} before_config = {"before": "config"} From 97aa28b75916b48ce432714ac37a686e9a4289d5 Mon Sep 17 00:00:00 2001 From: ltoniazzi Date: Sun, 23 Nov 2025 11:59:32 +0000 Subject: [PATCH 4/6] Cleanup tests --- .../tests/unit_tests/agentcore/test_saver.py | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py index 8f46a2a0..f7ff7caf 100644 --- a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py +++ b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py @@ -41,6 +41,7 @@ OVERHEAD_RUNNER_TIME = 0.05 TOTAL_EXPECTED_TIME = MOCK_SLEEP_DURATION + OVERHEAD_RUNNER_TIME + @pytest.fixture def sample_checkpoint_tuple(): """Helper to create a mock checkpoint tuple with configurable IDs.""" @@ -60,9 +61,6 @@ def sample_checkpoint_tuple(): ) - - - @pytest.fixture def sample_checkpoint_event(): return CheckpointEvent( @@ -176,45 +174,55 @@ def sample_checkpoint_metadata(self): "namespace2": "parent_checkpoint_2", }, ) - + @pytest.fixture def slow_get_tuple(self, sample_checkpoint_tuple): """Mock get_tuple with artificial delay for testing async concurrency.""" + def _slow_get_tuple(config): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return sample_checkpoint_tuple + return _slow_get_tuple @pytest.fixture def slow_list(self, sample_checkpoint_tuple): """Mock list with artificial delay for testing async concurrency.""" + def _slow_list(config, *, filter=None, before=None, limit=None): # noqa: ARG001 A002 time.sleep(MOCK_SLEEP_DURATION) return [sample_checkpoint_tuple] + return _slow_list @pytest.fixture def slow_put(self): """Mock put with artificial delay for testing async concurrency.""" + def _slow_put(config, checkpoint, metadata, new_versions): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return config + return _slow_put @pytest.fixture def slow_put_writes(self): """Mock put_writes with artificial delay for testing async concurrency.""" + def _slow_put_writes(config, writes, task_id, task_path=""): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return + return _slow_put_writes @pytest.fixture def slow_delete_thread(self): """Mock delete_thread with artificial delay for testing async concurrency.""" + def _slow_delete_thread(thread_id, actor_id=""): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return + return _slow_delete_thread def test_init_with_default_client(self, memory_id): @@ -705,7 +713,12 @@ async def test_alist_calls_sync_method_with_correct_args( assert isinstance(items[0], CheckpointTuple) async def test_aput_calls_sync_method_with_correct_args( - self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata, slow_put + self, + saver, + runnable_config, + sample_checkpoint, + sample_checkpoint_metadata, + slow_put, ): """Test that aput calls the sync put method with correct arguments.""" @@ -773,14 +786,18 @@ async def test_adelete_thread_calls_sync_method_with_correct_args( mock_delete.assert_called_once_with(thread_id, actor_id) assert result is None - async def test_concurrent_calls_aget_tuple(self, saver, runnable_config, slow_get_tuple): + async def test_concurrent_calls_aget_tuple( + self, saver, runnable_config, slow_get_tuple + ): """Test that concurrent calls are faster than sequential calls.""" with patch.object(saver, "get_tuple", side_effect=slow_get_tuple): await self.assert_concurrent_calls_are_faster_than_sequential( N_ASYNC_CALLS, saver.aget_tuple, runnable_config ) - async def test_concurrent_calls_adelete_thread(self, saver, runnable_config, slow_delete_thread): + async def test_concurrent_calls_adelete_thread( + self, saver, runnable_config, slow_delete_thread + ): """Test that concurrent calls are faster than sequential calls.""" thread_id = runnable_config["configurable"]["thread_id"] actor_id = runnable_config["configurable"]["actor_id"] @@ -790,7 +807,9 @@ async def test_concurrent_calls_adelete_thread(self, saver, runnable_config, slo N_ASYNC_CALLS, saver.adelete_thread, thread_id, actor_id ) - async def test_concurrent_calls_aput_writes(self, saver, runnable_config, slow_put_writes): + async def test_concurrent_calls_aput_writes( + self, saver, runnable_config, slow_put_writes + ): """Test that concurrent calls are faster than sequential calls.""" writes = [("channel", "value")] task_id = "test-task" @@ -807,7 +826,12 @@ async def test_concurrent_calls_aput_writes(self, saver, runnable_config, slow_p ) async def test_concurrent_calls_aput( - self, saver, runnable_config, sample_checkpoint, sample_checkpoint_metadata, slow_put + self, + saver, + runnable_config, + sample_checkpoint, + sample_checkpoint_metadata, + slow_put, ): """Test that concurrent calls are faster than sequential calls.""" new_versions = {"default": "v2"} From bff9e6245d3b964cea56022c696e7166db47d8f8 Mon Sep 17 00:00:00 2001 From: ltoniazzi Date: Sun, 23 Nov 2025 13:56:04 +0000 Subject: [PATCH 5/6] Cleanup tests 2 --- .../tests/unit_tests/agentcore/test_saver.py | 116 +++++++----------- 1 file changed, 46 insertions(+), 70 deletions(-) diff --git a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py index f7ff7caf..fc2775da 100644 --- a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py +++ b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py @@ -44,20 +44,16 @@ @pytest.fixture def sample_checkpoint_tuple(): - """Helper to create a mock checkpoint tuple with configurable IDs.""" - config = { - "configurable": { - "thread_id": "test_thread_id", - "actor_id": "test_actor", - "checkpoint_id": "test_checkpoint_id", - } - } - checkpoint = {"id": "test_checkpoint_id"} - metadata = {"source": "input", "step": 0} return CheckpointTuple( - config=config, - checkpoint=checkpoint, - metadata=metadata, + config={ + "configurable": { + "thread_id": "test_thread_id", + "actor_id": "test_actor", + "checkpoint_id": "test_checkpoint_id", + } + }, + checkpoint={"id": "test_checkpoint_id"}, + metadata={"source": "input", "step": 0}, ) @@ -176,54 +172,54 @@ def sample_checkpoint_metadata(self): ) @pytest.fixture - def slow_get_tuple(self, sample_checkpoint_tuple): + def mock_slow_get_tuple(self, sample_checkpoint_tuple): """Mock get_tuple with artificial delay for testing async concurrency.""" - def _slow_get_tuple(config): # noqa: ARG001 + def _mock_slow_get_tuple(config): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return sample_checkpoint_tuple - return _slow_get_tuple + return _mock_slow_get_tuple @pytest.fixture - def slow_list(self, sample_checkpoint_tuple): + def mock_slow_list(self, sample_checkpoint_tuple): """Mock list with artificial delay for testing async concurrency.""" - def _slow_list(config, *, filter=None, before=None, limit=None): # noqa: ARG001 A002 + def _mock_slow_list(config, *, filter=None, before=None, limit=None): # noqa: ARG001 A002 time.sleep(MOCK_SLEEP_DURATION) return [sample_checkpoint_tuple] - return _slow_list + return _mock_slow_list @pytest.fixture - def slow_put(self): + def mock_slow_put(self): """Mock put with artificial delay for testing async concurrency.""" - def _slow_put(config, checkpoint, metadata, new_versions): # noqa: ARG001 + def _mock_slow_put(config, checkpoint, metadata, new_versions): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return config - return _slow_put + return _mock_slow_put @pytest.fixture - def slow_put_writes(self): + def mock_slow_put_writes(self): """Mock put_writes with artificial delay for testing async concurrency.""" - def _slow_put_writes(config, writes, task_id, task_path=""): # noqa: ARG001 + def _mock_slow_put_writes(config, writes, task_id, task_path=""): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return - return _slow_put_writes + return _mock_slow_put_writes @pytest.fixture - def slow_delete_thread(self): + def mock_slow_delete_thread(self): """Mock delete_thread with artificial delay for testing async concurrency.""" - def _slow_delete_thread(thread_id, actor_id=""): # noqa: ARG001 + def _mock_slow_delete_thread(thread_id, actor_id=""): # noqa: ARG001 time.sleep(MOCK_SLEEP_DURATION) return - return _slow_delete_thread + return _mock_slow_delete_thread def test_init_with_default_client(self, memory_id): with patch("boto3.client") as mock_boto3_client: @@ -668,13 +664,11 @@ def test_get_next_version(self, saver): assert version.startswith("00000000000000000000000000000011.") async def test_aget_tuple_calls_sync_method_with_correct_args( - self, saver, runnable_config, slow_get_tuple + self, saver, runnable_config, mock_slow_get_tuple ): - """ - Test that aget_tuple calls the sync get_tuple method with correct arguments. - """ - - with patch.object(saver, "get_tuple", side_effect=slow_get_tuple) as mock_get: + with patch.object( + saver, "get_tuple", side_effect=mock_slow_get_tuple + ) as mock_get: result = await saver.aget_tuple(runnable_config) # Verify sync method was called with correct arguments @@ -683,15 +677,13 @@ async def test_aget_tuple_calls_sync_method_with_correct_args( assert result is not None async def test_alist_calls_sync_method_with_correct_args( - self, saver, runnable_config, slow_list + self, saver, runnable_config, mock_slow_list ): - """Test that alist calls the sync list method with correct arguments.""" - filter_dict = {"test": "filter"} before_config = {"before": "config"} limit_value = 10 - with patch.object(saver, "list", side_effect=slow_list) as mock_list: + with patch.object(saver, "list", side_effect=mock_slow_list) as mock_list: # Collect all items from async iterator items = [] async for item in saver.alist( @@ -718,13 +710,11 @@ async def test_aput_calls_sync_method_with_correct_args( runnable_config, sample_checkpoint, sample_checkpoint_metadata, - slow_put, + mock_slow_put, ): - """Test that aput calls the sync put method with correct arguments.""" - new_versions = {"default": "v2"} - with patch.object(saver, "put", side_effect=slow_put) as mock_put: + with patch.object(saver, "put", side_effect=mock_slow_put) as mock_put: result = await saver.aput( runnable_config, sample_checkpoint, @@ -743,18 +733,14 @@ async def test_aput_calls_sync_method_with_correct_args( assert result == runnable_config async def test_aput_writes_calls_sync_method_with_correct_args( - self, saver, runnable_config, slow_put_writes + self, saver, runnable_config, mock_slow_put_writes ): - """ - Test that aput_writes calls the sync put_writes method with correct arguments. - """ - writes = [("channel", "value")] task_id = "test-task" task_path = "test-path" with patch.object( - saver, "put_writes", side_effect=slow_put_writes + saver, "put_writes", side_effect=mock_slow_put_writes ) as mock_put_writes: result = await saver.aput_writes( runnable_config, writes, task_id, task_path @@ -767,18 +753,13 @@ async def test_aput_writes_calls_sync_method_with_correct_args( assert result is None async def test_adelete_thread_calls_sync_method_with_correct_args( - self, saver, runnable_config, slow_delete_thread + self, saver, runnable_config, mock_slow_delete_thread ): - """ - Test that adelete_thread calls the sync delete_thread method - with correct arguments - """ - thread_id = runnable_config["configurable"]["thread_id"] actor_id = runnable_config["configurable"]["actor_id"] with patch.object( - saver, "delete_thread", side_effect=slow_delete_thread + saver, "delete_thread", side_effect=mock_slow_delete_thread ) as mock_delete: result = await saver.adelete_thread(thread_id, actor_id) @@ -787,35 +768,32 @@ async def test_adelete_thread_calls_sync_method_with_correct_args( assert result is None async def test_concurrent_calls_aget_tuple( - self, saver, runnable_config, slow_get_tuple + self, saver, runnable_config, mock_slow_get_tuple ): - """Test that concurrent calls are faster than sequential calls.""" - with patch.object(saver, "get_tuple", side_effect=slow_get_tuple): + with patch.object(saver, "get_tuple", side_effect=mock_slow_get_tuple): await self.assert_concurrent_calls_are_faster_than_sequential( N_ASYNC_CALLS, saver.aget_tuple, runnable_config ) async def test_concurrent_calls_adelete_thread( - self, saver, runnable_config, slow_delete_thread + self, saver, runnable_config, mock_slow_delete_thread ): - """Test that concurrent calls are faster than sequential calls.""" thread_id = runnable_config["configurable"]["thread_id"] actor_id = runnable_config["configurable"]["actor_id"] - with patch.object(saver, "delete_thread", side_effect=slow_delete_thread): + with patch.object(saver, "delete_thread", side_effect=mock_slow_delete_thread): await self.assert_concurrent_calls_are_faster_than_sequential( N_ASYNC_CALLS, saver.adelete_thread, thread_id, actor_id ) async def test_concurrent_calls_aput_writes( - self, saver, runnable_config, slow_put_writes + self, saver, runnable_config, mock_slow_put_writes ): - """Test that concurrent calls are faster than sequential calls.""" writes = [("channel", "value")] task_id = "test-task" task_path = "test-path" - with patch.object(saver, "put_writes", side_effect=slow_put_writes): + with patch.object(saver, "put_writes", side_effect=mock_slow_put_writes): await self.assert_concurrent_calls_are_faster_than_sequential( N_ASYNC_CALLS, saver.aput_writes, @@ -831,12 +809,11 @@ async def test_concurrent_calls_aput( runnable_config, sample_checkpoint, sample_checkpoint_metadata, - slow_put, + mock_slow_put, ): - """Test that concurrent calls are faster than sequential calls.""" new_versions = {"default": "v2"} - with patch.object(saver, "put", side_effect=slow_put): + with patch.object(saver, "put", side_effect=mock_slow_put): await self.assert_concurrent_calls_are_faster_than_sequential( N_ASYNC_CALLS, saver.aput, @@ -846,13 +823,12 @@ async def test_concurrent_calls_aput( new_versions, ) - async def test_concurrent_calls_alist(self, saver, runnable_config, slow_list): - """Test that concurrent calls are faster than sequential calls.""" + async def test_concurrent_calls_alist(self, saver, runnable_config, mock_slow_list): filter_dict = {"test": "filter"} before_config = {"before": "config"} limit_value = 10 - with patch.object(saver, "list", side_effect=slow_list): + with patch.object(saver, "list", side_effect=mock_slow_list): async def consume_alist() -> list: """Helper coroutine to consume the async iterator.""" From 6e6b87e22f9e68114122ffbac12d5de23d781ba4 Mon Sep 17 00:00:00 2001 From: ltoniazzi Date: Sun, 23 Nov 2025 15:11:28 +0000 Subject: [PATCH 6/6] Cleanup tests 3 --- .../tests/unit_tests/agentcore/test_saver.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py index fc2775da..574c609a 100644 --- a/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py +++ b/libs/langgraph-checkpoint-aws/tests/unit_tests/agentcore/test_saver.py @@ -32,14 +32,14 @@ ) from langgraph_checkpoint_aws.agentcore.saver import AgentCoreMemorySaver -# Configure pytest to use anyio for async tests (asyncio backend only) +# Configure pytest to use anyio for async tests pytestmark = pytest.mark.anyio # Test constants for async testing -N_ASYNC_CALLS = 10 -MOCK_SLEEP_DURATION = 0.5 / N_ASYNC_CALLS -OVERHEAD_RUNNER_TIME = 0.05 -TOTAL_EXPECTED_TIME = MOCK_SLEEP_DURATION + OVERHEAD_RUNNER_TIME +N_ASYNC_CALLS = 5 +MOCK_SLEEP_DURATION = 0.1 / N_ASYNC_CALLS +OVERHEAD_DURATION = 0.01 +TOTAL_EXPECTED_TIME = MOCK_SLEEP_DURATION + OVERHEAD_DURATION @pytest.fixture