11using System ;
2- using System . Collections . Concurrent ;
32using System . Collections . Generic ;
43using System . Collections . ObjectModel ;
54using System . Linq ;
@@ -92,8 +91,6 @@ public async Task<int> RemoveAllAsync(IEnumerable<string> keys = null)
9291 if ( server . IsReplica )
9392 continue ;
9493
95- // Try FLUSHDB first (fastest approach)
96- bool flushed = false ;
9794 try
9895 {
9996 long dbSize = await server . DatabaseSizeAsync ( _options . Database ) . AnyContext ( ) ;
@@ -138,21 +135,19 @@ public async Task<int> RemoveAllAsync(IEnumerable<string> keys = null)
138135
139136 foreach ( var batch in redisKeys . Chunk ( batchSize ) )
140137 {
141- await Parallel . ForEachAsync (
142- batch . GroupBy ( k => Database . Multiplexer . HashSlot ( k ) ) ,
143- async ( hashSlotGroup , ct ) =>
138+ foreach ( var hashSlotGroup in batch . GroupBy ( k => Database . Multiplexer . HashSlot ( k ) ) )
139+ {
140+ var hashSlotKeys = hashSlotGroup . ToArray ( ) ;
141+ try
142+ {
143+ long count = await Database . KeyDeleteAsync ( hashSlotKeys ) . AnyContext ( ) ;
144+ Interlocked . Add ( ref deleted , count ) ;
145+ }
146+ catch ( Exception ex )
144147 {
145- var hashSlotKeys = hashSlotGroup . ToArray ( ) ;
146- try
147- {
148- long count = await Database . KeyDeleteAsync ( hashSlotKeys ) . AnyContext ( ) ;
149- Interlocked . Add ( ref deleted , count ) ;
150- }
151- catch ( Exception ex )
152- {
153- _logger . LogError ( ex , "Unable to delete {HashSlot} keys ({Keys}): {Message}" , hashSlotGroup . Key , hashSlotKeys , ex . Message ) ;
154- }
155- } ) . AnyContext ( ) ;
148+ _logger . LogError ( ex , "Unable to delete {HashSlot} keys ({Keys}): {Message}" , hashSlotGroup . Key , hashSlotKeys , ex . Message ) ;
149+ }
150+ }
156151 }
157152 }
158153 else
@@ -167,13 +162,7 @@ await Parallel.ForEachAsync(
167162 if ( redisKeys . Count is 0 )
168163 return 0 ;
169164
170- var keyBatches = redisKeys . Chunk ( batchSize ) . ToArray ( ) ;
171-
172- // NOTE: StackExchange.Redis multiplexes all operations over a single connection and handles
173- // pipelining internally, so parallelism limits here only affect client-side Task creation,
174- // not Redis server load. Consider simplifying to Task.WhenAll in a future refactor.
175- int maxParallelism = Math . Min ( 8 , Environment . ProcessorCount ) ;
176- await Parallel . ForEachAsync ( keyBatches , new ParallelOptions { MaxDegreeOfParallelism = maxParallelism } , async ( batch , ct ) =>
165+ foreach ( var batch in redisKeys . Chunk ( batchSize ) )
177166 {
178167 try
179168 {
@@ -184,7 +173,7 @@ await Parallel.ForEachAsync(
184173 {
185174 _logger . LogError ( ex , "Unable to delete keys ({Keys}): {Message}" , batch , ex . Message ) ;
186175 }
187- } ) . AnyContext ( ) ;
176+ }
188177 }
189178
190179 return ( int ) deleted ;
@@ -217,13 +206,11 @@ public async Task<int> RemoveByPrefixAsync(string prefix)
217206 {
218207 if ( isCluster )
219208 {
220- await Parallel . ForEachAsync (
221- keys . GroupBy ( k => _options . ConnectionMultiplexer . HashSlot ( k ) ) ,
222- async ( slotGroup , ct ) =>
223- {
224- long count = await Database . KeyDeleteAsync ( slotGroup . ToArray ( ) ) . AnyContext ( ) ;
225- Interlocked . Add ( ref deleted , count ) ;
226- } ) . AnyContext ( ) ;
209+ foreach ( var slotGroup in keys . GroupBy ( k => _options . ConnectionMultiplexer . HashSlot ( k ) ) )
210+ {
211+ long count = await Database . KeyDeleteAsync ( slotGroup . ToArray ( ) ) . AnyContext ( ) ;
212+ Interlocked . Add ( ref deleted , count ) ;
213+ }
227214 }
228215 else
229216 {
@@ -310,21 +297,18 @@ public async Task<IDictionary<string, CacheValue<T>>> GetAllAsync<T>(IEnumerable
310297
311298 if ( _options . ConnectionMultiplexer . IsCluster ( ) )
312299 {
313- // Use the default concurrency on .NET 8 (-1)
314- var result = new ConcurrentDictionary < string , CacheValue < T > > ( - 1 , redisKeys . Count ) ;
315- await Parallel . ForEachAsync (
316- redisKeys . GroupBy ( k => _options . ConnectionMultiplexer . HashSlot ( k ) ) ,
317- async ( hashSlotGroup , ct ) =>
318- {
319- var hashSlotKeys = hashSlotGroup . ToArray ( ) ;
320- var values = await Database . StringGetAsync ( hashSlotKeys , _options . ReadMode ) . AnyContext ( ) ;
321-
322- // Redis MGET guarantees that values are returned in the same order as keys.
323- // Non-existent keys return nil/empty values in their respective positions.
324- // https://redis.io/commands/mget
325- for ( int i = 0 ; i < hashSlotKeys . Length ; i ++ )
326- result [ hashSlotKeys [ i ] ] = RedisValueToCacheValue < T > ( values [ i ] ) ;
327- } ) . AnyContext ( ) ;
300+ var result = new Dictionary < string , CacheValue < T > > ( redisKeys . Count ) ;
301+ foreach ( var hashSlotGroup in redisKeys . GroupBy ( k => _options . ConnectionMultiplexer . HashSlot ( k ) ) )
302+ {
303+ var hashSlotKeys = hashSlotGroup . ToArray ( ) ;
304+ var values = await Database . StringGetAsync ( hashSlotKeys , _options . ReadMode ) . AnyContext ( ) ;
305+
306+ // Redis MGET guarantees that values are returned in the same order as keys.
307+ // Non-existent keys return nil/empty values in their respective positions.
308+ // https://redis.io/commands/mget
309+ for ( int i = 0 ; i < hashSlotKeys . Length ; i ++ )
310+ result [ hashSlotKeys [ i ] ] = RedisValueToCacheValue < T > ( values [ i ] ) ;
311+ }
328312
329313 return result . AsReadOnly ( ) ;
330314 }
@@ -616,13 +600,11 @@ public async Task<int> SetAllAsync<T>(IDictionary<string, T> values, TimeSpan? e
616600 // For cluster/sentinel, group keys by hash slot since batch operations
617601 // require all keys to be in the same slot
618602 int successCount = 0 ;
619- await Parallel . ForEachAsync (
620- pairs . GroupBy ( p => _options . ConnectionMultiplexer . HashSlot ( p . Key ) ) ,
621- async ( slotGroup , ct ) =>
622- {
623- int count = await SetAllInternalAsync ( slotGroup . ToArray ( ) , expiresIn ) . AnyContext ( ) ;
624- Interlocked . Add ( ref successCount , count ) ;
625- } ) . AnyContext ( ) ;
603+ foreach ( var slotGroup in pairs . GroupBy ( p => _options . ConnectionMultiplexer . HashSlot ( p . Key ) ) )
604+ {
605+ int count = await SetAllInternalAsync ( slotGroup . ToArray ( ) , expiresIn ) . AnyContext ( ) ;
606+ Interlocked . Add ( ref successCount , count ) ;
607+ }
626608 return successCount ;
627609 }
628610
@@ -829,37 +811,34 @@ public Task SetExpirationAsync(string key, TimeSpan expiresIn)
829811
830812 if ( _options . ConnectionMultiplexer . IsCluster ( ) )
831813 {
832- // Use the default concurrency on .NET 8 (-1)
833- var result = new ConcurrentDictionary < string , TimeSpan ? > ( - 1 , keyList . Count ) ;
834- await Parallel . ForEachAsync (
835- keyList . GroupBy ( k => _options . ConnectionMultiplexer . HashSlot ( k ) ) ,
836- async ( hashSlotGroup , ct ) =>
814+ var result = new Dictionary < string , TimeSpan ? > ( keyList . Count ) ;
815+ foreach ( var hashSlotGroup in keyList . GroupBy ( k => _options . ConnectionMultiplexer . HashSlot ( k ) ) )
816+ {
817+ var hashSlotKeys = hashSlotGroup . ToArray ( ) ;
818+ var redisResult = await Database . ScriptEvaluateAsync ( _getAllExpiration . Hash , hashSlotKeys ) . AnyContext ( ) ;
819+ if ( redisResult . IsNull )
820+ continue ;
821+
822+ // Lua script returns array of TTL values in milliseconds (in same order as keys)
823+ // -2 = key doesn't exist, -1 = no expiration, positive = TTL in ms
824+ long [ ] ttls = ( long [ ] ) redisResult ;
825+ if ( ttls is null || ttls . Length != hashSlotKeys . Length )
826+ throw new InvalidOperationException ( $ "Script returned { ttls ? . Length ?? 0 } results for { hashSlotKeys . Length } keys") ;
827+
828+ for ( int hashSlotIndex = 0 ; hashSlotIndex < hashSlotKeys . Length ; hashSlotIndex ++ )
837829 {
838- var hashSlotKeys = hashSlotGroup . Select ( k => ( RedisKey ) k ) . ToArray ( ) ;
839- var redisResult = await Database . ScriptEvaluateAsync ( _getAllExpiration . Hash , hashSlotKeys ) . AnyContext ( ) ;
840- if ( redisResult . IsNull )
841- return ;
842-
843- // Lua script returns array of TTL values in milliseconds (in same order as keys)
844- // -2 = key doesn't exist, -1 = no expiration, positive = TTL in ms
845- long [ ] ttls = ( long [ ] ) redisResult ;
846- if ( ttls is null || ttls . Length != hashSlotKeys . Length )
847- throw new InvalidOperationException ( $ "Script returned { ttls ? . Length ?? 0 } results for { hashSlotKeys . Length } keys") ;
848-
849- for ( int hashSlotIndex = 0 ; hashSlotIndex < hashSlotKeys . Length ; hashSlotIndex ++ )
850- {
851- string key = hashSlotKeys [ hashSlotIndex ] ;
852- long ttl = ttls [ hashSlotIndex ] ;
853- if ( ttl >= 0 ) // Only include keys with positive TTL (exclude non-existent and persistent)
854- result [ key ] = TimeSpan . FromMilliseconds ( ttl ) ;
855- }
856- } ) . AnyContext ( ) ;
830+ string key = hashSlotKeys [ hashSlotIndex ] ;
831+ long ttl = ttls [ hashSlotIndex ] ;
832+ if ( ttl >= 0 ) // Only include keys with positive TTL (exclude non-existent and persistent)
833+ result [ key ] = TimeSpan . FromMilliseconds ( ttl ) ;
834+ }
835+ }
857836
858837 return result . AsReadOnly ( ) ;
859838 }
860839 else
861840 {
862- var redisKeys = keyList . Select ( k => ( RedisKey ) k ) . ToArray ( ) ;
841+ var redisKeys = keyList . ToArray ( ) ;
863842 var redisResult = await Database . ScriptEvaluateAsync ( _getAllExpiration . Hash , redisKeys ) . AnyContext ( ) ;
864843
865844 if ( redisResult . IsNull )
@@ -897,18 +876,16 @@ public async Task SetAllExpirationAsync(IDictionary<string, TimeSpan?> expiratio
897876
898877 if ( _options . ConnectionMultiplexer . IsCluster ( ) )
899878 {
900- await Parallel . ForEachAsync (
901- expirations . GroupBy ( kvp => _options . ConnectionMultiplexer . HashSlot ( kvp . Key ) ) ,
902- async ( hashSlotGroup , ct ) =>
903- {
904- var hashSlotExpirations = hashSlotGroup . ToList ( ) ;
905- var keys = hashSlotExpirations . Select ( kvp => ( RedisKey ) kvp . Key ) . ToArray ( ) ;
906- var values = hashSlotExpirations
907- . Select ( kvp => ( RedisValue ) ( kvp . Value . HasValue ? ( long ) kvp . Value . Value . TotalMilliseconds : - 1 ) )
908- . ToArray ( ) ;
909-
910- await Database . ScriptEvaluateAsync ( _setAllExpiration . Hash , keys , values ) . AnyContext ( ) ;
911- } ) . AnyContext ( ) ;
879+ foreach ( var hashSlotGroup in expirations . GroupBy ( kvp => _options . ConnectionMultiplexer . HashSlot ( kvp . Key ) ) )
880+ {
881+ var hashSlotExpirations = hashSlotGroup . ToList ( ) ;
882+ var keys = hashSlotExpirations . Select ( kvp => ( RedisKey ) kvp . Key ) . ToArray ( ) ;
883+ var values = hashSlotExpirations
884+ . Select ( kvp => ( RedisValue ) ( kvp . Value . HasValue ? ( long ) kvp . Value . Value . TotalMilliseconds : - 1 ) )
885+ . ToArray ( ) ;
886+
887+ await Database . ScriptEvaluateAsync ( _setAllExpiration . Hash , keys , values ) . AnyContext ( ) ;
888+ }
912889 }
913890 else
914891 {
0 commit comments