Skip to content

Commit 78cb857

Browse files
committed
feat(analytics): fix formatting issues
1 parent d98649a commit 78cb857

File tree

5 files changed

+105
-69
lines changed

5 files changed

+105
-69
lines changed

src/analytics/events.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,7 @@ pub struct RoutingEvent {
2525
}
2626

2727
impl RoutingEvent {
28-
pub fn new(
29-
merchant_id: String,
30-
request_id: String,
31-
endpoint: String,
32-
method: String,
33-
) -> Self {
28+
pub fn new(merchant_id: String, request_id: String, endpoint: String, method: String) -> Self {
3429
Self {
3530
event_id: Uuid::new_v4().to_string(),
3631
merchant_id,
@@ -137,17 +132,21 @@ impl RoutingEvent {
137132
if !self.response_payload.is_empty() {
138133
if let Ok(response_json) = serde_json::from_str::<Value>(&self.response_payload) {
139134
// Try to extract gateway from various possible response structures
140-
if let Some(gateway) = response_json.get("gateway")
135+
if let Some(gateway) = response_json
136+
.get("gateway")
141137
.or_else(|| response_json.get("selected_gateway"))
142138
.or_else(|| response_json.get("connector"))
143-
.and_then(|v| v.as_str()) {
139+
.and_then(|v| v.as_str())
140+
{
144141
self.gateway_selected = Some(gateway.to_string());
145142
}
146143

147144
// Try to extract routing algorithm ID
148-
if let Some(algo_id) = response_json.get("routing_algorithm_id")
145+
if let Some(algo_id) = response_json
146+
.get("routing_algorithm_id")
149147
.or_else(|| response_json.get("algorithm_id"))
150-
.and_then(|v| v.as_str()) {
148+
.and_then(|v| v.as_str())
149+
{
151150
self.routing_algorithm_id = Some(algo_id.to_string());
152151
}
153152
}
@@ -159,10 +158,12 @@ impl RoutingEvent {
159158
pub fn extract_error_from_response(&mut self) -> AnalyticsResult<()> {
160159
if self.status_code >= 400 && !self.response_payload.is_empty() {
161160
if let Ok(response_json) = serde_json::from_str::<Value>(&self.response_payload) {
162-
if let Some(error) = response_json.get("error")
161+
if let Some(error) = response_json
162+
.get("error")
163163
.or_else(|| response_json.get("message"))
164164
.or_else(|| response_json.get("error_message"))
165-
.and_then(|v| v.as_str()) {
165+
.and_then(|v| v.as_str())
166+
{
166167
self.error_message = Some(error.to_string());
167168
}
168169
}
@@ -199,7 +200,10 @@ fn extract_ip_address(request: &Request) -> Option<String> {
199200
.and_then(|v| v.to_str().ok())
200201
.map(|s| {
201202
// Take the first IP if there are multiple (comma-separated)
202-
s.split(',').next().map(|ip| ip.trim().to_string()).unwrap_or_else(|| String::new())
203+
s.split(',')
204+
.next()
205+
.map(|ip| ip.trim().to_string())
206+
.unwrap_or_else(|| String::new())
203207
})
204208
}
205209

@@ -274,6 +278,9 @@ mod tests {
274278

275279
event.extract_error_from_response().unwrap();
276280

277-
assert_eq!(event.error_message, Some("Gateway not available".to_string()));
281+
assert_eq!(
282+
event.error_message,
283+
Some("Gateway not available".to_string())
284+
);
278285
}
279286
}

src/analytics/kafka_producer.rs

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::analytics::{AnalyticsError, AnalyticsResult, KafkaConfig, RoutingEven
22
use kafka::producer::{Producer, Record, RequiredAcks};
33
use std::time::Duration;
44
use tokio::sync::mpsc;
5-
use tracing::{error, info, warn, debug};
5+
use tracing::{debug, error, info, warn};
66

77
#[derive(Clone)]
88
pub struct KafkaProducer {
@@ -13,39 +13,48 @@ pub struct KafkaProducer {
1313
impl KafkaProducer {
1414
pub fn new(config: KafkaConfig) -> AnalyticsResult<Self> {
1515
let topic = format!("{}-routing-events", config.topic_prefix);
16-
16+
1717
// Validate broker configuration
1818
if config.brokers.is_empty() {
1919
return Err(AnalyticsError::Configuration(
20-
"No Kafka brokers configured".to_string()
20+
"No Kafka brokers configured".to_string(),
2121
));
2222
}
23-
24-
debug!("Initializing Kafka producer with brokers: {:?}", config.brokers);
25-
23+
24+
debug!(
25+
"Initializing Kafka producer with brokers: {:?}",
26+
config.brokers
27+
);
28+
2629
Ok(Self { config, topic })
2730
}
2831

2932
/// Test Kafka connectivity
3033
pub async fn test_connection(&self) -> AnalyticsResult<()> {
31-
debug!("Testing Kafka connection to brokers: {:?}", self.config.brokers);
32-
34+
debug!(
35+
"Testing Kafka connection to brokers: {:?}",
36+
self.config.brokers
37+
);
38+
3339
let producer = Producer::from_hosts(self.config.brokers.clone())
3440
.with_ack_timeout(Duration::from_secs(5))
3541
.with_required_acks(RequiredAcks::One)
3642
.create()
3743
.map_err(|e| {
38-
error!("Failed to create Kafka producer for connection test: {:?}", e);
44+
error!(
45+
"Failed to create Kafka producer for connection test: {:?}",
46+
e
47+
);
3948
AnalyticsError::Kafka(e)
4049
})?;
41-
50+
4251
info!("Kafka connection test successful");
4352
Ok(())
4453
}
4554

4655
pub async fn send_event(&self, event: &RoutingEventData) -> AnalyticsResult<()> {
4756
let json_data = serde_json::to_string(event)?;
48-
57+
4958
// Create producer with configuration
5059
let mut producer = Producer::from_hosts(self.config.brokers.clone())
5160
.with_ack_timeout(Duration::from_secs(1))
@@ -54,11 +63,10 @@ impl KafkaProducer {
5463
.map_err(AnalyticsError::Kafka)?;
5564

5665
// Send the record
57-
let record = Record::from_key_value(&self.topic, event.event_id.as_bytes(), json_data.as_bytes());
58-
59-
producer
60-
.send(&record)
61-
.map_err(AnalyticsError::Kafka)?;
66+
let record =
67+
Record::from_key_value(&self.topic, event.event_id.as_bytes(), json_data.as_bytes());
68+
69+
producer.send(&record).map_err(AnalyticsError::Kafka)?;
6270

6371
Ok(())
6472
}
@@ -80,15 +88,28 @@ impl KafkaProducer {
8088

8189
for (index, event) in events.iter().enumerate() {
8290
let json_data = serde_json::to_string(event)?;
83-
let record = Record::from_key_value(&self.topic, event.event_id.as_bytes(), json_data.as_bytes());
84-
91+
let record = Record::from_key_value(
92+
&self.topic,
93+
event.event_id.as_bytes(),
94+
json_data.as_bytes(),
95+
);
96+
8597
if let Err(e) = producer.send(&record) {
86-
error!("Failed to send event {} of {} to Kafka: {:?}", index + 1, events.len(), e);
98+
error!(
99+
"Failed to send event {} of {} to Kafka: {:?}",
100+
index + 1,
101+
events.len(),
102+
e
103+
);
87104
return Err(AnalyticsError::Kafka(e));
88105
}
89106
}
90107

91-
info!("Successfully sent {} events to Kafka topic: {}", events.len(), self.topic);
108+
info!(
109+
"Successfully sent {} events to Kafka topic: {}",
110+
events.len(),
111+
self.topic
112+
);
92113
Ok(())
93114
}
94115

@@ -97,7 +118,10 @@ impl KafkaProducer {
97118
match self.send_events_batch(events).await {
98119
Ok(()) => true,
99120
Err(e) => {
100-
warn!("Failed to send events batch to Kafka, continuing without analytics: {:?}", e);
121+
warn!(
122+
"Failed to send events batch to Kafka, continuing without analytics: {:?}",
123+
e
124+
);
101125
false
102126
}
103127
}
@@ -126,7 +150,7 @@ impl KafkaProducer {
126150
match event {
127151
Some(event) => {
128152
batch.push(event);
129-
153+
130154
// Flush if batch is full
131155
if batch.len() >= batch_size {
132156
let success = producer.send_events_batch_graceful(&batch).await;
@@ -135,7 +159,7 @@ impl KafkaProducer {
135159
} else {
136160
consecutive_failures += 1;
137161
if consecutive_failures >= max_consecutive_failures {
138-
warn!("Too many consecutive Kafka failures ({}), continuing to collect events but not sending",
162+
warn!("Too many consecutive Kafka failures ({}), continuing to collect events but not sending",
139163
consecutive_failures);
140164
}
141165
}
@@ -154,7 +178,7 @@ impl KafkaProducer {
154178
}
155179
}
156180
}
157-
181+
158182
// Timeout-based flush
159183
_ = tokio::time::sleep_until(last_flush + batch_timeout) => {
160184
if !batch.is_empty() {
@@ -164,7 +188,7 @@ impl KafkaProducer {
164188
} else {
165189
consecutive_failures += 1;
166190
if consecutive_failures >= max_consecutive_failures {
167-
warn!("Too many consecutive Kafka failures ({}), continuing to collect events but not sending",
191+
warn!("Too many consecutive Kafka failures ({}), continuing to collect events but not sending",
168192
consecutive_failures);
169193
}
170194
}

src/analytics/middleware.rs

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ pub async fn analytics_middleware(
2424
}
2525

2626
let start_time = Instant::now();
27-
27+
2828
// Extract request information
2929
let method = request.method().to_string();
3030
let endpoint = path.to_string();
31-
31+
3232
// Extract merchant ID from request headers or body (simplified for now)
3333
let merchant_id = extract_merchant_id(&request).unwrap_or("public".to_string());
34-
34+
3535
// Get the tenant app state to access analytics client
3636
let tenant_app_state = match global_app_state.get_app_state_of_tenant(&merchant_id).await {
3737
Ok(state) => state,
@@ -46,62 +46,66 @@ pub async fn analytics_middleware(
4646
}
4747
}
4848
};
49-
49+
5050
// Create routing event
5151
let mut routing_event = RoutingEvent::from_request(&request, merchant_id.clone());
52-
52+
5353
// Extract request body for logging
5454
let (request_parts, body) = request.into_parts();
5555
let body_bytes = match body.collect().await {
5656
Ok(collected) => collected.to_bytes(),
5757
Err(_) => Bytes::new(),
5858
};
59-
59+
6060
let request_payload = String::from_utf8_lossy(&body_bytes).to_string();
6161
routing_event = routing_event.with_request_payload(&request_payload);
62-
62+
6363
// Reconstruct request with body
6464
let request = Request::from_parts(request_parts, Body::from(body_bytes));
65-
65+
6666
// Process the request
6767
let response = next.run(request).await;
68-
68+
6969
// Calculate processing time
7070
let processing_time = start_time.elapsed().as_millis() as u32;
71-
71+
7272
// Extract response information
7373
let status_code = response.status().as_u16();
74-
75-
// Extract response body for logging. Note: This operation can lead to high memory usage
74+
75+
// Extract response body for logging. Note: This operation can lead to high memory usage
7676
let (response_parts, body) = response.into_parts();
7777
let body_bytes = match body.collect().await {
7878
Ok(collected) => collected.to_bytes(),
7979
Err(_) => Bytes::new(),
8080
};
81-
81+
8282
let response_payload = String::from_utf8_lossy(&body_bytes).to_string();
83-
83+
8484
// Complete the routing event
8585
routing_event = routing_event
8686
.with_response_payload(&response_payload)
8787
.with_status_code(status_code)
8888
.with_processing_time(processing_time);
89-
89+
9090
// Extract gateway information from response
9191
if let Err(e) = routing_event.extract_gateway_from_response() {
9292
warn!("Failed to extract gateway from response: {:?}", e);
9393
}
94-
94+
9595
// Extract error information if status indicates failure
9696
if let Err(e) = routing_event.extract_error_from_response() {
9797
warn!("Failed to extract error from response: {:?}", e);
9898
}
99-
99+
100100
// Send event to analytics (async, non-blocking)
101-
if let Err(e) = tenant_app_state.analytics_client.track_routing_event(routing_event).await {
101+
if let Err(e) = tenant_app_state
102+
.analytics_client
103+
.track_routing_event(routing_event)
104+
.await
105+
{
102106
error!("Failed to track routing event: {:?}", e);
103107
}
104-
108+
105109
// Reconstruct response
106110
Response::from_parts(response_parts, Body::from(body_bytes))
107111
}
@@ -119,14 +123,14 @@ fn extract_merchant_id(request: &Request) -> Option<String> {
119123
return Some(merchant_id_str.to_string());
120124
}
121125
}
122-
126+
123127
// Try x-tenant-id header as fallback
124128
if let Some(tenant_id) = request.headers().get("x-tenant-id") {
125129
if let Ok(tenant_id_str) = tenant_id.to_str() {
126130
return Some(tenant_id_str.to_string());
127131
}
128132
}
129-
133+
130134
// Default to "public" tenant
131135
Some("public".to_string())
132136
}
@@ -146,15 +150,15 @@ mod tests {
146150
#[test]
147151
fn test_extract_merchant_id_from_header() {
148152
use axum::http::{HeaderMap, HeaderValue};
149-
153+
150154
let mut headers = HeaderMap::new();
151155
headers.insert("x-merchant-id", HeaderValue::from_static("merchant-123"));
152-
156+
153157
let request = Request::builder()
154158
.uri("/routing/evaluate")
155159
.body(Body::empty())
156160
.unwrap();
157-
161+
158162
// Note: This test would need to be adjusted to work with the actual request structure
159163
// For now, it's a placeholder to show the testing approach
160164
}

0 commit comments

Comments
 (0)