Skip to content

Conversation

@AnkitKmrGupta
Copy link
Collaborator

No description provided.

Copilot AI review requested due to automatic review settings November 18, 2025 13:23
Copilot finished reviewing on behalf of AnkitKmrGupta November 18, 2025 13:28
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements an In-Memory Cache (IMC) feature using a sharded queue system. The implementation adds a distributed caching layer with 10 shards backed by Redis, featuring automatic cache updates via polling and TTL-based expiration.

Key Changes:

  • Adds a sharded queue system with 10 Redis-backed shards and a polling mechanism that runs every 10 seconds
  • Implements an in-memory cache (IMC) using a Registry pattern with TTL support for service configurations
  • Modifies service configuration lookup to check IMC first before database queries
  • Introduces a new findByNameFromRedisWithDefault function to cache default values when configurations are not found

Reviewed Changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
src/shard_queue/mod.rs Module definition for the sharded queue system with global singleton handler
src/shard_queue/types.rs Type definitions for shard queue items, metadata, and error types
src/shard_queue/registry.rs Registry implementation for in-memory caching with TTL support and expiration cleanup
src/shard_queue/handler.rs Core handler implementing sharded queue operations, polling logic, and IMC integration
src/types/service_configuration.rs Updated to check IMC before database and push new configs to shard queue
src/redis/cache.rs Enhanced cache lookup with IMC integration and new default value caching function
src/redis/commands.rs Added get_range_from_list method for retrieving items from Redis lists
src/storage/types.rs Added Serialize/Deserialize traits to ServiceConfiguration for caching
src/lib.rs Registered new shard_queue module
src/decider/gatewaydecider/gw_scoring.rs Refactored to use new findByNameFromRedisWithDefault API
src/bin/open_router.rs Spawns shard queue polling task alongside main and metrics servers

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +72 to +79
let default_config = crate::storage::types::ServiceConfiguration {
id: 0, // Placeholder ID since we're not storing in DB
name: key.clone(),
value: Some(serde_json::to_string(&default_value).unwrap_or_else(|_| "null".to_string())),
new_value: None,
previous_value: None,
new_value_status: None,
};
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialization logic here may cause issues. You're serializing default_value to a JSON string and storing it in the value field of ServiceConfiguration. However, the value field is Option<String>, which typically stores a string representation. When this is later retrieved and deserialized, the code expects to deserialize from this string value, which could lead to double-encoding issues (JSON string containing a JSON string). Consider whether the value field should contain the direct JSON representation or if the deserialization logic needs adjustment.

Copilot uses AI. Check for mistakes.
Comment on lines +84 to +86
// Remove oldest entry (simple eviction policy)
if let Some(oldest_key) = data.keys().next().cloned() {
data.remove(&oldest_key);
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says "Remove oldest entry" but HashMap::keys().next() returns an arbitrary key, not necessarily the oldest. HashMap iteration order is not guaranteed to be insertion order. Consider using a more appropriate data structure like LinkedHashMap or updating the comment to reflect the actual behavior (e.g., "Remove arbitrary entry").

Copilot uses AI. Check for mistakes.
for item in &new_items {
// Store in global registry with 600 second TTL
if let Err(_) =
GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The TTL value of 600 seconds is hardcoded in multiple places. Consider defining it as a constant (e.g., const CACHE_TTL_SECONDS: u64 = 600;) to improve maintainability and make it easier to adjust if needed.

Copilot uses AI. Check for mistakes.

let inner = ShardedQueueHandlerInner {
shard_metadata: Arc::new(Mutex::new(shard_metadata)),
loop_interval: Duration::from_secs(10), // 30 seconds for testing
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says "30 seconds for testing" but the code sets the duration to 10 seconds. Update the comment to match the actual value: "10 seconds".

Suggested change
loop_interval: Duration::from_secs(10), // 30 seconds for testing
loop_interval: Duration::from_secs(10), // 10 seconds

Copilot uses AI. Check for mistakes.
use crate::storage::schema_pg::service_configuration::dsl;
use diesel::associations::HasTable;
use diesel::*;
use serde_json::json;
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import use serde_json::json; is unused and should be removed.

Suggested change
use serde_json::json;

Copilot uses AI. Check for mistakes.
Comment on lines +120 to +121
pub fn size(&self) -> usize {
self.data.read().unwrap().len()
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size() method uses unwrap() which will panic if the lock is poisoned, while other methods properly handle lock errors. For consistency, this should return Result<usize, String> and use map_err like the other methods.

Suggested change
pub fn size(&self) -> usize {
self.data.read().unwrap().len()
pub fn size(&self) -> Result<usize, String> {
let data = self.data.read().map_err(|e| format!("Read lock error: {}", e))?;
Ok(data.len())

Copilot uses AI. Check for mistakes.
Comment on lines 186 to 189
if let Err(_) =
GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
{
logger::error!("Failed to store item in registry: {}", item.key);
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message from the registry store operation is being discarded. Consider logging it: if let Err(e) = ... instead of if let Err(_) = ... to include the actual error message in the log for better debugging.

Suggested change
if let Err(_) =
GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
{
logger::error!("Failed to store item in registry: {}", item.key);
if let Err(e) =
GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
{
logger::error!("Failed to store item in registry: {} (error: {})", item.key, e);

Copilot uses AI. Check for mistakes.

if let Some(shard_meta) = metadata.get_mut(&shard_id) {
shard_meta.update_last_modified();
logger::debug!("Updated last_modified_at for shard {}", shard_id);
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] If the shard metadata is not found, the update is silently skipped. While the metadata is initialized for all shards 0-9 in new(), consider adding a warning log if the metadata is unexpectedly missing to help diagnose potential issues.

Suggested change
logger::debug!("Updated last_modified_at for shard {}", shard_id);
logger::debug!("Updated last_modified_at for shard {}", shard_id);
} else {
logger::warn!(
"Shard metadata for shard {} was unexpectedly missing when attempting to update last_modified_at.",
shard_id
);

Copilot uses AI. Check for mistakes.
Comment on lines 159 to 163
for raw_item in raw_items {
match serde_json::from_str::<ShardQueueItem>(&raw_item) {
Ok(item) => {
if item.modified_at > last_modified_at {
new_items.push(item);
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable new_items is used on line 163 but is never declared. You need to add let mut new_items = Vec::new(); before the loop that starts on line 159.

Copilot uses AI. Check for mistakes.
Comment on lines 313 to 322
#[test]
fn test_push_and_get_sizes() {
let handler = ShardedQueueHandler::new();

let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"}));
let result = handler.push_to_shard(item);

assert!(result.is_ok());

let sizes = handler.get_queue_sizes().unwrap();
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test calls async functions (push_to_shard and get_queue_sizes) but is not marked as an async test. The test should be annotated with #[tokio::test] instead of #[test], and the test function should be async fn.

Suggested change
#[test]
fn test_push_and_get_sizes() {
let handler = ShardedQueueHandler::new();
let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"}));
let result = handler.push_to_shard(item);
assert!(result.is_ok());
let sizes = handler.get_queue_sizes().unwrap();
#[tokio::test]
async fn test_push_and_get_sizes() {
let handler = ShardedQueueHandler::new();
let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"}));
let result = handler.push_to_shard(item).await;
assert!(result.is_ok());
let sizes = handler.get_queue_sizes().await.unwrap();

Copilot uses AI. Check for mistakes.
Comment on lines 85 to 87
crate::logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e);
} else {
crate::logger::debug!("Cached default value for config '{}' in IMC", key);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
crate::logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e);
} else {
crate::logger::debug!("Cached default value for config '{}' in IMC", key);
logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e);
} else {
logger::debug!("Cached default value for config '{}' in IMC", key);

A: for<'de> Deserialize<'de>,
{
let res = service_configuration::find_config_by_name(key).await;
use crate::shard_queue::find_config_in_mem;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import it in file top

use crate::shard_queue::find_config_in_mem;

if let Ok(cached_value) = find_config_in_mem(&key) {
crate::logger::debug!("Cache HIT: Found config '{}' in IMC", key);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
crate::logger::debug!("Cache HIT: Found config '{}' in IMC", key);
logger::debug!("Cache HIT: Found config '{}' in IMC", key);

do this change in all other places also

}
crate::logger::debug!("Cache MISS: Config '{}' not found in IMC, checking DB", key);

if let Ok(Some(config)) = check_database_for_service_config(key.clone()).await {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the existing method service_configuration ::find_config_by_name

None
}

async fn check_database_for_service_config(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this method as it already available in service_configuration::find_config_by_name

let mut shard_metadata = HashMap::new();

// Initialize metadata for 10 shards (0-9)
for shard_id in 0..10 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetch this value from config so that we change it using envs


let inner = ShardedQueueHandlerInner {
shard_metadata: Arc::new(Mutex::new(shard_metadata)),
loop_interval: Duration::from_secs(10), // 30 seconds for testing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this also to configs

pub fn get_shard_id(&self, key: &str) -> u8 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
key.hash(&mut hasher);
(hasher.finish() % 10) as u8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use config here

Comment on lines 43 to 62
crate::generics::generic_insert(&app_state.db, config).await?;

// Create ServiceConfiguration for shard queue (after successful DB insert)
let service_config = ServiceConfiguration {
id: 0, // We don't need the actual DB ID for IMC, using 0 as placeholder
name: name.clone(),
value,
new_value: None,
previous_value: None,
new_value_status: None,
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
crate::generics::generic_insert(&app_state.db, config).await?;
// Create ServiceConfiguration for shard queue (after successful DB insert)
let service_config = ServiceConfiguration {
id: 0, // We don't need the actual DB ID for IMC, using 0 as placeholder
name: name.clone(),
value,
new_value: None,
previous_value: None,
new_value_status: None,
};
let service_config = crate::generics::generic_insert(&app_state.db, config).await?;

}

/// IMC functions following your existing pattern for service_configuration caching
pub fn find_config_in_mem(key: &str) -> StorageResult<serde_json::Value> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn find_config_in_mem(key: &str) -> StorageResult<serde_json::Value> {
pub fn find_config_in_mem(key: &str) -> StorageResult< ServiceConfiguration > {

@AnkitKmrGupta AnkitKmrGupta changed the title Feature imc feat: In Memory Cache Nov 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants