Skip to content

Commit 208519e

Browse files
authored
feat(uptime): add assertion checks to check_executor (#437)
1 parent 50f88aa commit 208519e

File tree

14 files changed

+668
-404
lines changed

14 files changed

+668
-404
lines changed

Cargo.lock

Lines changed: 22 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ iprange = "0.6.7"
6565
http = "1.2.0"
6666
relay-pattern = { git = "https://github.com/getsentry/relay", rev = "7ba03b5f6ffe95c2429ba315f661d2b8ee3c0efe" }
6767
pest = "2.7.15"
68-
axum = "0.8.6"
68+
axum = { version = "0.8.6", features = ["macros"] }
69+
associative-cache = "2.0.0"
6970

7071
[patch.crates-io]
7172
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka" }
@@ -82,3 +83,5 @@ redis-test-macro = { path = "./redis-test-macro" }
8283
ntest = "0.9.3"
8384
socket-server-mocker = "0.5.0"
8485
httpmock = "0.7.0-rc.1"
86+
tower = { version = "0.5.2", features = ["util"] }
87+
http-body-util = "0.1.0"

src/assertions/cache.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use crate::assertions::{self, compiled};
2+
use associative_cache::{Capacity8192, HashDirectMapped, LruReplacement, LruTimestamp};
3+
use std::{
4+
fmt,
5+
sync::{Arc, RwLock},
6+
time::Instant,
7+
};
8+
9+
pub struct CacheEntry {
10+
assertion: Arc<compiled::Assertion>,
11+
timestamp: RwLock<Instant>,
12+
}
13+
14+
impl LruTimestamp for CacheEntry {
15+
type Timestamp<'a> = Instant;
16+
17+
fn get_timestamp(&self) -> Self::Timestamp<'_> {
18+
*self.timestamp.read().expect("not poisoned")
19+
}
20+
21+
fn update_timestamp(&self) {
22+
*self.timestamp.write().expect("not poisoned") = Instant::now();
23+
}
24+
}
25+
26+
pub struct Cache {
27+
compiled: Arc<RwLock<AssocCache>>,
28+
}
29+
30+
impl fmt::Debug for Cache {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
f.debug_struct("Cache").finish()
33+
}
34+
}
35+
36+
type AssocCache = associative_cache::AssociativeCache<
37+
assertions::Assertion,
38+
CacheEntry,
39+
Capacity8192,
40+
HashDirectMapped,
41+
LruReplacement,
42+
>;
43+
44+
impl Cache {
45+
pub fn new() -> Self {
46+
Self {
47+
compiled: Arc::new(RwLock::new(AssocCache::default())),
48+
}
49+
}
50+
pub fn get_or_compile(
51+
&self,
52+
key: &assertions::Assertion,
53+
) -> Result<Arc<compiled::Assertion>, compiled::Error> {
54+
if let Some(entry) = self.compiled.read().expect("not poisoned").get(key) {
55+
return Ok(entry.assertion.clone());
56+
}
57+
58+
let comp = compiled::compile(key);
59+
match comp {
60+
Ok(comp) => {
61+
let mut wl = self.compiled.write().expect("not poisoned");
62+
let comp = Arc::new(comp);
63+
wl.insert(
64+
key.clone(),
65+
CacheEntry {
66+
assertion: comp.clone(),
67+
timestamp: RwLock::new(Instant::now()),
68+
},
69+
);
70+
71+
Ok(comp)
72+
}
73+
Err(err) => Err(err),
74+
}
75+
}
76+
}

src/assertions/compiled.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use jsonpath_rust::{
33
parser::{errors::JsonPathError, model::JpQuery, parse_json_path},
44
query::js_path_process,
55
};
6+
use serde::Serialize;
67
use std::{
78
borrow::{Borrow, BorrowMut},
89
str::FromStr,
@@ -11,7 +12,7 @@ use std::{
1112
const GLOB_COMPLEXITY_LIMIT: u64 = 20;
1213
const ASSERTION_MAX_GAS: u32 = 100;
1314

14-
#[derive(thiserror::Error, Debug)]
15+
#[derive(thiserror::Error, Debug, Serialize)]
1516
pub enum Error {
1617
#[error("Invalid glob: {0}")]
1718
InvalidGlob(String),
@@ -58,7 +59,7 @@ impl BorrowMut<u32> for Gas {
5859
}
5960
}
6061

61-
#[derive(Debug, Clone, PartialEq, Eq)]
62+
#[derive(Debug, PartialEq, Eq)]
6263
pub struct Assertion {
6364
root: Op,
6465
}
@@ -75,7 +76,7 @@ impl Assertion {
7576
}
7677
}
7778

78-
#[derive(Debug, Clone, PartialEq, Eq)]
79+
#[derive(Debug, PartialEq, Eq)]
7980
enum Comparison {
8081
LessThan,
8182
GreaterThan,
@@ -94,7 +95,7 @@ impl From<&super::Comparison> for Comparison {
9495
}
9596
}
9697

97-
#[derive(Debug, Clone, PartialEq, Eq)]
98+
#[derive(Debug, PartialEq, Eq)]
9899
enum HeaderOperand {
99100
Literal { value: Value },
100101
Glob { value: relay_pattern::Pattern },
@@ -123,7 +124,7 @@ impl TryFrom<&super::HeaderOperand> for HeaderOperand {
123124
}
124125
}
125126

126-
#[derive(Debug, Clone, PartialEq)]
127+
#[derive(Debug, PartialEq)]
127128
enum Value {
128129
I64(i64),
129130
F64(f64),
@@ -132,7 +133,7 @@ enum Value {
132133

133134
impl Eq for Value {}
134135

135-
#[derive(Debug, Clone, PartialEq, Eq)]
136+
#[derive(Debug, PartialEq, Eq)]
136137
enum HeaderComparison {
137138
Always,
138139
Never,
@@ -261,7 +262,7 @@ impl HeaderComparison {
261262
}
262263
}
263264

264-
#[derive(Debug, Clone, PartialEq)]
265+
#[derive(Debug, PartialEq)]
265266
enum Op {
266267
And {
267268
children: Vec<Op>,

src/assertions/mod.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,39 @@
1+
pub mod cache;
12
pub mod compiled;
23

34
use serde::{Deserialize, Serialize};
45

5-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
6+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
67
#[serde(rename_all = "snake_case")]
78
pub struct Assertion {
8-
root: Op,
9+
pub(crate) root: Op,
910
}
1011

11-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
12+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
1213
#[serde(tag = "cmp", rename_all = "snake_case")]
13-
enum Comparison {
14+
pub(crate) enum Comparison {
1415
LessThan,
1516
GreaterThan,
1617
Equal,
1718
NotEqual,
1819
}
1920

20-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
21+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
2122
#[serde(rename_all = "snake_case")]
22-
struct GlobPattern {
23-
value: String,
23+
pub(crate) struct GlobPattern {
24+
pub(crate) value: String,
2425
}
2526

26-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
27+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
2728
#[serde(tag = "header_op", rename_all = "snake_case")]
28-
enum HeaderOperand {
29+
pub(crate) enum HeaderOperand {
2930
Literal { value: String },
3031
Glob { pattern: GlobPattern },
3132
}
3233

33-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
34+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
3435
#[serde(tag = "header_cmp", rename_all = "snake_case")]
35-
enum HeaderComparison {
36+
pub(crate) enum HeaderComparison {
3637
Always,
3738
Never,
3839
Equals { test_value: HeaderOperand },
@@ -41,9 +42,9 @@ enum HeaderComparison {
4142
GreaterThan { test_value: String },
4243
}
4344

44-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
45+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
4546
#[serde(tag = "op", rename_all = "snake_case")]
46-
enum Op {
47+
pub(crate) enum Op {
4748
And {
4849
children: Vec<Op>,
4950
},

src/check_config_provider/redis_config_provider.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl RedisConfigProvider {
168168
.get_config_store()
169169
.write()
170170
.unwrap()
171-
.add_config(Arc::new(config));
171+
.add_config(config);
172172
}
173173
let partition_loading_time = partition_start_loading.elapsed().as_secs_f64();
174174
metrics::histogram!(
@@ -267,7 +267,7 @@ impl RedisConfigProvider {
267267
.get_config_store()
268268
.write()
269269
.unwrap()
270-
.add_config(Arc::new(config));
270+
.add_config(config);
271271
}
272272
let partition_update_duration = partition_update_start.elapsed().as_secs_f64();
273273
metrics::histogram!(

src/check_executor.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::checker::HttpChecker;
1616
use crate::config_store::Tick;
1717
use crate::producer::ResultsProducer;
1818
use crate::types::check_config::CheckConfig;
19-
use crate::types::result::{CheckResult, CheckStatus, CheckStatusReasonType};
19+
use crate::types::result::{CheckResult, CheckStatus};
2020

2121
const SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(100);
2222
const LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(100);
@@ -38,6 +38,21 @@ pub struct ScheduledCheck {
3838
}
3939

4040
impl ScheduledCheck {
41+
pub fn new(
42+
kind: CheckKind,
43+
tick: Tick,
44+
config: Arc<CheckConfig>,
45+
resolve_tx: Sender<Option<CheckResult>>,
46+
) -> ScheduledCheck {
47+
ScheduledCheck {
48+
kind,
49+
tick,
50+
config,
51+
resolve_tx,
52+
retry_count: 0,
53+
}
54+
}
55+
4156
#[cfg(test)]
4257
pub fn new_for_test(tick: Tick, config: CheckConfig) -> Self {
4358
let (resolve_tx, _) = tokio::sync::oneshot::channel();
@@ -107,13 +122,7 @@ impl CheckSender {
107122
) -> anyhow::Result<Receiver<Option<CheckResult>>> {
108123
let (resolve_tx, resolve_rx) = oneshot::channel();
109124

110-
let scheduled_check = ScheduledCheck {
111-
tick,
112-
config,
113-
resolve_tx,
114-
retry_count: 0,
115-
kind: check_kind,
116-
};
125+
let scheduled_check = ScheduledCheck::new(check_kind, tick, config, resolve_tx);
117126

118127
self.queue_size.fetch_add(1, Ordering::Relaxed);
119128

@@ -284,7 +293,7 @@ async fn executor_loop(
284293
}
285294

286295
#[allow(clippy::too_many_arguments)]
287-
async fn do_check(
296+
pub(crate) async fn do_check(
288297
failure_retries: u16,
289298
scheduled_check: ScheduledCheck,
290299
job_checker: Arc<HttpChecker>,
@@ -367,15 +376,7 @@ fn record_result_metrics(result: &CheckResult, is_retry: bool, will_retry: bool)
367376
CheckStatus::MissedWindow => "missed_window",
368377
CheckStatus::DisallowedByRobots => "disallowed_by_robots",
369378
};
370-
let failure_reason = match status_reason.as_ref().map(|r| r.status_type) {
371-
Some(CheckStatusReasonType::Failure) => Some("failure"),
372-
Some(CheckStatusReasonType::DnsError) => Some("dns_error"),
373-
Some(CheckStatusReasonType::Timeout) => Some("timeout"),
374-
Some(CheckStatusReasonType::TlsError) => Some("tls_error"),
375-
Some(CheckStatusReasonType::ConnectionError) => Some("connection_error"),
376-
Some(CheckStatusReasonType::RedirectError) => Some("redirect_error"),
377-
None => None,
378-
};
379+
let failure_reason = status_reason.as_ref().map(|r| r.status_type.as_str());
379380
let status_code = match request_info.as_ref().and_then(|a| a.http_status_code) {
380381
None => "none".to_string(),
381382
Some(status) => status.to_string(),

0 commit comments

Comments
 (0)