-
Notifications
You must be signed in to change notification settings - Fork 27
feat: In Memory Cache #182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements an In-Memory Cache (IMC) feature using a sharded queue system. The implementation adds a distributed caching layer with 10 shards backed by Redis, featuring automatic cache updates via polling and TTL-based expiration.
Key Changes:
- Adds a sharded queue system with 10 Redis-backed shards and a polling mechanism that runs every 10 seconds
- Implements an in-memory cache (IMC) using a Registry pattern with TTL support for service configurations
- Modifies service configuration lookup to check IMC first before database queries
- Introduces a new
findByNameFromRedisWithDefaultfunction to cache default values when configurations are not found
Reviewed Changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| src/shard_queue/mod.rs | Module definition for the sharded queue system with global singleton handler |
| src/shard_queue/types.rs | Type definitions for shard queue items, metadata, and error types |
| src/shard_queue/registry.rs | Registry implementation for in-memory caching with TTL support and expiration cleanup |
| src/shard_queue/handler.rs | Core handler implementing sharded queue operations, polling logic, and IMC integration |
| src/types/service_configuration.rs | Updated to check IMC before database and push new configs to shard queue |
| src/redis/cache.rs | Enhanced cache lookup with IMC integration and new default value caching function |
| src/redis/commands.rs | Added get_range_from_list method for retrieving items from Redis lists |
| src/storage/types.rs | Added Serialize/Deserialize traits to ServiceConfiguration for caching |
| src/lib.rs | Registered new shard_queue module |
| src/decider/gatewaydecider/gw_scoring.rs | Refactored to use new findByNameFromRedisWithDefault API |
| src/bin/open_router.rs | Spawns shard queue polling task alongside main and metrics servers |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| let default_config = crate::storage::types::ServiceConfiguration { | ||
| id: 0, // Placeholder ID since we're not storing in DB | ||
| name: key.clone(), | ||
| value: Some(serde_json::to_string(&default_value).unwrap_or_else(|_| "null".to_string())), | ||
| new_value: None, | ||
| previous_value: None, | ||
| new_value_status: None, | ||
| }; |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The serialization logic here may cause issues. You're serializing default_value to a JSON string and storing it in the value field of ServiceConfiguration. However, the value field is Option<String>, which typically stores a string representation. When this is later retrieved and deserialized, the code expects to deserialize from this string value, which could lead to double-encoding issues (JSON string containing a JSON string). Consider whether the value field should contain the direct JSON representation or if the deserialization logic needs adjustment.
| // Remove oldest entry (simple eviction policy) | ||
| if let Some(oldest_key) = data.keys().next().cloned() { | ||
| data.remove(&oldest_key); |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment says "Remove oldest entry" but HashMap::keys().next() returns an arbitrary key, not necessarily the oldest. HashMap iteration order is not guaranteed to be insertion order. Consider using a more appropriate data structure like LinkedHashMap or updating the comment to reflect the actual behavior (e.g., "Remove arbitrary entry").
src/shard_queue/handler.rs
Outdated
| for item in &new_items { | ||
| // Store in global registry with 600 second TTL | ||
| if let Err(_) = | ||
| GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600)) |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The TTL value of 600 seconds is hardcoded in multiple places. Consider defining it as a constant (e.g., const CACHE_TTL_SECONDS: u64 = 600;) to improve maintainability and make it easier to adjust if needed.
src/shard_queue/handler.rs
Outdated
|
|
||
| let inner = ShardedQueueHandlerInner { | ||
| shard_metadata: Arc::new(Mutex::new(shard_metadata)), | ||
| loop_interval: Duration::from_secs(10), // 30 seconds for testing |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment says "30 seconds for testing" but the code sets the duration to 10 seconds. Update the comment to match the actual value: "10 seconds".
| loop_interval: Duration::from_secs(10), // 30 seconds for testing | |
| loop_interval: Duration::from_secs(10), // 10 seconds |
| use crate::storage::schema_pg::service_configuration::dsl; | ||
| use diesel::associations::HasTable; | ||
| use diesel::*; | ||
| use serde_json::json; |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The import use serde_json::json; is unused and should be removed.
| use serde_json::json; |
| pub fn size(&self) -> usize { | ||
| self.data.read().unwrap().len() |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The size() method uses unwrap() which will panic if the lock is poisoned, while other methods properly handle lock errors. For consistency, this should return Result<usize, String> and use map_err like the other methods.
| pub fn size(&self) -> usize { | |
| self.data.read().unwrap().len() | |
| pub fn size(&self) -> Result<usize, String> { | |
| let data = self.data.read().map_err(|e| format!("Read lock error: {}", e))?; | |
| Ok(data.len()) |
src/shard_queue/handler.rs
Outdated
| if let Err(_) = | ||
| GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600)) | ||
| { | ||
| logger::error!("Failed to store item in registry: {}", item.key); |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message from the registry store operation is being discarded. Consider logging it: if let Err(e) = ... instead of if let Err(_) = ... to include the actual error message in the log for better debugging.
| if let Err(_) = | |
| GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600)) | |
| { | |
| logger::error!("Failed to store item in registry: {}", item.key); | |
| if let Err(e) = | |
| GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600)) | |
| { | |
| logger::error!("Failed to store item in registry: {} (error: {})", item.key, e); |
|
|
||
| if let Some(shard_meta) = metadata.get_mut(&shard_id) { | ||
| shard_meta.update_last_modified(); | ||
| logger::debug!("Updated last_modified_at for shard {}", shard_id); |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] If the shard metadata is not found, the update is silently skipped. While the metadata is initialized for all shards 0-9 in new(), consider adding a warning log if the metadata is unexpectedly missing to help diagnose potential issues.
| logger::debug!("Updated last_modified_at for shard {}", shard_id); | |
| logger::debug!("Updated last_modified_at for shard {}", shard_id); | |
| } else { | |
| logger::warn!( | |
| "Shard metadata for shard {} was unexpectedly missing when attempting to update last_modified_at.", | |
| shard_id | |
| ); |
| for raw_item in raw_items { | ||
| match serde_json::from_str::<ShardQueueItem>(&raw_item) { | ||
| Ok(item) => { | ||
| if item.modified_at > last_modified_at { | ||
| new_items.push(item); |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable new_items is used on line 163 but is never declared. You need to add let mut new_items = Vec::new(); before the loop that starts on line 159.
src/shard_queue/handler.rs
Outdated
| #[test] | ||
| fn test_push_and_get_sizes() { | ||
| let handler = ShardedQueueHandler::new(); | ||
|
|
||
| let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"})); | ||
| let result = handler.push_to_shard(item); | ||
|
|
||
| assert!(result.is_ok()); | ||
|
|
||
| let sizes = handler.get_queue_sizes().unwrap(); |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test calls async functions (push_to_shard and get_queue_sizes) but is not marked as an async test. The test should be annotated with #[tokio::test] instead of #[test], and the test function should be async fn.
| #[test] | |
| fn test_push_and_get_sizes() { | |
| let handler = ShardedQueueHandler::new(); | |
| let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"})); | |
| let result = handler.push_to_shard(item); | |
| assert!(result.is_ok()); | |
| let sizes = handler.get_queue_sizes().unwrap(); | |
| #[tokio::test] | |
| async fn test_push_and_get_sizes() { | |
| let handler = ShardedQueueHandler::new(); | |
| let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"})); | |
| let result = handler.push_to_shard(item).await; | |
| assert!(result.is_ok()); | |
| let sizes = handler.get_queue_sizes().await.unwrap(); |
src/redis/cache.rs
Outdated
| crate::logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e); | ||
| } else { | ||
| crate::logger::debug!("Cached default value for config '{}' in IMC", key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| crate::logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e); | |
| } else { | |
| crate::logger::debug!("Cached default value for config '{}' in IMC", key); | |
| logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e); | |
| } else { | |
| logger::debug!("Cached default value for config '{}' in IMC", key); |
| A: for<'de> Deserialize<'de>, | ||
| { | ||
| let res = service_configuration::find_config_by_name(key).await; | ||
| use crate::shard_queue::find_config_in_mem; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import it in file top
src/redis/cache.rs
Outdated
| use crate::shard_queue::find_config_in_mem; | ||
|
|
||
| if let Ok(cached_value) = find_config_in_mem(&key) { | ||
| crate::logger::debug!("Cache HIT: Found config '{}' in IMC", key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| crate::logger::debug!("Cache HIT: Found config '{}' in IMC", key); | |
| logger::debug!("Cache HIT: Found config '{}' in IMC", key); |
do this change in all other places also
src/redis/cache.rs
Outdated
| } | ||
| crate::logger::debug!("Cache MISS: Config '{}' not found in IMC, checking DB", key); | ||
|
|
||
| if let Ok(Some(config)) = check_database_for_service_config(key.clone()).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use the existing method service_configuration ::find_config_by_name
src/redis/cache.rs
Outdated
| None | ||
| } | ||
|
|
||
| async fn check_database_for_service_config( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need this method as it already available in service_configuration::find_config_by_name
src/shard_queue/handler.rs
Outdated
| let mut shard_metadata = HashMap::new(); | ||
|
|
||
| // Initialize metadata for 10 shards (0-9) | ||
| for shard_id in 0..10 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fetch this value from config so that we change it using envs
src/shard_queue/handler.rs
Outdated
|
|
||
| let inner = ShardedQueueHandlerInner { | ||
| shard_metadata: Arc::new(Mutex::new(shard_metadata)), | ||
| loop_interval: Duration::from_secs(10), // 30 seconds for testing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this also to configs
src/shard_queue/handler.rs
Outdated
| pub fn get_shard_id(&self, key: &str) -> u8 { | ||
| let mut hasher = std::collections::hash_map::DefaultHasher::new(); | ||
| key.hash(&mut hasher); | ||
| (hasher.finish() % 10) as u8 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use config here
src/types/service_configuration.rs
Outdated
| crate::generics::generic_insert(&app_state.db, config).await?; | ||
|
|
||
| // Create ServiceConfiguration for shard queue (after successful DB insert) | ||
| let service_config = ServiceConfiguration { | ||
| id: 0, // We don't need the actual DB ID for IMC, using 0 as placeholder | ||
| name: name.clone(), | ||
| value, | ||
| new_value: None, | ||
| previous_value: None, | ||
| new_value_status: None, | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| crate::generics::generic_insert(&app_state.db, config).await?; | |
| // Create ServiceConfiguration for shard queue (after successful DB insert) | |
| let service_config = ServiceConfiguration { | |
| id: 0, // We don't need the actual DB ID for IMC, using 0 as placeholder | |
| name: name.clone(), | |
| value, | |
| new_value: None, | |
| previous_value: None, | |
| new_value_status: None, | |
| }; | |
| let service_config = crate::generics::generic_insert(&app_state.db, config).await?; | |
src/shard_queue/handler.rs
Outdated
| } | ||
|
|
||
| /// IMC functions following your existing pattern for service_configuration caching | ||
| pub fn find_config_in_mem(key: &str) -> StorageResult<serde_json::Value> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| pub fn find_config_in_mem(key: &str) -> StorageResult<serde_json::Value> { | |
| pub fn find_config_in_mem(key: &str) -> StorageResult< ServiceConfiguration > { |
No description provided.