|
1 | 1 | using System; |
| 2 | +using System.Collections.Concurrent; |
2 | 3 | using System.Collections.Generic; |
3 | 4 | using System.Linq; |
4 | 5 | using System.Threading; |
@@ -176,43 +177,79 @@ public async Task<IDictionary<string, CacheValue<T>>> GetAllAsync<T>(IEnumerable |
176 | 177 | throw new ArgumentNullException(nameof(keys)); |
177 | 178 |
|
178 | 179 | string[] keyArray = keys.ToArray(); |
179 | | - var result = new Dictionary<string, CacheValue<T>>(keyArray.Length); |
| 180 | + // Use ConcurrentDictionary for thread-safe access without locks |
| 181 | + var result = new ConcurrentDictionary<string, CacheValue<T>>(StringComparer.OrdinalIgnoreCase); |
180 | 182 | if (keyArray.Length is 0) |
181 | | - return result; |
| 183 | + return result.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); |
182 | 184 |
|
183 | | - var missedKeys = new List<string>(keyArray.Length); |
184 | | - foreach (string key in keyArray.Where(k => !String.IsNullOrEmpty(k))) |
| 185 | + // Use ConcurrentBag for thread-safe collection without locks |
| 186 | + var missedKeys = new ConcurrentBag<string>(); |
| 187 | + |
| 188 | + // Parallelize local cache lookups (in-memory only, no external IO) |
| 189 | + // See: https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-write-a-parallel-foreach-loop |
| 190 | + await Parallel.ForEachAsync(keyArray.Where(k => !String.IsNullOrEmpty(k)), async (key, ct) => |
185 | 191 | { |
186 | 192 | var localValue = await _localCache.GetAsync<T>(key).AnyContext(); |
187 | 193 | if (localValue.HasValue) |
188 | 194 | { |
189 | 195 | _logger.LogTrace("Local cache hit: {Key}", key); |
190 | 196 | Interlocked.Increment(ref _localCacheHits); |
191 | | - result[key] = localValue; |
| 197 | + // Try to add the key/value pair, and log if it already exists |
| 198 | + if (!result.TryAdd(key, localValue)) { |
| 199 | + // Duplicate key detected - could happen with case-insensitive comparer or race condition |
| 200 | + _logger.LogWarning("Duplicate cache key detected during parallel processing: {Key}", key); |
| 201 | + |
| 202 | + // Overwrite with new value (last one wins) to match original behavior |
| 203 | + result[key] = localValue; |
| 204 | + } |
192 | 205 | } |
193 | 206 | else |
194 | 207 | { |
195 | 208 | _logger.LogTrace("Local cache miss: {Key}", key); |
| 209 | + // ConcurrentBag doesn't need locks |
196 | 210 | missedKeys.Add(key); |
197 | 211 | } |
198 | | - } |
| 212 | + }).AnyContext(); |
199 | 213 |
|
200 | 214 | if (missedKeys.Count > 0) |
201 | 215 | { |
202 | 216 | var distributedResults = await _distributedCache.GetAllAsync<T>(missedKeys).AnyContext(); |
203 | | - foreach (var kvp in distributedResults) |
| 217 | + |
| 218 | + // Get all expirations in a single bulk operation to avoid n+1 problem |
| 219 | + // where we were calling GetExpirationAsync for each key individually |
| 220 | + var keysWithValues = distributedResults.Where(kvp => kvp.Value.HasValue).Select(kvp => kvp.Key).ToList(); |
| 221 | + var expirations = keysWithValues.Count > 0 |
| 222 | + ? await _distributedCache.GetAllExpirationAsync(keysWithValues).AnyContext() |
| 223 | + : new Dictionary<string, TimeSpan?>(); |
| 224 | + |
| 225 | + // Parallelize populating local cache from distributed results |
| 226 | + // Limit concurrency to avoid overwhelming local cache with SetAsync requests |
| 227 | + // We now only need 1 network call per key (SetAsync) since we got all expirations in bulk |
| 228 | + // See: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync |
| 229 | + int maxParallelism = Math.Min(10, Environment.ProcessorCount); |
| 230 | + await Parallel.ForEachAsync(distributedResults, new ParallelOptions { MaxDegreeOfParallelism = maxParallelism }, async (kvp, ct) => |
204 | 231 | { |
205 | | - result[kvp.Key] = kvp.Value; |
| 232 | + // Try to add the key/value pair, log error if it already exists (shouldn't happen) |
| 233 | + if (!result.TryAdd(kvp.Key, kvp.Value)) { |
| 234 | + // Duplicate key detected - this really shouldn't happen as it means the distributed cache |
| 235 | + // returned the same key twice or a key was somehow both in the local and distributed cache |
| 236 | + _logger.LogError("Unexpected duplicate key from distributed results: {Key} - possible cache inconsistency", kvp.Key); |
| 237 | + |
| 238 | + // Overwrite with new value (last one wins) to match original behavior |
| 239 | + result[kvp.Key] = kvp.Value; |
| 240 | + } |
| 241 | + |
206 | 242 | if (kvp.Value.HasValue) |
207 | 243 | { |
208 | | - var expiration = await _distributedCache.GetExpirationAsync(kvp.Key).AnyContext(); |
| 244 | + // Use pre-fetched expiration from bulk call |
| 245 | + var expiration = expirations.TryGetValue(kvp.Key, out var exp) ? exp : null; |
209 | 246 | _logger.LogTrace("Setting Local cache key: {Key} with expiration: {Expiration}", kvp.Key, expiration); |
210 | 247 | await _localCache.SetAsync(kvp.Key, kvp.Value.Value, expiration).AnyContext(); |
211 | 248 | } |
212 | | - } |
| 249 | + }).AnyContext(); |
213 | 250 | } |
214 | 251 |
|
215 | | - return result; |
| 252 | + return result.AsReadOnly(); |
216 | 253 | } |
217 | 254 |
|
218 | 255 | public async Task<bool> AddAsync<T>(string key, T value, TimeSpan? expiresIn = null) |
|
0 commit comments