7 changes: 7 additions & 0 deletions src/app/config.rs
@@ -156,6 +156,9 @@ pub struct Config {

/// Whether to collect connection-level metrics (only available on Isahc)
pub enable_metrics: bool,

/// Whether this uptime checker will write to redis or not.
pub redis_readonly: bool,
}

impl Default for Config {
@@ -197,6 +200,7 @@ impl Default for Config {
thread_cpu_scale_factor: 1,
redis_timeouts_ms: 30_000,
enable_metrics: false,
redis_readonly: false,
}
}
}
@@ -352,6 +356,7 @@ mod tests {
thread_cpu_scale_factor: 1,
redis_timeouts_ms: 30_000,
enable_metrics: false,
redis_readonly: false,
}
);
},
@@ -398,6 +403,7 @@ mod tests {
"8.8.8.8,8.8.4.4",
),
("UPTIME_CHECKER_INTERFACE", "eth0"),
("UPTIME_CHECKER_REDIS_READONLY", "true"),
],
|config| {
assert_eq!(
@@ -445,6 +451,7 @@ mod tests {
thread_cpu_scale_factor: 3,
redis_timeouts_ms: 30_000,
enable_metrics: false,
redis_readonly: true,
}
);
},
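The new `redis_readonly` flag defaults to `false` and, judging by the test above, is driven by the `UPTIME_CHECKER_REDIS_READONLY` environment variable. A minimal sketch of how such a flag could be parsed from the environment; the actual Config loader is not part of this diff, so the helper name and parsing rules here are assumptions:

```rust
use std::env;

// Hypothetical helper: the real Config loader is not shown in this diff.
// Treat "true"/"1" as read-only; anything else (or unset) falls back to
// `false`, matching `Config::default()`.
fn redis_readonly_from_env() -> bool {
    env::var("UPTIME_CHECKER_REDIS_READONLY")
        .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
        .unwrap_or(false)
}
```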
174 changes: 123 additions & 51 deletions src/check_config_provider/redis_config_provider.rs
@@ -1,5 +1,4 @@
use anyhow::Result;
use redis::AsyncCommands;
use std::sync::Arc;
use std::{collections::HashSet, time::Instant};
use tokio::task::JoinHandle;
@@ -70,7 +69,6 @@ impl RedisKey for CheckConfig {

pub struct RedisConfigProvider {
redis: RedisClient,
redis_timeouts_ms: u64,
partitions: HashSet<u16>,
check_interval: Duration,
}
@@ -82,13 +80,18 @@ impl RedisConfigProvider {
check_interval: Duration,
enable_cluster: bool,
redis_timeouts_ms: u64,
readonly: bool,
) -> Result<Self> {
let client = crate::redis::build_redis_client(redis_url, enable_cluster)?;
let client = crate::redis::build_redis_client(
redis_url,
enable_cluster,
redis_timeouts_ms,
readonly,
)?;
Ok(Self {
redis: client,
partitions,
check_interval,
redis_timeouts_ms,
})
}

@@ -105,8 +108,11 @@ impl RedisConfigProvider {
.set(partitions.len() as f64);
self.load_initial_configs(manager.clone(), &partitions, region)
.await;
self.monitor_updates(manager.clone(), &partitions, shutdown, region)
.await;

if !self.redis.is_readonly() {
self.monitor_updates(manager.clone(), &partitions, shutdown, region)
.await;
}
}

fn get_partition_keys(&self) -> Vec<RedisPartition> {
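With the flag in place, `monitor_updates` is skipped whenever the client reports read-only mode, so a read-only checker still loads configs but never consumes (and deletes) update keys. The `is_readonly()` accessor lives on the shared `RedisClient` wrapper in src/redis.rs, which this diff does not touch; a plausible sketch of that side, with names assumed from the call site above:

```rust
// Assumed shape of the client wrapper (src/redis.rs is not in this diff).
pub struct RedisClient {
    // ... connection/cluster handles elided ...
    readonly: bool,
}

impl RedisClient {
    /// Whether this checker should avoid issuing writes to Redis.
    pub fn is_readonly(&self) -> bool {
        self.readonly
    }
}
```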
@@ -126,9 +132,9 @@ impl RedisConfigProvider {
// Fetch configs for all partitions from Redis and register them with the manager
manager.update_partitions(&self.partitions);

let mut conn = self
let mut ops = self
.redis
.get_async_connection(self.redis_timeouts_ms)
.get_async_connection()
.await
.expect("Redis should be available");
metrics::gauge!("config_provider.initial_load.partitions", "uptime_region" => region)
@@ -139,8 +145,8 @@
// Initial load of all configs for all partitions
for partition in partitions {
let partition_start_loading = Instant::now();
let config_payloads: Vec<Vec<u8>> = conn
.hvals(&partition.config_key)
let config_payloads: Vec<Vec<u8>> = ops
.read_configs(&partition.config_key)
.await
.expect("Config key should exist");
tracing::info!(
@@ -191,9 +197,9 @@
shutdown: CancellationToken,
region: &'static str,
) {
let mut conn = self
let mut ops = self
.redis
.get_async_connection(self.redis_timeouts_ms)
.get_async_connection()
.await
.expect("Redis should be available");
let mut interval = interval(self.check_interval);
@@ -208,35 +214,10 @@

for partition in partitions.iter() {
let partition_update_start = Instant::now();
let mut pipe = redis::pipe();
// We fetch all updates from the list and then delete the key. We do this
// atomically so that there isn't any chance of a race
let (config_upserts, config_deletes) = pipe
.atomic()
.hvals(&partition.update_key)
.del(&partition.update_key)
.query_async::<(Vec<Vec<u8>>, ())>(&mut conn)
.await
.unwrap_or_else(|err| {
tracing::error!(?err, "redis_config_provider.redis_query_failed");
(vec![], ())
})
.0 // Get just the LRANGE results
.iter()
.map(|payload| {
rmp_serde::from_slice::<ConfigUpdate>(payload).map_err(|err| {
tracing::error!(?err, "config_consumer.invalid_config_message");
err
})
})
.filter_map(Result::ok)
.fold((vec![], vec![]), |(mut upserts, mut deletes), update| {
match update.action {
ConfigUpdateAction::Upsert => upserts.push(update),
ConfigUpdateAction::Delete => deletes.push(update),
}
(upserts, deletes)
});
let (config_upserts, config_deletes) =
ops.consume_config_updates(&partition.update_key).await;

metrics::counter!("config_provider.updater.upserts", "uptime_region" => region, "partition" => partition.number.to_string())
.increment(config_upserts.len() as u64);
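The inline pipeline above is replaced by a single `consume_config_updates` call on the connection wrapper. Its implementation is not shown in this diff, but from the code it removes it presumably looks roughly like the sketch below, written as a free function for brevity; `ConfigUpdate`, `ConfigUpdateAction`, and `RedisAsyncConnection` come from the surrounding module, and the real helper may differ in details:

```rust
// Rough reconstruction of the helper from the removed inline code.
async fn consume_config_updates(
    conn: &mut RedisAsyncConnection,
    update_key: &str,
) -> (Vec<ConfigUpdate>, Vec<ConfigUpdate>) {
    // Fetch all pending updates and delete the key in one atomic pipeline so
    // no update can slip in between the read and the delete.
    let payloads: Vec<Vec<u8>> = redis::pipe()
        .atomic()
        .hvals(update_key)
        .del(update_key)
        .query_async::<(Vec<Vec<u8>>, ())>(conn)
        .await
        .unwrap_or_else(|err| {
            tracing::error!(?err, "redis_config_provider.redis_query_failed");
            (vec![], ())
        })
        .0;

    // Decode each payload and split the updates into upserts and deletes.
    payloads
        .iter()
        .filter_map(|payload| {
            rmp_serde::from_slice::<ConfigUpdate>(payload)
                .map_err(|err| {
                    tracing::error!(?err, "config_consumer.invalid_config_message");
                    err
                })
                .ok()
        })
        .fold((vec![], vec![]), |(mut upserts, mut deletes), update| {
            match update.action {
                ConfigUpdateAction::Upsert => upserts.push(update),
                ConfigUpdateAction::Delete => deletes.push(update),
            }
            (upserts, deletes)
        })
}
```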
@@ -260,19 +241,15 @@
continue;
}

let config_payloads: Vec<Vec<u8>> = conn
.hget(
partition.config_key.clone(),
let config_payloads: Vec<Vec<u8>> = ops
.get_config_key_payloads(
&partition.config_key,
config_upserts
.iter()
.map(|config| config.redis_key())
.collect::<Vec<_>>(),
)
.await
.unwrap_or_else(|err| {
tracing::error!(?err, "redis_config_provider.config_key_get_failed");
vec![]
});
.await;

for config_payload in config_payloads {
let config: CheckConfig = rmp_serde::from_slice(&config_payload)
@@ -324,6 +301,7 @@ pub fn run_config_provider(
Duration::from_millis(config.config_provider_redis_update_ms),
config.redis_enable_cluster,
config.redis_timeouts_ms,
config.redis_readonly,
)
.expect("Config provider should be initializable");

@@ -375,7 +353,9 @@ mod tests {
use redis_test_macro::redis_test;
use std::time::Duration;

async fn setup_test() -> (
async fn setup_test(
readonly: bool,
) -> (
Config,
RedisAsyncConnection,
Vec<RedisPartition>,
@@ -387,6 +367,7 @@
config_provider_redis_total_partitions: 2,
checker_number: 0,
total_checkers: 1,
redis_readonly: readonly,
..Default::default()
};
let test_partitions: HashSet<u16> = vec![0, 1].into_iter().collect();
@@ -400,6 +381,7 @@
Duration::from_millis(10),
false,
config.redis_timeouts_ms,
config.redis_readonly,
)
.unwrap()
.get_partition_keys();
@@ -419,7 +401,7 @@

#[redis_test(start_paused = false)]
async fn test_redis_config_provider_load_no_configs() {
let (config, _, _, manager, shutdown) = setup_test().await;
let (config, _, _, manager, shutdown) = setup_test(false).await;
let _handle = run_config_provider(&config, manager.clone(), shutdown.clone());
tokio::time::sleep(Duration::from_millis(50)).await;

Expand All @@ -439,7 +421,7 @@ mod tests {

#[redis_test(start_paused = false)]
async fn test_redis_config_provider_load() {
let (config, mut conn, partitions, manager, shutdown) = setup_test().await;
let (config, mut conn, partitions, manager, shutdown) = setup_test(false).await;
assert_eq!(partitions.len(), 2);
let partition_configs = partitions
.iter()
@@ -526,7 +508,7 @@

#[redis_test(start_paused = false)]
async fn test_redis_config_provider_updates() {
let (config, conn, partitions, manager, shutdown) = setup_test().await;
let (config, conn, partitions, manager, shutdown) = setup_test(false).await;
assert_eq!(partitions.len(), 2);
let _handle = run_config_provider(&config, manager.clone(), shutdown.clone());

Expand Down Expand Up @@ -592,6 +574,96 @@ mod tests {
shutdown.cancel();
}

#[redis_test(start_paused = false)]
async fn test_redis_config_provider_updates_readonly() {
let (config, mut conn, partitions, manager, shutdown) = setup_test(true).await;
assert_eq!(partitions.len(), 2);

let partition_configs = partitions
.iter()
.map(|p| {
(
p,
CheckConfig {
subscription_id: Uuid::new_v4(),
interval: crate::types::check_config::CheckInterval::OneMinute,
..Default::default()
},
)
})
.collect::<Vec<_>>();
for (partition, config) in partition_configs.iter() {
let _: () = conn
.hset(
&partition.config_key,
config.redis_key(),
rmp_serde::to_vec(&config).unwrap(),
)
.await
.unwrap();
}

let _handle = run_config_provider(&config, manager.clone(), shutdown.clone());

tokio::time::sleep(Duration::from_millis(30)).await;

for partition in &partitions {
let configs = manager
.get_service(partition.number)
.get_config_store()
.read()
.unwrap()
.all_configs();

assert_eq!(configs.len(), 1);
}

let partition_configs = partitions
.iter()
.map(|p| {
(
p,
CheckConfig {
subscription_id: Uuid::new_v4(),
..Default::default()
},
)
})
.collect::<Vec<_>>();

// Test adding configs to different partitions
for (partition, config) in partition_configs.iter() {
send_update(conn.clone(), partition, config).await;
}
tokio::time::sleep(Duration::from_millis(15)).await;

// Verify upserts do nothing.
for (partition, _) in partition_configs.clone() {
let configs = manager
.get_service(partition.number)
.get_config_store()
.read()
.unwrap()
.all_configs();

assert_eq!(configs.len(), 1);
}

// Verify deletes do nothing.
let removed_config = partition_configs.first().unwrap();
send_delete(conn.clone(), removed_config.0, &removed_config.1).await;
tokio::time::sleep(Duration::from_millis(15)).await;
let configs = manager
.get_service(removed_config.0.number)
.get_config_store()
.read()
.unwrap()
.all_configs();
assert_eq!(1, configs.len());

shutdown.cancel();
}

fn run_determine_owned_partition_test(
total_partitions: u16,
checker_number: u16,
13 changes: 10 additions & 3 deletions src/manager.rs
@@ -17,6 +17,7 @@ use crate::check_executor::{run_executor, CheckSender, ExecutorConfig};
use crate::checker::HttpChecker;
use crate::config_waiter::wait_for_partition_boot;
use crate::producer::kafka_producer::KafkaResultsProducer;
use crate::redis::build_redis_client;
use crate::{
app::config::Config,
checker::reqwest_checker::ReqwestChecker,
@@ -61,17 +62,23 @@ impl PartitionedService {
config.region,
);

let client = build_redis_client(
&config.redis_host,
config.redis_enable_cluster,
config.redis_timeouts_ms,
config.redis_readonly,
)
.expect("could not build reddis client");

let scheduler_join_handle = run_scheduler(
partition,
config_store.clone(),
executor_sender,
shutdown_signal.clone(),
build_progress_key(partition),
config.redis_host.clone(),
client,
config_loaded,
config.region,
config.redis_enable_cluster,
config.redis_timeouts_ms,
tasks_finished_tx,
);

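Both the config provider and the manager now pass the timeout and read-only flag into `build_redis_client`, and the scheduler receives the constructed client rather than the raw host/cluster/timeout settings. The updated constructor lives in src/redis.rs, outside this diff; its signature, inferred from the two call sites and written here only as an assumption with a placeholder body:

```rust
// Assumed signature only; the real construction logic (single-node vs. cluster,
// timeouts, readonly bookkeeping) is in src/redis.rs, which this diff omits.
pub fn build_redis_client(
    redis_url: &str,
    enable_cluster: bool,
    redis_timeouts_ms: u64,
    readonly: bool,
) -> anyhow::Result<RedisClient> {
    todo!("see src/redis.rs for the actual implementation")
}
```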