Skip to content

Commit b05b54b

Browse files
committed
feat(uptime): add config and read-only mode for redis client
1 parent df85306 commit b05b54b

File tree

5 files changed

+482
-136
lines changed

5 files changed

+482
-136
lines changed

src/app/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ pub struct Config {
156156

157157
/// Whether to collect connection-level metrics (only available on Isahc)
158158
pub enable_metrics: bool,
159+
160+
/// Whether this uptime checker will write to redis or not.
161+
pub redis_readonly: bool,
159162
}
160163

161164
impl Default for Config {
@@ -197,6 +200,7 @@ impl Default for Config {
197200
thread_cpu_scale_factor: 1,
198201
redis_timeouts_ms: 30_000,
199202
enable_metrics: false,
203+
redis_readonly: false,
200204
}
201205
}
202206
}
@@ -352,6 +356,7 @@ mod tests {
352356
thread_cpu_scale_factor: 1,
353357
redis_timeouts_ms: 30_000,
354358
enable_metrics: false,
359+
redis_readonly: false,
355360
}
356361
);
357362
},
@@ -398,6 +403,7 @@ mod tests {
398403
"8.8.8.8,8.8.4.4",
399404
),
400405
("UPTIME_CHECKER_INTERFACE", "eth0"),
406+
("UPTIME_CHECKER_REDIS_READONLY", "true"),
401407
],
402408
|config| {
403409
assert_eq!(
@@ -445,6 +451,7 @@ mod tests {
445451
thread_cpu_scale_factor: 3,
446452
redis_timeouts_ms: 30_000,
447453
enable_metrics: false,
454+
redis_readonly: true,
448455
}
449456
);
450457
},

src/check_config_provider/redis_config_provider.rs

Lines changed: 123 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use anyhow::Result;
2-
use redis::AsyncCommands;
32
use std::sync::Arc;
43
use std::{collections::HashSet, time::Instant};
54
use tokio::task::JoinHandle;
@@ -70,7 +69,6 @@ impl RedisKey for CheckConfig {
7069

7170
pub struct RedisConfigProvider {
7271
redis: RedisClient,
73-
redis_timeouts_ms: u64,
7472
partitions: HashSet<u16>,
7573
check_interval: Duration,
7674
}
@@ -82,13 +80,18 @@ impl RedisConfigProvider {
8280
check_interval: Duration,
8381
enable_cluster: bool,
8482
redis_timeouts_ms: u64,
83+
readonly: bool,
8584
) -> Result<Self> {
86-
let client = crate::redis::build_redis_client(redis_url, enable_cluster)?;
85+
let client = crate::redis::build_redis_client(
86+
redis_url,
87+
enable_cluster,
88+
redis_timeouts_ms,
89+
readonly,
90+
)?;
8791
Ok(Self {
8892
redis: client,
8993
partitions,
9094
check_interval,
91-
redis_timeouts_ms,
9295
})
9396
}
9497

@@ -105,8 +108,11 @@ impl RedisConfigProvider {
105108
.set(partitions.len() as f64);
106109
self.load_initial_configs(manager.clone(), &partitions, region)
107110
.await;
108-
self.monitor_updates(manager.clone(), &partitions, shutdown, region)
109-
.await;
111+
112+
if !self.redis.is_readonly() {
113+
self.monitor_updates(manager.clone(), &partitions, shutdown, region)
114+
.await;
115+
}
110116
}
111117

112118
fn get_partition_keys(&self) -> Vec<RedisPartition> {
@@ -126,9 +132,9 @@ impl RedisConfigProvider {
126132
// Fetch configs for all partitions from Redis and register them with the manager
127133
manager.update_partitions(&self.partitions);
128134

129-
let mut conn = self
135+
let mut ops = self
130136
.redis
131-
.get_async_connection(self.redis_timeouts_ms)
137+
.get_async_connection()
132138
.await
133139
.expect("Redis should be available");
134140
metrics::gauge!("config_provider.initial_load.partitions", "uptime_region" => region)
@@ -139,8 +145,8 @@ impl RedisConfigProvider {
139145
// Initial load of all configs for all partitions
140146
for partition in partitions {
141147
let partition_start_loading = Instant::now();
142-
let config_payloads: Vec<Vec<u8>> = conn
143-
.hvals(&partition.config_key)
148+
let config_payloads: Vec<Vec<u8>> = ops
149+
.read_configs(&partition.config_key)
144150
.await
145151
.expect("Config key should exist");
146152
tracing::info!(
@@ -191,9 +197,9 @@ impl RedisConfigProvider {
191197
shutdown: CancellationToken,
192198
region: &'static str,
193199
) {
194-
let mut conn = self
200+
let mut ops = self
195201
.redis
196-
.get_async_connection(self.redis_timeouts_ms)
202+
.get_async_connection()
197203
.await
198204
.expect("Redis should be available");
199205
let mut interval = interval(self.check_interval);
@@ -208,35 +214,10 @@ impl RedisConfigProvider {
208214

209215
for partition in partitions.iter() {
210216
let partition_update_start = Instant::now();
211-
let mut pipe = redis::pipe();
212217
// We fetch all updates from the list and then delete the key. We do this
213218
// atomically so that there isn't any chance of a race
214-
let (config_upserts, config_deletes) = pipe
215-
.atomic()
216-
.hvals(&partition.update_key)
217-
.del(&partition.update_key)
218-
.query_async::<(Vec<Vec<u8>>, ())>(&mut conn)
219-
.await
220-
.unwrap_or_else(|err| {
221-
tracing::error!(?err, "redis_config_provider.redis_query_failed");
222-
(vec![], ())
223-
})
224-
.0 // Get just the LRANGE results
225-
.iter()
226-
.map(|payload| {
227-
rmp_serde::from_slice::<ConfigUpdate>(payload).map_err(|err| {
228-
tracing::error!(?err, "config_consumer.invalid_config_message");
229-
err
230-
})
231-
})
232-
.filter_map(Result::ok)
233-
.fold((vec![], vec![]), |(mut upserts, mut deletes), update| {
234-
match update.action {
235-
ConfigUpdateAction::Upsert => upserts.push(update),
236-
ConfigUpdateAction::Delete => deletes.push(update),
237-
}
238-
(upserts, deletes)
239-
});
219+
let (config_upserts, config_deletes) =
220+
ops.consume_config_updates(&partition.update_key).await;
240221

241222
metrics::counter!("config_provider.updater.upserts", "uptime_region" => region, "partition" => partition.number.to_string())
242223
.increment(config_upserts.len() as u64);
@@ -260,19 +241,15 @@ impl RedisConfigProvider {
260241
continue;
261242
}
262243

263-
let config_payloads: Vec<Vec<u8>> = conn
264-
.hget(
265-
partition.config_key.clone(),
244+
let config_payloads: Vec<Vec<u8>> = ops
245+
.get_config_key_payloads(
246+
&partition.config_key,
266247
config_upserts
267248
.iter()
268249
.map(|config| config.redis_key())
269250
.collect::<Vec<_>>(),
270251
)
271-
.await
272-
.unwrap_or_else(|err| {
273-
tracing::error!(?err, "redis_config_provider.config_key_get_failed");
274-
vec![]
275-
});
252+
.await;
276253

277254
for config_payload in config_payloads {
278255
let config: CheckConfig = rmp_serde::from_slice(&config_payload)
@@ -324,6 +301,7 @@ pub fn run_config_provider(
324301
Duration::from_millis(config.config_provider_redis_update_ms),
325302
config.redis_enable_cluster,
326303
config.redis_timeouts_ms,
304+
config.redis_readonly,
327305
)
328306
.expect("Config provider should be initializable");
329307

@@ -375,7 +353,9 @@ mod tests {
375353
use redis_test_macro::redis_test;
376354
use std::time::Duration;
377355

378-
async fn setup_test() -> (
356+
async fn setup_test(
357+
readonly: bool,
358+
) -> (
379359
Config,
380360
RedisAsyncConnection,
381361
Vec<RedisPartition>,
@@ -387,6 +367,7 @@ mod tests {
387367
config_provider_redis_total_partitions: 2,
388368
checker_number: 0,
389369
total_checkers: 1,
370+
redis_readonly: readonly,
390371
..Default::default()
391372
};
392373
let test_partitions: HashSet<u16> = vec![0, 1].into_iter().collect();
@@ -400,6 +381,7 @@ mod tests {
400381
Duration::from_millis(10),
401382
false,
402383
config.redis_timeouts_ms,
384+
config.redis_readonly,
403385
)
404386
.unwrap()
405387
.get_partition_keys();
@@ -419,7 +401,7 @@ mod tests {
419401

420402
#[redis_test(start_paused = false)]
421403
async fn test_redis_config_provider_load_no_configs() {
422-
let (config, _, _, manager, shutdown) = setup_test().await;
404+
let (config, _, _, manager, shutdown) = setup_test(false).await;
423405
let _handle = run_config_provider(&config, manager.clone(), shutdown.clone());
424406
tokio::time::sleep(Duration::from_millis(50)).await;
425407

@@ -439,7 +421,7 @@ mod tests {
439421

440422
#[redis_test(start_paused = false)]
441423
async fn test_redis_config_provider_load() {
442-
let (config, mut conn, partitions, manager, shutdown) = setup_test().await;
424+
let (config, mut conn, partitions, manager, shutdown) = setup_test(false).await;
443425
assert_eq!(partitions.len(), 2);
444426
let partition_configs = partitions
445427
.iter()
@@ -526,7 +508,7 @@ mod tests {
526508

527509
#[redis_test(start_paused = false)]
528510
async fn test_redis_config_provider_updates() {
529-
let (config, conn, partitions, manager, shutdown) = setup_test().await;
511+
let (config, conn, partitions, manager, shutdown) = setup_test(false).await;
530512
assert_eq!(partitions.len(), 2);
531513
let _handle = run_config_provider(&config, manager.clone(), shutdown.clone());
532514

@@ -592,6 +574,96 @@ mod tests {
592574
shutdown.cancel();
593575
}
594576

577+
#[redis_test(start_paused = false)]
578+
async fn test_redis_config_provider_updates_readonly() {
579+
let (config, mut conn, partitions, manager, shutdown) = setup_test(true).await;
580+
assert_eq!(partitions.len(), 2);
581+
582+
let partition_configs = partitions
583+
.iter()
584+
.map(|p| {
585+
(
586+
p,
587+
CheckConfig {
588+
subscription_id: Uuid::new_v4(),
589+
interval: crate::types::check_config::CheckInterval::OneMinute,
590+
..Default::default()
591+
},
592+
)
593+
})
594+
.collect::<Vec<_>>();
595+
for (partition, config) in partition_configs.iter() {
596+
let _: () = conn
597+
.hset(
598+
&partition.config_key,
599+
config.redis_key(),
600+
rmp_serde::to_vec(&config).unwrap(),
601+
)
602+
.await
603+
.unwrap();
604+
}
605+
606+
let _handle = run_config_provider(&config, manager.clone(), shutdown.clone());
607+
608+
tokio::time::sleep(Duration::from_millis(30)).await;
609+
610+
for partition in &partitions {
611+
let configs = manager
612+
.get_service(partition.number)
613+
.get_config_store()
614+
.read()
615+
.unwrap()
616+
.all_configs();
617+
618+
assert_eq!(configs.len(), 1);
619+
}
620+
621+
let partition_configs = partitions
622+
.iter()
623+
.map(|p| {
624+
(
625+
p,
626+
CheckConfig {
627+
subscription_id: Uuid::new_v4(),
628+
..Default::default()
629+
},
630+
)
631+
})
632+
.collect::<Vec<_>>();
633+
634+
// Test adding configs to different partitions
635+
for (partition, config) in partition_configs.iter() {
636+
send_update(conn.clone(), partition, config).await;
637+
}
638+
tokio::time::sleep(Duration::from_millis(15)).await;
639+
640+
// Verify upserts do nothing.
641+
for (partition, _) in partition_configs.clone() {
642+
let configs = manager
643+
.get_service(partition.number)
644+
.get_config_store()
645+
.read()
646+
.unwrap()
647+
.all_configs();
648+
649+
assert_eq!(configs.len(), 1);
650+
}
651+
652+
// Verify deletes do nothing.
653+
let removed_config = partition_configs.first().unwrap();
654+
send_delete(conn.clone(), removed_config.0, &removed_config.1).await;
655+
tokio::time::sleep(Duration::from_millis(15)).await;
656+
let configs = manager
657+
.get_service(removed_config.0.number)
658+
.get_config_store()
659+
.read()
660+
.unwrap()
661+
.all_configs();
662+
assert_eq!(1, configs.len());
663+
664+
shutdown.cancel();
665+
}
666+
595667
fn run_determine_owned_partition_test(
596668
total_partitions: u16,
597669
checker_number: u16,

src/manager.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::check_executor::{run_executor, CheckSender, ExecutorConfig};
1717
use crate::checker::HttpChecker;
1818
use crate::config_waiter::wait_for_partition_boot;
1919
use crate::producer::kafka_producer::KafkaResultsProducer;
20+
use crate::redis::build_redis_client;
2021
use crate::{
2122
app::config::Config,
2223
checker::reqwest_checker::ReqwestChecker,
@@ -61,17 +62,23 @@ impl PartitionedService {
6162
config.region,
6263
);
6364

65+
let client = build_redis_client(
66+
&config.redis_host,
67+
config.redis_enable_cluster,
68+
config.redis_timeouts_ms,
69+
config.redis_readonly,
70+
)
71+
.expect("could not build reddis client");
72+
6473
let scheduler_join_handle = run_scheduler(
6574
partition,
6675
config_store.clone(),
6776
executor_sender,
6877
shutdown_signal.clone(),
6978
build_progress_key(partition),
70-
config.redis_host.clone(),
79+
client,
7180
config_loaded,
7281
config.region,
73-
config.redis_enable_cluster,
74-
config.redis_timeouts_ms,
7582
tasks_finished_tx,
7683
);
7784

0 commit comments

Comments
 (0)