Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/observability/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ The Dynamo HTTP Frontend (`python -m dynamo.frontend`) exposes `dynamo_frontend_
- `dynamo_frontend_queued_requests`: Number of requests in HTTP processing queue (gauge)
- `dynamo_frontend_disconnected_clients`: Number of disconnected clients (gauge)
- `dynamo_frontend_input_sequence_tokens`: Input sequence length (histogram)
- `dynamo_frontend_cached_sequence_length`: Number of cached tokens (prefix cache hits) per request (histogram)
- `dynamo_frontend_inter_token_latency_seconds`: Inter-token latency (histogram)
- `dynamo_frontend_output_sequence_tokens`: Output sequence length (histogram)
- `dynamo_frontend_output_tokens_total`: Total number of output tokens generated (counter)
Expand Down
6 changes: 6 additions & 0 deletions lib/bindings/python/src/dynamo/prometheus_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class frontend_service:
INPUT_SEQUENCE_TOKENS = "input_sequence_tokens"
# Output sequence length in tokens
OUTPUT_SEQUENCE_TOKENS = "output_sequence_tokens"
# Number of cached tokens (prefix cache hits) per request
CACHED_SEQUENCE_LENGTH = "cached_sequence_length"
# Total number of output tokens generated (counter that updates in real-time)
OUTPUT_TOKENS_TOTAL = "output_tokens_total"
# Time to first token in seconds
Expand Down Expand Up @@ -93,6 +95,10 @@ class kvbm:
ONBOARD_BLOCKS_D2D = "onboard_blocks_d2d"
# The number of matched tokens
MATCHED_TOKENS = "matched_tokens"
# Host cache hit rate (0.0-1.0) from the sliding window
HOST_CACHE_HIT_RATE = "host_cache_hit_rate"
# Disk cache hit rate (0.0-1.0) from the sliding window
DISK_CACHE_HIT_RATE = "disk_cache_hit_rate"


class kvrouter:
Expand Down
208 changes: 203 additions & 5 deletions lib/llm/src/http/service/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ pub struct Metrics {
request_duration: HistogramVec,
input_sequence_length: HistogramVec,
output_sequence_length: HistogramVec,
cached_sequence_length: HistogramVec,
output_tokens_counter: IntCounterVec,
time_to_first_token: HistogramVec,
inter_token_latency: HistogramVec,
Expand Down Expand Up @@ -252,6 +253,8 @@ pub struct ResponseMetricCollector {
// be computed.
last_response_time: Option<Duration>,
osl: usize,
// we track if cached_tokens has been observed to ensure we only increment once per request
cached_tokens_observed: bool,
}

impl Default for Metrics {
Expand Down Expand Up @@ -378,7 +381,7 @@ impl Metrics {
frontend_metric_name(frontend_service::INPUT_SEQUENCE_TOKENS),
"Input sequence length in tokens",
)
.buckets(input_sequence_buckets),
.buckets(input_sequence_buckets.clone()),
&["model"],
)
.unwrap();
Expand Down Expand Up @@ -436,6 +439,16 @@ impl Metrics {
)
.unwrap();

let cached_sequence_length = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name(frontend_service::CACHED_SEQUENCE_LENGTH),
"Number of cached tokens (prefix cache hits) per request",
)
.buckets(input_sequence_buckets.clone()),
&["model"],
)
.unwrap();

// Runtime configuration metrics
// Note: Some of these metrics represent counter-like values from source systems,
// but are implemented as gauges because they are copied/synchronized from upstream
Expand Down Expand Up @@ -502,6 +515,7 @@ impl Metrics {
request_duration,
input_sequence_length,
output_sequence_length,
cached_sequence_length,
output_tokens_counter,
time_to_first_token,
inter_token_latency,
Expand Down Expand Up @@ -597,6 +611,7 @@ impl Metrics {
registry.register(Box::new(self.request_duration.clone()))?;
registry.register(Box::new(self.input_sequence_length.clone()))?;
registry.register(Box::new(self.output_sequence_length.clone()))?;
registry.register(Box::new(self.cached_sequence_length.clone()))?;
registry.register(Box::new(self.output_tokens_counter.clone()))?;
registry.register(Box::new(self.time_to_first_token.clone()))?;
registry.register(Box::new(self.inter_token_latency.clone()))?;
Expand Down Expand Up @@ -830,6 +845,7 @@ impl ResponseMetricCollector {
last_response_time: None,
start_time: Instant::now(),
osl: 0,
cached_tokens_observed: false,
}
}

Expand All @@ -843,6 +859,17 @@ impl ResponseMetricCollector {
self.is_first_token
}

/// Observe cached tokens (prefix cache hits), observing only once per request when value is available
pub fn observe_cached_tokens(&mut self, cached_tokens: Option<usize>) {
if let Some(tokens) = cached_tokens && !self.cached_tokens_observed {
self.cached_tokens_observed = true;
self.metrics
.cached_sequence_length
.with_label_values(&[&self.model])
.observe(tokens as f64);
}
}

/// Observe a response with input sequence length and number of new tokens
pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
if num_tokens == 0 {
Expand Down Expand Up @@ -943,18 +970,21 @@ impl<T> From<crate::types::Annotated<T>> for EventConverter<T> {
///
/// This function handles metrics collection, http_queue_guard management, and converts
/// annotated responses to SSE events for streaming responses.
///
/// Returns None for service events (events with no data and no event type) to filter them out.
pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
annotated: EventConverter<T>,
response_collector: &mut ResponseMetricCollector,
http_queue_guard: &mut Option<HttpQueueGuard>,
) -> Result<Event, axum::Error> {
) -> Result<Option<Event>, axum::Error> {
use crate::preprocessor::LLMMetricAnnotation;

let mut annotated = annotated.0;

// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_cached_tokens(metrics.cached_tokens);

// Drop http_queue_guard on first token for streaming
if response_collector.is_first_token()
Expand All @@ -976,11 +1006,11 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(

let mut event = Event::default();

if let Some(data) = annotated.data {
if let Some(ref data) = annotated.data {
event = event.json_data(data)?;
}

if let Some(msg) = annotated.event {
if let Some(ref msg) = annotated.event {
if msg == "error" {
let msgs = annotated
.comment
Expand All @@ -996,7 +1026,12 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
}
}

Ok(event)
// Filter out service events (events with no data and no event type)
if annotated.data.is_none() && annotated.event.is_none() {
Ok(None)
} else {
Ok(Some(event))
}
}

/// Create a new router with optional custom backend metrics support
Expand Down Expand Up @@ -1357,4 +1392,167 @@ mod tests {
20
);
}

#[test]
fn test_cached_tokens_counter_increments() {
    let metrics = Arc::new(Metrics::new());
    let registry = prometheus::Registry::new();
    metrics.register(&registry).unwrap();

    let model = "test-model";
    let expected_metric_name = "dynamo_frontend_cached_sequence_length";
    let mut collector = metrics.clone().create_response_collector(model);

    // Instantiate the labeled child so the metric family shows up in
    // gather() even before any observation. (Registration itself already
    // happened in `metrics.register` above.)
    let _histogram = metrics.cached_sequence_length.with_label_values(&[model]);

    // Helper: current sample count of the cached-sequence-length histogram.
    let sample_count = || {
        let families = registry.gather();
        let family = families
            .iter()
            .find(|mf| mf.name() == expected_metric_name)
            .expect("histogram should be registered");
        assert_eq!(family.get_metric().len(), 1);
        family.get_metric()[0].get_histogram().get_sample_count()
    };

    // The first observation is recorded.
    collector.observe_cached_tokens(Some(100));
    assert_eq!(sample_count(), 1);

    // A second observation on the same collector must be ignored
    // (cached tokens are observed once per request).
    collector.observe_cached_tokens(Some(50));
    assert_eq!(sample_count(), 1);
}

#[test]
fn test_cached_tokens_once_per_request() {
    let metrics = Arc::new(Metrics::new());
    let registry = prometheus::Registry::new();
    metrics.register(&registry).unwrap();

    let model = "test-model";
    let expected_metric_name = "dynamo_frontend_cached_sequence_length";
    let mut collector = metrics.clone().create_response_collector(model);

    // Instantiate the labeled child so the metric family is visible in
    // gather() before any observation is made.
    let _histogram = metrics.cached_sequence_length.with_label_values(&[model]);

    // Helper: current sample count of the cached-sequence-length histogram.
    let sample_count = || {
        let families = registry.gather();
        let family = families
            .iter()
            .find(|mf| mf.name() == expected_metric_name)
            .expect("histogram should be registered");
        family.get_metric()[0].get_histogram().get_sample_count()
    };

    // Only the first call may record a sample; every subsequent call on the
    // same collector — whatever value it carries — must be a no-op, so the
    // sample count stays pinned at 1 throughout.
    for tokens in [100, 50, 75] {
        collector.observe_cached_tokens(Some(tokens));
        assert_eq!(sample_count(), 1);
    }
}

#[test]
fn test_ghost_event_handling() {
    use crate::preprocessor::LLMMetricAnnotation;
    use crate::types::Annotated;

    let metrics = Arc::new(Metrics::new());
    let registry = prometheus::Registry::new();
    metrics.register(&registry).unwrap();

    let model = "test-model";
    let expected_metric_name = "dynamo_frontend_cached_sequence_length";
    let mut collector = metrics.clone().create_response_collector(model);

    // Build a service ("ghost") event: it carries an LLM metrics annotation
    // (including cached_tokens) but no response data.
    let llm_metrics = LLMMetricAnnotation {
        input_tokens: 10,
        output_tokens: 20,
        chunk_tokens: 5,
        cached_tokens: Some(15),
    };
    let annotation = llm_metrics.to_annotation::<()>().unwrap();

    // Construct the annotated response directly from the annotation's
    // event/comment — no data payload, marking it as a service event.
    let annotated = Annotated::<
        crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse,
    > {
        id: None,
        data: None,
        event: annotation.event,
        comment: annotation.comment,
    };

    // Process the event.
    let mut http_queue_guard = None;
    let result = process_response_using_event_converter_and_observe_metrics(
        EventConverter::from(annotated),
        &mut collector,
        &mut http_queue_guard,
    );

    // Service events are filtered out of the SSE stream (Ok(None))...
    assert!(matches!(result, Ok(None)));

    // ...but their metric side effects are still applied: the cached-token
    // count from the ghost event must have been observed.
    let metric_families = registry.gather();
    let histogram_family = metric_families
        .iter()
        .find(|mf| mf.name() == expected_metric_name)
        .expect("histogram should be registered");
    assert_eq!(
        histogram_family.get_metric()[0]
            .get_histogram()
            .get_sample_count(),
        1
    );
}
}
Loading
Loading