11use anyhow:: Result ;
2- use redis:: AsyncCommands ;
32use std:: sync:: Arc ;
43use std:: { collections:: HashSet , time:: Instant } ;
54use tokio:: task:: JoinHandle ;
@@ -70,7 +69,6 @@ impl RedisKey for CheckConfig {
7069
7170pub struct RedisConfigProvider {
7271 redis : RedisClient ,
73- redis_timeouts_ms : u64 ,
7472 partitions : HashSet < u16 > ,
7573 check_interval : Duration ,
7674}
@@ -82,13 +80,18 @@ impl RedisConfigProvider {
8280 check_interval : Duration ,
8381 enable_cluster : bool ,
8482 redis_timeouts_ms : u64 ,
83+ readonly : bool ,
8584 ) -> Result < Self > {
86- let client = crate :: redis:: build_redis_client ( redis_url, enable_cluster) ?;
85+ let client = crate :: redis:: build_redis_client (
86+ redis_url,
87+ enable_cluster,
88+ redis_timeouts_ms,
89+ readonly,
90+ ) ?;
8791 Ok ( Self {
8892 redis : client,
8993 partitions,
9094 check_interval,
91- redis_timeouts_ms,
9295 } )
9396 }
9497
@@ -105,8 +108,11 @@ impl RedisConfigProvider {
105108 . set ( partitions. len ( ) as f64 ) ;
106109 self . load_initial_configs ( manager. clone ( ) , & partitions, region)
107110 . await ;
108- self . monitor_updates ( manager. clone ( ) , & partitions, shutdown, region)
109- . await ;
111+
112+ if !self . redis . is_readonly ( ) {
113+ self . monitor_updates ( manager. clone ( ) , & partitions, shutdown, region)
114+ . await ;
115+ }
110116 }
111117
112118 fn get_partition_keys ( & self ) -> Vec < RedisPartition > {
@@ -126,9 +132,9 @@ impl RedisConfigProvider {
126132 // Fetch configs for all partitions from Redis and register them with the manager
127133 manager. update_partitions ( & self . partitions ) ;
128134
129- let mut conn = self
135+ let mut ops = self
130136 . redis
131- . get_async_connection ( self . redis_timeouts_ms )
137+ . get_async_connection ( )
132138 . await
133139 . expect ( "Redis should be available" ) ;
134140 metrics:: gauge!( "config_provider.initial_load.partitions" , "uptime_region" => region)
@@ -139,8 +145,8 @@ impl RedisConfigProvider {
139145 // Initial load of all configs for all partitions
140146 for partition in partitions {
141147 let partition_start_loading = Instant :: now ( ) ;
142- let config_payloads: Vec < Vec < u8 > > = conn
143- . hvals ( & partition. config_key )
148+ let config_payloads: Vec < Vec < u8 > > = ops
149+ . read_configs ( & partition. config_key )
144150 . await
145151 . expect ( "Config key should exist" ) ;
146152 tracing:: info!(
@@ -191,9 +197,9 @@ impl RedisConfigProvider {
191197 shutdown : CancellationToken ,
192198 region : & ' static str ,
193199 ) {
194- let mut conn = self
200+ let mut ops = self
195201 . redis
196- . get_async_connection ( self . redis_timeouts_ms )
202+ . get_async_connection ( )
197203 . await
198204 . expect ( "Redis should be available" ) ;
199205 let mut interval = interval ( self . check_interval ) ;
@@ -208,35 +214,10 @@ impl RedisConfigProvider {
208214
209215 for partition in partitions. iter ( ) {
210216 let partition_update_start = Instant :: now ( ) ;
211- let mut pipe = redis:: pipe ( ) ;
212217 // We fetch all updates from the list and then delete the key. We do this
213218 // atomically so that there isn't any chance of a race
214- let ( config_upserts, config_deletes) = pipe
215- . atomic ( )
216- . hvals ( & partition. update_key )
217- . del ( & partition. update_key )
218- . query_async :: < ( Vec < Vec < u8 > > , ( ) ) > ( & mut conn)
219- . await
220- . unwrap_or_else ( |err| {
221- tracing:: error!( ?err, "redis_config_provider.redis_query_failed" ) ;
222- ( vec ! [ ] , ( ) )
223- } )
224- . 0 // Get just the LRANGE results
225- . iter ( )
226- . map ( |payload| {
227- rmp_serde:: from_slice :: < ConfigUpdate > ( payload) . map_err ( |err| {
228- tracing:: error!( ?err, "config_consumer.invalid_config_message" ) ;
229- err
230- } )
231- } )
232- . filter_map ( Result :: ok)
233- . fold ( ( vec ! [ ] , vec ! [ ] ) , |( mut upserts, mut deletes) , update| {
234- match update. action {
235- ConfigUpdateAction :: Upsert => upserts. push ( update) ,
236- ConfigUpdateAction :: Delete => deletes. push ( update) ,
237- }
238- ( upserts, deletes)
239- } ) ;
219+ let ( config_upserts, config_deletes) =
220+ ops. consume_config_updates ( & partition. update_key ) . await ;
240221
241222 metrics:: counter!( "config_provider.updater.upserts" , "uptime_region" => region, "partition" => partition. number. to_string( ) )
242223 . increment ( config_upserts. len ( ) as u64 ) ;
@@ -260,19 +241,15 @@ impl RedisConfigProvider {
260241 continue ;
261242 }
262243
263- let config_payloads: Vec < Vec < u8 > > = conn
264- . hget (
265- partition. config_key . clone ( ) ,
244+ let config_payloads: Vec < Vec < u8 > > = ops
245+ . get_config_key_payloads (
246+ & partition. config_key ,
266247 config_upserts
267248 . iter ( )
268249 . map ( |config| config. redis_key ( ) )
269250 . collect :: < Vec < _ > > ( ) ,
270251 )
271- . await
272- . unwrap_or_else ( |err| {
273- tracing:: error!( ?err, "redis_config_provider.config_key_get_failed" ) ;
274- vec ! [ ]
275- } ) ;
252+ . await ;
276253
277254 for config_payload in config_payloads {
278255 let config: CheckConfig = rmp_serde:: from_slice ( & config_payload)
@@ -324,6 +301,7 @@ pub fn run_config_provider(
324301 Duration :: from_millis ( config. config_provider_redis_update_ms ) ,
325302 config. redis_enable_cluster ,
326303 config. redis_timeouts_ms ,
304+ config. redis_readonly ,
327305 )
328306 . expect ( "Config provider should be initializable" ) ;
329307
@@ -375,7 +353,9 @@ mod tests {
375353 use redis_test_macro:: redis_test;
376354 use std:: time:: Duration ;
377355
378- async fn setup_test ( ) -> (
356+ async fn setup_test (
357+ readonly : bool ,
358+ ) -> (
379359 Config ,
380360 RedisAsyncConnection ,
381361 Vec < RedisPartition > ,
@@ -387,6 +367,7 @@ mod tests {
387367 config_provider_redis_total_partitions : 2 ,
388368 checker_number : 0 ,
389369 total_checkers : 1 ,
370+ redis_readonly : readonly,
390371 ..Default :: default ( )
391372 } ;
392373 let test_partitions: HashSet < u16 > = vec ! [ 0 , 1 ] . into_iter ( ) . collect ( ) ;
@@ -400,6 +381,7 @@ mod tests {
400381 Duration :: from_millis ( 10 ) ,
401382 false ,
402383 config. redis_timeouts_ms ,
384+ config. redis_readonly ,
403385 )
404386 . unwrap ( )
405387 . get_partition_keys ( ) ;
@@ -419,7 +401,7 @@ mod tests {
419401
420402 #[ redis_test( start_paused = false ) ]
421403 async fn test_redis_config_provider_load_no_configs ( ) {
422- let ( config, _, _, manager, shutdown) = setup_test ( ) . await ;
404+ let ( config, _, _, manager, shutdown) = setup_test ( false ) . await ;
423405 let _handle = run_config_provider ( & config, manager. clone ( ) , shutdown. clone ( ) ) ;
424406 tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
425407
@@ -439,7 +421,7 @@ mod tests {
439421
440422 #[ redis_test( start_paused = false ) ]
441423 async fn test_redis_config_provider_load ( ) {
442- let ( config, mut conn, partitions, manager, shutdown) = setup_test ( ) . await ;
424+ let ( config, mut conn, partitions, manager, shutdown) = setup_test ( false ) . await ;
443425 assert_eq ! ( partitions. len( ) , 2 ) ;
444426 let partition_configs = partitions
445427 . iter ( )
@@ -526,7 +508,7 @@ mod tests {
526508
527509 #[ redis_test( start_paused = false ) ]
528510 async fn test_redis_config_provider_updates ( ) {
529- let ( config, conn, partitions, manager, shutdown) = setup_test ( ) . await ;
511+ let ( config, conn, partitions, manager, shutdown) = setup_test ( false ) . await ;
530512 assert_eq ! ( partitions. len( ) , 2 ) ;
531513 let _handle = run_config_provider ( & config, manager. clone ( ) , shutdown. clone ( ) ) ;
532514
@@ -592,6 +574,96 @@ mod tests {
592574 shutdown. cancel ( ) ;
593575 }
594576
577+ #[ redis_test( start_paused = false ) ]
578+ async fn test_redis_config_provider_updates_readonly ( ) {
579+ let ( config, mut conn, partitions, manager, shutdown) = setup_test ( true ) . await ;
580+ assert_eq ! ( partitions. len( ) , 2 ) ;
581+
582+ let partition_configs = partitions
583+ . iter ( )
584+ . map ( |p| {
585+ (
586+ p,
587+ CheckConfig {
588+ subscription_id : Uuid :: new_v4 ( ) ,
589+ interval : crate :: types:: check_config:: CheckInterval :: OneMinute ,
590+ ..Default :: default ( )
591+ } ,
592+ )
593+ } )
594+ . collect :: < Vec < _ > > ( ) ;
595+ for ( partition, config) in partition_configs. iter ( ) {
596+ let _: ( ) = conn
597+ . hset (
598+ & partition. config_key ,
599+ config. redis_key ( ) ,
600+ rmp_serde:: to_vec ( & config) . unwrap ( ) ,
601+ )
602+ . await
603+ . unwrap ( ) ;
604+ }
605+
606+ let _handle = run_config_provider ( & config, manager. clone ( ) , shutdown. clone ( ) ) ;
607+
608+ tokio:: time:: sleep ( Duration :: from_millis ( 30 ) ) . await ;
609+
610+ for partition in & partitions {
611+ let configs = manager
612+ . get_service ( partition. number )
613+ . get_config_store ( )
614+ . read ( )
615+ . unwrap ( )
616+ . all_configs ( ) ;
617+
618+ assert_eq ! ( configs. len( ) , 1 ) ;
619+ }
620+
621+ let partition_configs = partitions
622+ . iter ( )
623+ . map ( |p| {
624+ (
625+ p,
626+ CheckConfig {
627+ subscription_id : Uuid :: new_v4 ( ) ,
628+ ..Default :: default ( )
629+ } ,
630+ )
631+ } )
632+ . collect :: < Vec < _ > > ( ) ;
633+
634+ // Test adding configs to different partitions
635+ for ( partition, config) in partition_configs. iter ( ) {
636+ send_update ( conn. clone ( ) , partition, config) . await ;
637+ }
638+ tokio:: time:: sleep ( Duration :: from_millis ( 15 ) ) . await ;
639+
640+ // Verify upserts do nothing.
641+ for ( partition, _) in partition_configs. clone ( ) {
642+ let configs = manager
643+ . get_service ( partition. number )
644+ . get_config_store ( )
645+ . read ( )
646+ . unwrap ( )
647+ . all_configs ( ) ;
648+
649+ assert_eq ! ( configs. len( ) , 1 ) ;
650+ }
651+
652+ // Verify deletes do nothing.
653+ let removed_config = partition_configs. first ( ) . unwrap ( ) ;
654+ send_delete ( conn. clone ( ) , removed_config. 0 , & removed_config. 1 ) . await ;
655+ tokio:: time:: sleep ( Duration :: from_millis ( 15 ) ) . await ;
656+ let configs = manager
657+ . get_service ( removed_config. 0 . number )
658+ . get_config_store ( )
659+ . read ( )
660+ . unwrap ( )
661+ . all_configs ( ) ;
662+ assert_eq ! ( 1 , configs. len( ) ) ;
663+
664+ shutdown. cancel ( ) ;
665+ }
666+
595667 fn run_determine_owned_partition_test (
596668 total_partitions : u16 ,
597669 checker_number : u16 ,
0 commit comments