diff --git a/Cargo.lock b/Cargo.lock index d7f0734f..997a4bc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -999,6 +999,12 @@ dependencies = [ "half", ] +[[package]] +name = "cityhash-rs" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" + [[package]] name = "clang-sys" version = "1.8.1" @@ -1035,6 +1041,45 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "clickhouse" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3093f817c4f81c8bd174ed8dd30eac785821a8a7eef27a7dcb7f8cd0d0f6548" +dependencies = [ + "bstr", + "bytes", + "cityhash-rs", + "clickhouse-derive", + "futures", + "futures-channel", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "lz4_flex", + "replace_with", + "sealed", + "serde", + "static_assertions", + "thiserror 1.0.69", + "time", + "tokio", + "url", + "uuid", +] + +[[package]] +name = "clickhouse-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d70f3e2893f7d3e017eeacdc9a708fbc29a10488e3ebca21f9df6a5d2b616dbb" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.100", +] + [[package]] name = "cmake" version = "0.1.54" @@ -1256,6 +1301,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc16" version = "0.4.0" @@ -1646,7 +1706,7 @@ checksum = "139ae9aca7527f85f26dd76483eb38533fd84bd571065da1739656ef71c5ff5b" dependencies = [ "darling 0.20.10", "either", - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.100", @@ -2089,6 +2149,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -2665,6 +2731,25 @@ dependencies = [ "serde", ] +[[package]] +name = "kafka" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054ba4edcb4dcda4209e138c7e88caf26d4a325b3db76fbdb6ca5eecc23e426" +dependencies = [ + "byteorder", + "crc", + "flate2", + "fnv", + "openssl", + "openssl-sys", + "ref_slice", + "snap", + "thiserror 1.0.69", + "tracing", + "twox-hash", +] + [[package]] name = "keyed_priority_queue" version = "0.4.2" @@ -2787,6 +2872,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "lz4_flex" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" + [[package]] name = "masking" version = "0.1.0" @@ -3126,6 +3217,7 @@ dependencies = [ "bytes", "cargo_metadata", "chrono", + "clickhouse", "config", "console-subscriber", "cpu-time", @@ -3138,10 +3230,12 @@ dependencies = [ "futures", "gethostname", "hex", + "http-body-util", "hyper 1.6.0", "jemalloc-ctl", "jemallocator", "josekit", + 
"kafka", "lazy_static", "masking 0.1.0 (git+https://github.com/juspay/hyperswitch?tag=v1.111.1)", "mysqlclient-sys", @@ -3862,6 +3956,12 @@ dependencies = [ "bitflags 2.9.0", ] +[[package]] +name = "ref_slice" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" + [[package]] name = "regex" version = "1.11.1" @@ -3933,6 +4033,12 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "replace_with" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884" + [[package]] name = "reqwest" version = "0.11.27" @@ -4409,6 +4515,18 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "sealed" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -4474,6 +4592,17 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "serde_json" version = "1.0.140" @@ -4613,6 +4742,12 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.5.8" @@ -4673,7 +4808,7 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "rustversion", diff --git a/Cargo.toml b/Cargo.toml index 77bf6526..694664a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,9 @@ hex = "0.4.3" time = { version = "0.3.36", features = ["serde"] } uuid = { version = "1.10.0", features = ["v4", "v7", "fast-rng"] } reqwest = { version = "0.12.7", features = ["json", "__rustls"] } +http-body-util = "0.1.3" +kafka = "0.10.0" +clickhouse = { version = "0.12.2", features = ["time", "uuid"] } nanoid = "0.4.0" mysqlclient-sys = { version = "0.4.2", features = ["buildtime_bindgen"] } diff --git a/analytics/README.md b/analytics/README.md new file mode 100644 index 00000000..966e12c9 --- /dev/null +++ b/analytics/README.md @@ -0,0 +1,303 @@ +# Decision Engine Analytics + +This directory contains the analytics infrastructure for the Decision Engine project, implementing real-time event tracking and analytics using ClickHouse and Kafka. 
+ +## Architecture + +The analytics system follows the Hyperswitch analytics pattern with the following components: + +``` +┌─────────────────────┐ +│ Decision Engine │ +│ (Main Service) │ +└──────────┬──────────┘ + │ Events + ▼ +┌─────────────────────┐ +│ Kafka │ +│ (Event Stream) │ +└──────────┬──────────┘ + │ Real-time + ▼ +┌─────────────────────┐ +│ ClickHouse │ +│ ┌───────────────┐ │ +│ │ Kafka Engine │ │ +│ │ Tables │ │ +│ └───────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────┐ │ +│ │ Materialized │ │ +│ │ Views │ │ +│ └───────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────┐ │ +│ │ Storage │ │ +│ │ Tables │ │ +│ └───────────────┘ │ +└─────────────────────┘ +``` + +## Components + +### 1. Event Tracking +- **Endpoints Tracked**: `/routing/evaluate` and `/decide-gateway` +- **Event Data**: Request/response payloads, processing time, gateway selection, errors +- **Middleware**: Automatic event capture for tracked endpoints + +### 2. Data Storage +- **Kafka Topics**: `decision-engine-routing-events` +- **ClickHouse Tables**: + - `routing_events_queue` (Kafka engine for ingestion) + - `routing_events` (CollapsingMergeTree for storage) + - `routing_events_hourly` (Hourly aggregations) + - `routing_events_daily` (Daily aggregations) + +### 3. Real-time Processing +- **Kafka Producer**: Batched event publishing +- **Materialized Views**: Real-time data transformation +- **Aggregations**: Automatic hourly and daily rollups + +## Quick Start + +### 1. Start Analytics Infrastructure + +```bash +# Start with analytics profile +docker-compose --profile analytics up -d + +# This will start: +# - Zookeeper +# - Kafka +# - ClickHouse +# - Analytics migrator (runs schema setup) +``` + +### 2. Enable Analytics in Configuration + +Update your `config/development.toml`: + +```toml +[analytics] +enabled = true + +[analytics.kafka] +brokers = ["kafka:29092"] +topic_prefix = "decision-engine" +batch_size = 100 +batch_timeout_ms = 1000 + +[analytics.clickhouse] +host = "http://clickhouse:8123" +username = "analytics_user" +password = "analytics_pass" +database = "decision_engine_analytics" +``` + +### 3. 
Start Decision Engine + +```bash +# Start the main application +docker-compose up open-router-local +``` + +## Database Schema + +### Routing Events Table + +```sql +CREATE TABLE routing_events ( + event_id String, + merchant_id LowCardinality(String), + request_id String, + endpoint LowCardinality(String), + method LowCardinality(String), + request_payload String, + response_payload String, + status_code UInt16, + processing_time_ms UInt32, + gateway_selected LowCardinality(Nullable(String)), + routing_algorithm_id Nullable(String), + error_message Nullable(String), + user_agent Nullable(String), + ip_address Nullable(String), + created_at DateTime DEFAULT now(), + inserted_at DateTime DEFAULT now(), + sign_flag Int8 +) ENGINE = CollapsingMergeTree(sign_flag) +PARTITION BY toStartOfDay(created_at) +ORDER BY (created_at, merchant_id, request_id, event_id); +``` + +### Aggregated Views + +- **Hourly Aggregations**: Request counts, success/failure rates, performance metrics +- **Daily Aggregations**: Daily summaries with unique request tracking + +## Querying Analytics Data + +### Basic Queries + +```sql +-- Total requests by endpoint +SELECT + endpoint, + sum(sign_flag) as total_requests +FROM routing_events +WHERE created_at >= now() - INTERVAL 1 DAY +GROUP BY endpoint; + +-- Success rate by gateway +SELECT + gateway_selected, + sum(if(status_code < 400, sign_flag, 0)) as successful_requests, + sum(sign_flag) as total_requests, + (successful_requests / total_requests) * 100 as success_rate +FROM routing_events +WHERE created_at >= now() - INTERVAL 1 DAY + AND gateway_selected IS NOT NULL +GROUP BY gateway_selected; + +-- Performance metrics +SELECT + endpoint, + avg(processing_time_ms) as avg_processing_time, + quantile(0.95)(processing_time_ms) as p95_processing_time, + quantile(0.99)(processing_time_ms) as p99_processing_time +FROM routing_events +WHERE created_at >= now() - INTERVAL 1 DAY +GROUP BY endpoint; +``` + +### Using Aggregated Tables + +```sql +-- Hourly trends +SELECT + hour, + endpoint, + total_requests, + successful_requests, + (successful_requests / total_requests) * 100 as success_rate, + avg_processing_time_ms +FROM routing_events_hourly +WHERE hour >= now() - INTERVAL 24 HOUR +ORDER BY hour DESC; +``` + +## Configuration Options + +### Analytics Configuration + +```toml +[analytics] +enabled = true # Enable/disable analytics + +[analytics.kafka] +brokers = ["kafka:29092"] # Kafka broker addresses +topic_prefix = "decision-engine" # Topic prefix for events +batch_size = 100 # Events per batch +batch_timeout_ms = 1000 # Batch timeout in milliseconds + +[analytics.clickhouse] +host = "http://clickhouse:8123" # ClickHouse HTTP endpoint +username = "analytics_user" # ClickHouse username +password = "analytics_pass" # ClickHouse password +database = "decision_engine_analytics" # Database name +``` + +## Monitoring and Maintenance + +### Health Checks + +The analytics client provides health check functionality: + +```rust +// Check analytics connectivity +analytics_client.health_check().await?; +``` + +### Data Retention + +- **Raw Events**: 18 months (configurable via TTL) +- **Hourly Aggregations**: 12 months +- **Daily Aggregations**: 24 months + +### Performance Considerations + +1. **Batch Processing**: Events are batched for efficient Kafka publishing +2. **Async Processing**: Analytics don't block request processing +3. **Fallback Handling**: Graceful degradation when analytics are unavailable +4. 
**Partitioning**: Data partitioned by day for efficient queries + +## Troubleshooting + +### Common Issues + +1. **Kafka Connection Failed** + - Check Kafka broker connectivity + - Verify topic creation + - Check network configuration + +2. **ClickHouse Connection Failed** + - Verify ClickHouse is running + - Check credentials and database existence + - Verify network connectivity + +3. **Missing Events** + - Check analytics middleware is enabled + - Verify endpoint tracking configuration + - Check Kafka topic consumption + +### Debugging + +Enable debug logging for analytics: + +```toml +[log.console] +level = "DEBUG" +``` + +Check analytics client status: + +```bash +# Check if analytics is enabled +curl http://localhost:8080/health + +# Check Kafka topics +docker exec -it open-router-kafka kafka-topics --bootstrap-server localhost:9092 --list + +# Check ClickHouse tables +docker exec -it open-router-clickhouse clickhouse-client --query "SHOW TABLES FROM decision_engine_analytics" +``` + +## Development + +### Adding New Event Types + +1. Extend `RoutingEventData` struct in `src/analytics/types.rs` +2. Update ClickHouse schema in `analytics/migrations/` +3. Modify event extraction logic in middleware +4. Update aggregation queries if needed + +### Testing + +```bash +# Run analytics tests +cargo test analytics + +# Test with sample events +curl -X POST http://localhost:8080/routing/evaluate \ + -H "Content-Type: application/json" \ + -H "x-merchant-id: test-merchant" \ + -d '{"test": "data"}' +``` + +## Security Considerations + +1. **Data Privacy**: Ensure sensitive data is properly masked +2. **Access Control**: Restrict ClickHouse access to authorized users +3. **Network Security**: Use proper network isolation +4. **Data Retention**: Implement appropriate data retention policies diff --git a/analytics/migrations/001_routing_events.sql b/analytics/migrations/001_routing_events.sql new file mode 100644 index 00000000..27faa863 --- /dev/null +++ b/analytics/migrations/001_routing_events.sql @@ -0,0 +1,108 @@ +-- Merged SQL Migration File for Decision Engine Analytics +-- This file combines all migration steps into a single comprehensive migration +-- Created by merging: 001_routing_events.sql, 002_fix_datetime_parsing.sql, 002_fix_datetime_parsing_alternative.sql, 003_unix_timestamp_fix.sql + +-- ============================================================================= +-- STEP 1: Database Setup +-- ============================================================================= + +-- Create database if not exists +CREATE DATABASE IF NOT EXISTS decision_engine_analytics; + +-- Use the analytics database +USE decision_engine_analytics; + +-- ============================================================================= +-- STEP 2: Create Storage Tables (Final Schema) +-- ============================================================================= + +-- Create storage table with CollapsingMergeTree for routing events +CREATE TABLE IF NOT EXISTS routing_events ( + `event_id` String, + `merchant_id` LowCardinality(String), + `request_id` String, + `endpoint` LowCardinality(String), + `method` LowCardinality(String), + `request_payload` String, + `response_payload` String, + `status_code` UInt16, + `processing_time_ms` UInt32, + `gateway_selected` LowCardinality(Nullable(String)), + `routing_algorithm_id` Nullable(String), + `error_message` Nullable(String), + `user_agent` Nullable(String), + `ip_address` Nullable(String), + `created_at` DateTime DEFAULT now() CODEC(T64, LZ4), + 
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `sign_flag` Int8, + INDEX endpointIndex endpoint TYPE bloom_filter GRANULARITY 1, + INDEX gatewayIndex gateway_selected TYPE bloom_filter GRANULARITY 1, + INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1, + INDEX merchantIndex merchant_id TYPE bloom_filter GRANULARITY 1 +) ENGINE = CollapsingMergeTree(sign_flag) +PARTITION BY toStartOfDay(created_at) +ORDER BY (created_at, merchant_id, request_id, event_id) +TTL created_at + toIntervalMonth(18) +SETTINGS index_granularity = 8192; + +-- ============================================================================= +-- STEP 3: Create Kafka Integration (Final Version with Unix Timestamp Support) +-- ============================================================================= + +-- Drop existing views and tables if they exist (for clean migration) +DROP VIEW IF EXISTS routing_events_mv; +DROP TABLE IF EXISTS routing_events_queue; + +-- Create Kafka engine table with Unix timestamp handling (final version) +CREATE TABLE IF NOT EXISTS routing_events_queue ( + `event_id` String, + `merchant_id` String, + `request_id` String, + `endpoint` LowCardinality(String), + `method` LowCardinality(String), + `request_payload` String, + `response_payload` String, + `status_code` UInt16, + `processing_time_ms` UInt32, + `gateway_selected` LowCardinality(Nullable(String)), + `routing_algorithm_id` Nullable(String), + `error_message` Nullable(String), + `user_agent` Nullable(String), + `ip_address` Nullable(String), + `created_at` Int64, -- Unix timestamp (final fix) + `sign_flag` Int8 +) ENGINE = Kafka +SETTINGS + kafka_broker_list = 'open-router-kafka:29092', + kafka_topic_list = 'decision-engine-routing-events', + kafka_group_name = 'decision-engine-analytics', + kafka_format = 'JSONEachRow', + kafka_handle_error_mode = 'stream'; + +-- Create materialized view with Unix timestamp conversion (final version) +CREATE MATERIALIZED VIEW IF NOT EXISTS routing_events_mv TO routing_events AS +SELECT + event_id, + merchant_id, + request_id, + endpoint, + method, + request_payload, + response_payload, + status_code, + processing_time_ms, + gateway_selected, + routing_algorithm_id, + error_message, + user_agent, + ip_address, + -- Convert Unix timestamp to DateTime (final fix) + CASE + WHEN created_at > 0 + THEN toDateTime(created_at) + ELSE now() + END AS created_at, + now() AS inserted_at, + sign_flag +FROM routing_events_queue +WHERE length(_error) = 0; diff --git a/analytics/run_migrations.sh b/analytics/run_migrations.sh new file mode 100755 index 00000000..fa3a4e02 --- /dev/null +++ b/analytics/run_migrations.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +echo 'Waiting for ClickHouse to be ready...' +sleep 10 + +echo 'Running analytics migrations...' +for file in /analytics/migrations/*.sql; do + if [ -f "$file" ]; then + echo "Executing: $file" + clickhouse-client --host clickhouse --port 9000 --user analytics_user --password analytics_pass --multiquery --query "$(cat $file)" + if [ $? -eq 0 ]; then + echo "Successfully executed: $file" + else + echo "Failed to execute: $file" + exit 1 + fi + fi +done + +echo 'Analytics migrations completed!' diff --git a/config/development.toml b/config/development.toml index 56b26693..612c1126 100644 --- a/config/development.toml +++ b/config/development.toml @@ -49,6 +49,25 @@ max_feed_count = 200 tti = 7200 # i.e. 
2 hours max_capacity = 5000 +[analytics] +enabled = false + +[analytics.kafka] +brokers = ["localhost:9092"] +topic_prefix = "decision-engine" +batch_size = 100 +batch_timeout_ms = 1000 +max_consecutive_failures = 5 + +[analytics.clickhouse] +host = "http://clickhouse:8123" +username = "analytics_user" +password = "analytics_pass" +database = "decision_engine_analytics" + +[secrets] +open_router_private_key = "" + [tenant_secrets] public = { schema = "public" } diff --git a/config/docker-configuration.toml b/config/docker-configuration.toml index 71a2d164..1eb682d7 100644 --- a/config/docker-configuration.toml +++ b/config/docker-configuration.toml @@ -49,6 +49,25 @@ max_feed_count = 200 tti = 7200 # i.e. 2 hours max_capacity = 5000 +[analytics] +enabled = false + +[analytics.kafka] +brokers = ["kafka:29092"] +topic_prefix = "decision-engine" +batch_size = 100 +batch_timeout_ms = 1000 +max_consecutive_failures = 5 + +[analytics.clickhouse] +host = "http://clickhouse:8123" +username = "analytics_user" +password = "analytics_pass" +database = "decision_engine_analytics" + +[secrets] +open_router_private_key = "" + [tenant_secrets] public = { schema = "public" } diff --git a/docker-compose.yaml b/docker-compose.yaml index 9639b552..057efed3 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -36,7 +36,7 @@ services: - "com.docker.compose.watchfile=Cargo.lock" image: decision-engine-open-router-local:latest platform: linux/amd64 - container_name: open-router + container_name: open-router-local restart: unless-stopped ports: - "8080:8080" @@ -69,7 +69,7 @@ services: - "com.docker.compose.watchfile=Cargo.lock" image: decision-engine-open-router-local-pg:latest platform: linux/amd64 - container_name: open-router + container_name: open-router-local-pg restart: unless-stopped ports: - "8080:8080" @@ -138,7 +138,7 @@ services: - "com.docker.compose.watchfile=groovy.Dockerfile" - "com.docker.compose.watchfile=src/Runner.groovy" platform: linux/amd64 - container_name: groovy-runner + container_name: groovy-runner-local restart: unless-stopped ports: - "8085:8085" @@ -172,7 +172,7 @@ services: postgresql: image: postgres:latest - container_name: open-router-postgres + container_name: postgres-db restart: unless-stopped environment: - POSTGRES_USER=db_user @@ -220,7 +220,7 @@ services: db-migrator-postgres: image: rust:latest - container_name: db-migrator + container_name: db-migrator-postgres depends_on: postgresql: condition: service_healthy @@ -247,6 +247,90 @@ services: networks: - open-router-network + # Analytics Infrastructure + zookeeper: + image: confluentinc/cp-zookeeper:7.4.0 + platform: linux/amd64 + container_name: open-router-zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + networks: + - open-router-network + profiles: + - analytics + + kafka: + image: confluentinc/cp-kafka:7.4.0 + platform: linux/amd64 + container_name: open-router-kafka + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + networks: + - open-router-network + profiles: + - analytics + healthcheck: + test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"] + interval: 10s + 
timeout: 10s + retries: 5 + + clickhouse: + image: clickhouse/clickhouse-server:latest + platform: linux/amd64 + container_name: open-router-clickhouse + ports: + - "8123:8123" + - "9000:9000" + environment: + CLICKHOUSE_DB: decision_engine_analytics + CLICKHOUSE_USER: analytics_user + CLICKHOUSE_PASSWORD: analytics_pass + volumes: + - clickhouse-data:/var/lib/clickhouse + - ./analytics/clickhouse/init:/docker-entrypoint-initdb.d + networks: + - open-router-network + depends_on: + - kafka + - zookeeper + profiles: + - analytics + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8123/ping"] + interval: 10s + timeout: 5s + retries: 5 + + analytics-migrator: + image: clickhouse/clickhouse-server:latest + platform: linux/amd64 + container_name: analytics-migrator + depends_on: + clickhouse: + condition: service_healthy + kafka: + condition: service_healthy + volumes: + - ./analytics:/analytics + working_dir: /analytics + command: sh /analytics/run_migrations.sh + networks: + - open-router-network + profiles: + - analytics + networks: open-router-network: driver: bridge @@ -255,3 +339,4 @@ volumes: mysql-data: redis-data: postgres-data: + clickhouse-data: diff --git a/scripts/test_analytics.sh b/scripts/test_analytics.sh new file mode 100755 index 00000000..69b3facb --- /dev/null +++ b/scripts/test_analytics.sh @@ -0,0 +1,197 @@ +#!/bin/bash + +# Test script for Decision Engine Analytics +set -e + +echo "🚀 Testing Decision Engine Analytics Setup" +echo "==========================================" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Function to print colored output +print_status() { + echo -e "${GREEN}✓${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}⚠${NC} $1" +} + +print_error() { + echo -e "${RED}✗${NC} $1" +} + +# Check if Docker is running +if ! docker info > /dev/null 2>&1; then + print_error "Docker is not running. Please start Docker first." + exit 1 +fi + +print_status "Docker is running" + +# Check if docker-compose is available +if ! command -v docker-compose &> /dev/null; then + print_error "docker-compose is not installed" + exit 1 +fi + +print_status "docker-compose is available" + +echo "" +echo "🔧 Starting Analytics Infrastructure..." +echo "======================================" + +# Clean up any existing containers to avoid conflicts +print_status "Cleaning up existing containers..." +docker-compose --profile analytics down --remove-orphans 2>/dev/null || true + +# Start analytics infrastructure +print_status "Starting Zookeeper, Kafka, ClickHouse and Analytics Migrator..." +docker-compose up -d kafka zookeeper clickhouse analytics-migrator + +# Wait for services to be ready +echo "" +echo "⏳ Waiting for services to be ready..." +sleep 10 + +# Check Kafka +echo "" +echo "🔍 Checking Kafka..." +if docker exec open-router-kafka kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1; then + print_status "Kafka is running" +else + print_error "Kafka is not responding" + exit 1 +fi + +# Check ClickHouse +echo "" +echo "🔍 Checking ClickHouse..." +if docker exec open-router-clickhouse clickhouse-client --query "SELECT 1" > /dev/null 2>&1; then + print_status "ClickHouse is running" +else + print_error "ClickHouse is not responding" + exit 1 +fi + +# Check if analytics database exists +echo "" +echo "🔍 Checking Analytics Database..." 
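+# The analytics-migrator service (profile: analytics) should already have
+# created this database via run_migrations.sh; the fallback branch below
+# re-applies the merged migration from the host if it has not.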
+if docker exec open-router-clickhouse clickhouse-client --query "SHOW DATABASES" | grep -q "decision_engine_analytics"; then
+    print_status "Analytics database exists"
+else
+    print_warning "Analytics database not found, running migrations..."
+
+    # Run migrations manually if needed (-i so the SQL file reaches stdin)
+    docker exec -i open-router-clickhouse clickhouse-client --multiquery < analytics/migrations/001_routing_events.sql
+
+    if docker exec open-router-clickhouse clickhouse-client --query "SHOW DATABASES" | grep -q "decision_engine_analytics"; then
+        print_status "Analytics database created successfully"
+    else
+        print_error "Failed to create analytics database"
+        exit 1
+    fi
+fi
+
+# Check analytics tables
+echo ""
+echo "🔍 Checking Analytics Tables..."
+TABLES=$(docker exec open-router-clickhouse clickhouse-client --query "SHOW TABLES FROM decision_engine_analytics")
+
+if echo "$TABLES" | grep -q "routing_events"; then
+    print_status "routing_events table exists"
+else
+    print_error "routing_events table not found"
+    exit 1
+fi
+
+if echo "$TABLES" | grep -q "routing_events_queue"; then
+    print_status "routing_events_queue table exists"
+else
+    print_error "routing_events_queue table not found"
+    exit 1
+fi
+
+# Test Kafka topic creation
+echo ""
+echo "🔍 Checking Kafka Topics..."
+if docker exec open-router-kafka kafka-topics --bootstrap-server localhost:9092 --list | grep -q "decision-engine-routing-events"; then
+    print_status "Routing events topic exists"
+else
+    print_warning "Creating routing events topic..."
+    docker exec open-router-kafka kafka-topics --bootstrap-server localhost:9092 --create --topic decision-engine-routing-events --partitions 3 --replication-factor 1
+    print_status "Routing events topic created"
+fi
+
+# Test sending a sample event to Kafka
+echo ""
+echo "🧪 Testing Event Publishing..."
+# created_at is an Int64 Unix timestamp, matching the routing_events_queue schema
+SAMPLE_EVENT='{"event_id":"test-event-1","merchant_id":"test-merchant","request_id":"test-req-1","endpoint":"/routing/evaluate","method":"POST","request_payload":"{}","response_payload":"{}","status_code":200,"processing_time_ms":100,"gateway_selected":"stripe","routing_algorithm_id":"algo-1","error_message":null,"user_agent":"test-agent","ip_address":"127.0.0.1","created_at":1704110400,"sign_flag":1}'
+
+echo "$SAMPLE_EVENT" | docker exec -i open-router-kafka kafka-console-producer --bootstrap-server localhost:9092 --topic decision-engine-routing-events
+
+print_status "Sample event sent to Kafka"
+
+# Wait a moment for processing
+sleep 5
+
+# Check if event was processed by ClickHouse
+echo ""
+echo "🔍 Checking Event Processing..."
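+# Kafka-engine tables are consumed in the background and flushed to the
+# storage table by the materialized view, so a freshly produced message can
+# take a few extra seconds to become visible (hence the sleep above).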
+EVENT_COUNT=$(docker exec open-router-clickhouse clickhouse-client --query "SELECT count(*) FROM decision_engine_analytics.routing_events WHERE event_id = 'test-event-1'")
+
+if [ "$EVENT_COUNT" -gt 0 ]; then
+    print_status "Event successfully processed by ClickHouse"
+else
+    print_warning "Event not yet processed (this might be normal for new setups)"
+fi
+
+# Show sample queries
+echo ""
+echo "📊 Sample Analytics Queries"
+echo "=========================="
+
+echo ""
+echo "Total events in the last hour:"
+docker exec open-router-clickhouse clickhouse-client --query "
+SELECT count(*) as total_events
+FROM decision_engine_analytics.routing_events
+WHERE created_at >= now() - INTERVAL 1 HOUR
+"
+
+echo ""
+echo "Events by endpoint:"
+docker exec open-router-clickhouse clickhouse-client --query "
+SELECT
+    endpoint,
+    count(*) as event_count
+FROM decision_engine_analytics.routing_events
+GROUP BY endpoint
+ORDER BY event_count DESC
+"
+
+echo ""
+echo "🎉 Analytics Setup Test Complete!"
+echo "================================="
+print_status "All analytics components are running correctly"
+
+echo ""
+echo "📝 Next Steps:"
+echo "1. Enable analytics in your config: Set analytics.enabled = true"
+echo "2. Start the decision engine: docker-compose up open-router-local"
+echo "3. Send test requests to /routing/evaluate or /decide-gateway"
+echo "4. Query analytics data using the sample queries above"
+
+echo ""
+echo "🔗 Useful Commands:"
+echo "- View ClickHouse logs: docker logs open-router-clickhouse"
+echo "- View Kafka logs: docker logs open-router-kafka"
+echo "- Connect to ClickHouse: docker exec -it open-router-clickhouse clickhouse-client"
+echo "- List Kafka topics: docker exec open-router-kafka kafka-topics --bootstrap-server localhost:9092 --list"
+
+echo ""
+echo "📚 For more information, see analytics/README.md"
diff --git a/scripts/test_kafka_connection.sh b/scripts/test_kafka_connection.sh
new file mode 100755
index 00000000..23ca2ea7
--- /dev/null
+++ b/scripts/test_kafka_connection.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+echo "Testing Kafka connectivity (checks run inside the kafka container)..."
+
+# Test 1: Check that the broker's internal listener (kafka:29092) is reachable
+echo "1. Testing network connectivity to Kafka..."
+docker exec open-router-kafka sh -c "nc -z kafka 29092 && echo 'Kafka is reachable' || echo 'Kafka is NOT reachable'"
+
+# Test 2: Send a test message to the topic using kafka tools from within the kafka container
+echo "2. Sending test message to decision-engine-routing-events topic..."
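+# The message below must match routing_events_queue's JSONEachRow schema; in
+# particular, created_at is an Int64 Unix timestamp, not an RFC3339 string.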
+docker exec -i open-router-kafka kafka-console-producer --bootstrap-server localhost:9092 --topic decision-engine-routing-events <<EOF
+{"event_id":"conn-test-1","merchant_id":"test-merchant","request_id":"conn-test-1","endpoint":"/routing/evaluate","method":"POST","request_payload":"{}","response_payload":"{}","status_code":200,"processing_time_ms":1,"gateway_selected":null,"routing_algorithm_id":null,"error_message":null,"user_agent":null,"ip_address":null,"created_at":1704067200,"sign_flag":1}
+EOF
+
+echo "Test message sent. Check ClickHouse for the event:"
+echo "docker exec open-router-clickhouse clickhouse-client --query \"SELECT count(*) FROM decision_engine_analytics.routing_events\""
diff --git a/src/analytics/client.rs b/src/analytics/client.rs
new file mode 100644
--- /dev/null
+++ b/src/analytics/client.rs
+use crate::analytics::{
+    AnalyticsConfig, AnalyticsError, AnalyticsResult, KafkaProducer, RoutingEvent,
+    RoutingEventData,
+};
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tracing::{error, info, warn};
+
+pub struct AnalyticsClient {
+    config: AnalyticsConfig,
+    kafka_producer: Option<Arc<KafkaProducer>>,
+    event_sender: Option<mpsc::Sender<RoutingEventData>>,
+}
+
+impl AnalyticsClient {
+    pub fn new(config: AnalyticsConfig) -> AnalyticsResult<Self> {
+        if !config.enabled {
+            info!("Analytics is disabled");
+            return Ok(Self {
+                config,
+                kafka_producer: None,
+                event_sender: None,
+            });
+        }
+
+        // Initialize Kafka producer
+        let kafka_producer = Arc::new(KafkaProducer::new(config.kafka.clone())?);
+
+        // Create event channel for batching
+        let (event_sender, event_receiver) = mpsc::channel::<RoutingEventData>(1000);
+
+        // Start batch processor
+        let _batch_processor = kafka_producer.start_batch_processor(event_receiver);
+
+        info!("Analytics client initialized successfully");
+
+        Ok(Self {
+            config,
+            kafka_producer: Some(kafka_producer),
+            event_sender: Some(event_sender),
+        })
+    }
+
+    pub async fn track_routing_event(&self, event: RoutingEvent) -> AnalyticsResult<()> {
+        if !self.config.enabled {
+            return Ok(());
+        }
+
+        let event_data = event.to_event_data();
+
+        if let Some(sender) = &self.event_sender {
+            if let Err(e) = sender.try_send(event_data) {
+                match e {
+                    mpsc::error::TrySendError::Full(_) => {
+                        warn!("Analytics event queue is full, dropping event");
+                    }
+                    mpsc::error::TrySendError::Closed(_) => {
+                        error!("Analytics event channel is closed");
+                        return Err(AnalyticsError::Configuration(
+                            "Event channel is closed".to_string(),
+                        ));
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn track_routing_event_sync(&self, event: RoutingEvent) -> AnalyticsResult<()> {
+        if !self.config.enabled {
+            return Ok(());
+        }
+
+        let event_data = event.to_event_data();
+
+        if let Some(producer) = &self.kafka_producer {
+            producer.send_event(&event_data).await?;
+        }
+
+        Ok(())
+    }
+
+    pub async fn flush_events(&self) -> AnalyticsResult<()> {
+        if !self.config.enabled {
+            return Ok(());
+        }
+
+        // Wait for the event channel to close; the batch processor sends any
+        // remaining events when it observes the closed channel. Note that
+        // this only completes once every sender handle has been dropped.
+        if let Some(sender) = &self.event_sender {
+            sender.closed().await;
+        }
+
+        Ok(())
+    }
+
+    pub fn is_enabled(&self) -> bool {
+        self.config.enabled
+    }
+
+    pub async fn health_check(&self) -> AnalyticsResult<()> {
+        if !self.config.enabled {
+            return Ok(());
+        }
+
+        // Test Kafka connectivity by sending a test event
+        if let Some(producer) = &self.kafka_producer {
+            let test_event = RoutingEventData {
+                event_id: "health-check".to_string(),
+                merchant_id: "health-check".to_string(),
+                request_id: "health-check".to_string(),
+                endpoint: "/health".to_string(),
+                method: "GET".to_string(),
+                request_payload: "{}".to_string(),
+                response_payload: r#"{"status": "ok"}"#.to_string(),
+                status_code: 200,
+                processing_time_ms: 1,
+                gateway_selected: None,
+                routing_algorithm_id: None,
+                error_message: None,
+                user_agent: Some("health-check".to_string()),
+                ip_address: Some("127.0.0.1".to_string()),
+                created_at: time::OffsetDateTime::now_utc(),
+                sign_flag: 1,
+            };
+
+            producer.send_event(&test_event).await?;
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::analytics::{ClickhouseConfig, KafkaConfig};
+
+    #[tokio::test]
+    async fn test_analytics_client_disabled() {
+        let config = AnalyticsConfig {
+            enabled: false,
+            kafka: KafkaConfig::default(),
+            clickhouse: ClickhouseConfig::default(),
+        };
+
+        let client = AnalyticsClient::new(config).unwrap();
+        assert!(!client.is_enabled());
+
+        let event = RoutingEvent::new(
+            "merchant-123".to_string(),
+            "req-456".to_string(),
+            "/routing/evaluate".to_string(),
+            "POST".to_string(),
+        );
+
+        // Should not error when disabled
+        let result = client.track_routing_event(event).await;
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_analytics_config_default() {
+        let config = AnalyticsConfig::default();
+        assert!(!config.enabled);
+        assert_eq!(config.kafka.brokers, vec!["localhost:9092"]);
+        assert_eq!(config.kafka.topic_prefix, "decision-engine");
+        assert_eq!(config.clickhouse.host, "http://localhost:8123");
+        assert_eq!(config.clickhouse.username, "analytics_user");
+    }
+}
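For reviewers, a short usage sketch of the client above (hypothetical caller;
in this PR the only real caller is the analytics middleware, which reaches the
client through `TenantAppState`):

```rust
// Sketch: typical lifecycle of AnalyticsClient, assuming the caller owns a
// fully populated AnalyticsConfig.
use crate::analytics::{AnalyticsClient, AnalyticsConfig, AnalyticsResult, RoutingEvent};

async fn track_one(config: AnalyticsConfig) -> AnalyticsResult<()> {
    // Spawns the Kafka batch processor when analytics is enabled.
    let client = AnalyticsClient::new(config)?;

    let event = RoutingEvent::new(
        "merchant-123".to_string(),
        "req-456".to_string(),
        "/routing/evaluate".to_string(),
        "POST".to_string(),
    )
    .with_status_code(200);

    // Enqueues onto the bounded channel; drops the event rather than block
    // the request path if the queue is full.
    client.track_routing_event(event).await
}
```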
diff --git a/src/analytics/events.rs b/src/analytics/events.rs
new file mode 100644
index 00000000..8b575441
--- /dev/null
+++ b/src/analytics/events.rs
@@ -0,0 +1,286 @@
+use crate::analytics::{AnalyticsResult, RoutingEventData};
+use axum::extract::Request;
+use axum::response::Response;
+use serde_json::Value;
+use time::OffsetDateTime;
+use uuid::Uuid;
+
+#[derive(Clone)]
+pub struct RoutingEvent {
+    pub event_id: String,
+    pub merchant_id: String,
+    pub request_id: String,
+    pub endpoint: String,
+    pub method: String,
+    pub request_payload: String,
+    pub response_payload: String,
+    pub status_code: u16,
+    pub processing_time_ms: u32,
+    pub gateway_selected: Option<String>,
+    pub routing_algorithm_id: Option<String>,
+    pub error_message: Option<String>,
+    pub user_agent: Option<String>,
+    pub ip_address: Option<String>,
+    pub created_at: OffsetDateTime,
+}
+
+impl RoutingEvent {
+    pub fn new(merchant_id: String, request_id: String, endpoint: String, method: String) -> Self {
+        Self {
+            event_id: Uuid::new_v4().to_string(),
+            merchant_id,
+            request_id,
+            endpoint,
+            method,
+            request_payload: String::new(),
+            response_payload: String::new(),
+            status_code: 0,
+            processing_time_ms: 0,
+            gateway_selected: None,
+            routing_algorithm_id: None,
+            error_message: None,
+            user_agent: None,
+            ip_address: None,
+            created_at: OffsetDateTime::now_utc(),
+        }
+    }
+
+    pub fn from_request(request: &Request, merchant_id: String) -> Self {
+        let request_id = extract_request_id(request);
+        let endpoint = request.uri().path().to_string();
+        let method = request.method().to_string();
+        let user_agent = extract_user_agent(request);
+        let ip_address = extract_ip_address(request);
+
+        Self {
+            event_id: Uuid::new_v4().to_string(),
+            merchant_id,
+            request_id,
+            endpoint,
+            method,
+            request_payload: String::new(),
+            response_payload: String::new(),
+            status_code: 0,
+            processing_time_ms: 0,
+            gateway_selected: None,
+            routing_algorithm_id: None,
+            error_message: None,
+            user_agent,
+            ip_address,
+            created_at: OffsetDateTime::now_utc(),
+        }
+    }
+
+    pub fn with_request_payload(mut self, payload: &str) -> Self {
+        self.request_payload = payload.to_string();
+        self
+    }
+
+    pub fn with_response_payload(mut self, payload: &str) -> Self {
+        self.response_payload = payload.to_string();
+        self
+    }
+
+    pub fn with_status_code(mut self, status_code: u16) -> Self {
+        self.status_code = status_code;
+        self
+    }
+
+    pub fn with_processing_time(mut self, processing_time_ms: u32) -> Self {
+        self.processing_time_ms = processing_time_ms;
+        self
+    }
+
+    pub fn with_gateway_selected(mut self, gateway: Option<String>) -> Self {
+        self.gateway_selected = gateway;
+        self
+    }
+
+    pub fn with_routing_algorithm_id(mut self, algorithm_id: Option<String>) -> Self {
+        self.routing_algorithm_id = algorithm_id;
+        self
+    }
+
+    pub fn with_error_message(mut self, error: Option<String>) -> Self {
+        self.error_message = error;
+        self
+    }
+
+    pub fn to_event_data(&self) -> RoutingEventData {
+        RoutingEventData {
+            event_id: self.event_id.clone(),
+            merchant_id: self.merchant_id.clone(),
+            request_id: self.request_id.clone(),
+            endpoint: self.endpoint.clone(),
+            method: self.method.clone(),
+            request_payload: self.request_payload.clone(),
+            response_payload: self.response_payload.clone(),
+            status_code: self.status_code,
+            processing_time_ms: self.processing_time_ms,
+            gateway_selected: self.gateway_selected.clone(),
+            routing_algorithm_id: self.routing_algorithm_id.clone(),
+            error_message: self.error_message.clone(),
+            user_agent: self.user_agent.clone(),
+            ip_address: self.ip_address.clone(),
+            created_at: self.created_at,
+            sign_flag: 1, // Always 1 for new events
+        }
+    }
+
+    /// Extract gateway information from response payload
+    pub fn extract_gateway_from_response(&mut self) -> AnalyticsResult<()> {
+        if !self.response_payload.is_empty() {
+            if let Ok(response_json) = serde_json::from_str::<Value>(&self.response_payload) {
+                // Try to extract gateway from various possible response structures
+                if let Some(gateway) = response_json
+                    .get("gateway")
+                    .or_else(|| response_json.get("selected_gateway"))
+                    .or_else(|| response_json.get("connector"))
+                    .and_then(|v| v.as_str())
+                {
+                    self.gateway_selected = Some(gateway.to_string());
+                }
+
+                // Try to extract routing algorithm ID
+                if let Some(algo_id) = response_json
+                    .get("routing_algorithm_id")
+                    .or_else(|| response_json.get("algorithm_id"))
+                    .and_then(|v| v.as_str())
+                {
+                    self.routing_algorithm_id = Some(algo_id.to_string());
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Extract error information from response payload
+    pub fn extract_error_from_response(&mut self) -> AnalyticsResult<()> {
+        if self.status_code >= 400 && !self.response_payload.is_empty() {
+            if let Ok(response_json) = serde_json::from_str::<Value>(&self.response_payload) {
+                if let Some(error) = response_json
+                    .get("error")
+                    .or_else(|| response_json.get("message"))
+                    .or_else(|| response_json.get("error_message"))
+                    .and_then(|v| v.as_str())
+                {
+                    self.error_message = Some(error.to_string());
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+fn extract_request_id(request: &Request) -> String {
+    request
+        .headers()
+        .get("x-request-id")
+        .or_else(|| request.headers().get("request-id"))
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.to_string())
+        .unwrap_or_else(|| Uuid::new_v4().to_string())
+}
+
+fn extract_user_agent(request: &Request) -> Option<String> {
+    request
+        .headers()
+        .get("user-agent")
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.to_string())
+}
+
+fn extract_ip_address(request: &Request) -> Option<String> {
+    // Try various headers for IP address
+    request
+        .headers()
+        .get("x-forwarded-for")
+        .or_else(|| request.headers().get("x-real-ip"))
+        .or_else(|| request.headers().get("cf-connecting-ip"))
+        .and_then(|v| v.to_str().ok())
+        .map(|s| {
+            // Take the first IP if there are multiple (comma-separated)
+            s.split(',')
+                .next()
+                .map(|ip| ip.trim().to_string())
+                .unwrap_or_else(String::new)
+        })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use axum::http::{HeaderMap, HeaderValue, Method, Uri};
+
+    #[test]
+    fn test_routing_event_creation() {
+        let event = RoutingEvent::new(
+            "merchant-123".to_string(),
+            "req-456".to_string(),
+            "/routing/evaluate".to_string(),
+            "POST".to_string(),
+        );
+
+        assert_eq!(event.merchant_id, "merchant-123");
+        assert_eq!(event.request_id, "req-456");
+        assert_eq!(event.endpoint, "/routing/evaluate");
+        assert_eq!(event.method, "POST");
+        assert!(!event.event_id.is_empty());
+    }
+
+    #[test]
+    fn test_event_builder_pattern() {
+        let event = RoutingEvent::new(
+            "merchant-123".to_string(),
+            "req-456".to_string(),
+            "/routing/evaluate".to_string(),
+            "POST".to_string(),
+        )
+        .with_request_payload(r#"{"test": "data"}"#)
+        .with_response_payload(r#"{"gateway": "stripe"}"#)
+        .with_status_code(200)
+        .with_processing_time(150)
+        .with_gateway_selected(Some("stripe".to_string()));
+
+        assert_eq!(event.request_payload, r#"{"test": "data"}"#);
+        assert_eq!(event.response_payload, r#"{"gateway": "stripe"}"#);
+        assert_eq!(event.status_code, 200);
+        assert_eq!(event.processing_time_ms, 150);
+        assert_eq!(event.gateway_selected, Some("stripe".to_string()));
+    }
+
+    #[test]
+    fn test_extract_gateway_from_response() {
+        let mut event = RoutingEvent::new(
+            "merchant-123".to_string(),
+            "req-456".to_string(),
+            "/routing/evaluate".to_string(),
+            "POST".to_string(),
+        )
+        .with_response_payload(r#"{"gateway": "stripe", "routing_algorithm_id": "algo-123"}"#);
+
+        event.extract_gateway_from_response().unwrap();
+
+        assert_eq!(event.gateway_selected, Some("stripe".to_string()));
+        assert_eq!(event.routing_algorithm_id, Some("algo-123".to_string()));
+    }
+
+    #[test]
+    fn test_extract_error_from_response() {
+        let mut event = RoutingEvent::new(
+            "merchant-123".to_string(),
+            "req-456".to_string(),
+            "/routing/evaluate".to_string(),
+            "POST".to_string(),
+        )
+        .with_response_payload(r#"{"error": "Gateway not available"}"#)
+        .with_status_code(500);
+
+        event.extract_error_from_response().unwrap();
+
+        assert_eq!(
+            event.error_message,
+            Some("Gateway not available".to_string())
+        );
+    }
+}
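The README's security section calls for masking sensitive data, but nothing in
`events.rs` redacts payloads yet. A hedged sketch of a helper (hypothetical,
not part of this change) that a caller could apply before
`with_request_payload`:

```rust
// Hypothetical helper: blank out known-sensitive top-level JSON fields before
// attaching a payload to a RoutingEvent. The key list is illustrative only.
use serde_json::Value;

fn redact(payload: &str, sensitive_keys: &[&str]) -> String {
    match serde_json::from_str::<Value>(payload) {
        Ok(mut value) => {
            if let Some(obj) = value.as_object_mut() {
                for key in sensitive_keys {
                    if obj.contains_key(*key) {
                        obj.insert((*key).to_string(), Value::String("***".to_string()));
                    }
                }
            }
            value.to_string()
        }
        // Non-JSON payloads pass through unchanged.
        Err(_) => payload.to_string(),
    }
}
```

Usage would look like `event.with_request_payload(&redact(&raw, &["card_number", "cvv"]))`.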
.with_request_payload(r#"{"test": "data"}"#) + .with_response_payload(r#"{"gateway": "stripe"}"#) + .with_status_code(200) + .with_processing_time(150) + .with_gateway_selected(Some("stripe".to_string())); + + assert_eq!(event.request_payload, r#"{"test": "data"}"#); + assert_eq!(event.response_payload, r#"{"gateway": "stripe"}"#); + assert_eq!(event.status_code, 200); + assert_eq!(event.processing_time_ms, 150); + assert_eq!(event.gateway_selected, Some("stripe".to_string())); + } + + #[test] + fn test_extract_gateway_from_response() { + let mut event = RoutingEvent::new( + "merchant-123".to_string(), + "req-456".to_string(), + "/routing/evaluate".to_string(), + "POST".to_string(), + ) + .with_response_payload(r#"{"gateway": "stripe", "routing_algorithm_id": "algo-123"}"#); + + event.extract_gateway_from_response().unwrap(); + + assert_eq!(event.gateway_selected, Some("stripe".to_string())); + assert_eq!(event.routing_algorithm_id, Some("algo-123".to_string())); + } + + #[test] + fn test_extract_error_from_response() { + let mut event = RoutingEvent::new( + "merchant-123".to_string(), + "req-456".to_string(), + "/routing/evaluate".to_string(), + "POST".to_string(), + ) + .with_response_payload(r#"{"error": "Gateway not available"}"#) + .with_status_code(500); + + event.extract_error_from_response().unwrap(); + + assert_eq!( + event.error_message, + Some("Gateway not available".to_string()) + ); + } +} diff --git a/src/analytics/kafka_producer.rs b/src/analytics/kafka_producer.rs new file mode 100644 index 00000000..78b8da94 --- /dev/null +++ b/src/analytics/kafka_producer.rs @@ -0,0 +1,248 @@ +use crate::analytics::{AnalyticsError, AnalyticsResult, KafkaConfig, RoutingEventData}; +use kafka::producer::{Producer, Record, RequiredAcks}; +use std::time::Duration; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +#[derive(Clone)] +pub struct KafkaProducer { + config: KafkaConfig, + topic: String, +} + +impl KafkaProducer { + pub fn new(config: KafkaConfig) -> AnalyticsResult { + let topic = format!("{}-routing-events", config.topic_prefix); + + // Validate broker configuration + if config.brokers.is_empty() { + return Err(AnalyticsError::Configuration( + "No Kafka brokers configured".to_string(), + )); + } + + debug!( + "Initializing Kafka producer with brokers: {:?}", + config.brokers + ); + + Ok(Self { config, topic }) + } + + /// Test Kafka connectivity + pub async fn test_connection(&self) -> AnalyticsResult<()> { + debug!( + "Testing Kafka connection to brokers: {:?}", + self.config.brokers + ); + + let producer = Producer::from_hosts(self.config.brokers.clone()) + .with_ack_timeout(Duration::from_secs(5)) + .with_required_acks(RequiredAcks::One) + .create() + .map_err(|e| { + error!( + "Failed to create Kafka producer for connection test: {:?}", + e + ); + AnalyticsError::Kafka(e) + })?; + + info!("Kafka connection test successful"); + Ok(()) + } + + pub async fn send_event(&self, event: &RoutingEventData) -> AnalyticsResult<()> { + let json_data = serde_json::to_string(event)?; + + // Create producer with configuration + let mut producer = Producer::from_hosts(self.config.brokers.clone()) + .with_ack_timeout(Duration::from_secs(1)) + .with_required_acks(RequiredAcks::One) + .create() + .map_err(AnalyticsError::Kafka)?; + + // Send the record + let record = + Record::from_key_value(&self.topic, event.event_id.as_bytes(), json_data.as_bytes()); + + producer.send(&record).map_err(AnalyticsError::Kafka)?; + + Ok(()) + } + + pub async fn send_events_batch(&self, events: 
+        if events.is_empty() {
+            return Ok(());
+        }
+
+        let mut producer = Producer::from_hosts(self.config.brokers.clone())
+            .with_ack_timeout(Duration::from_secs(5))
+            .with_required_acks(RequiredAcks::One)
+            .create()
+            .map_err(|e| {
+                error!("Failed to create Kafka producer for batch send: {:?}", e);
+                warn!("Kafka brokers configured: {:?}", self.config.brokers);
+                AnalyticsError::Kafka(e)
+            })?;
+
+        for (index, event) in events.iter().enumerate() {
+            let json_data = serde_json::to_string(event)?;
+            let record = Record::from_key_value(
+                &self.topic,
+                event.event_id.as_bytes(),
+                json_data.as_bytes(),
+            );
+
+            if let Err(e) = producer.send(&record) {
+                error!(
+                    "Failed to send event {} of {} to Kafka: {:?}",
+                    index + 1,
+                    events.len(),
+                    e
+                );
+                return Err(AnalyticsError::Kafka(e));
+            }
+        }
+
+        info!(
+            "Successfully sent {} events to Kafka topic: {}",
+            events.len(),
+            self.topic
+        );
+        Ok(())
+    }
+
+    /// Send events batch with graceful error handling
+    pub async fn send_events_batch_graceful(&self, events: &[RoutingEventData]) -> bool {
+        match self.send_events_batch(events).await {
+            Ok(()) => true,
+            Err(e) => {
+                warn!(
+                    "Failed to send events batch to Kafka, continuing without analytics: {:?}",
+                    e
+                );
+                false
+            }
+        }
+    }
+
+    pub fn start_batch_processor(
+        &self,
+        mut receiver: mpsc::Receiver<RoutingEventData>,
+    ) -> tokio::task::JoinHandle<()> {
+        let producer = self.clone();
+        let batch_size = self.config.batch_size;
+        let batch_timeout = Duration::from_millis(self.config.batch_timeout_ms);
+        let max_consecutive_failures = self.config.max_consecutive_failures;
+
+        tokio::spawn(async move {
+            let mut batch = Vec::with_capacity(batch_size);
+            let mut last_flush = tokio::time::Instant::now();
+            let mut consecutive_failures = 0;
+            info!("Starting Kafka batch processor with batch_size: {}, timeout: {}ms, max_consecutive_failures: {}",
+                batch_size, batch_timeout.as_millis(), max_consecutive_failures);
+
+            loop {
+                tokio::select! {
+                    // Receive new events
+                    event = receiver.recv() => {
+                        match event {
+                            Some(event) => {
+                                batch.push(event);
+
+                                // Flush if batch is full
+                                if batch.len() >= batch_size {
+                                    let success = producer.send_events_batch_graceful(&batch).await;
+                                    if success {
+                                        consecutive_failures = 0;
+                                    } else {
+                                        consecutive_failures += 1;
+                                        if consecutive_failures >= max_consecutive_failures {
+                                            warn!("Too many consecutive Kafka failures ({}), continuing to collect events but not sending",
+                                                consecutive_failures);
+                                        }
+                                    }
+                                    batch.clear();
+                                    last_flush = tokio::time::Instant::now();
+                                }
+                            }
+                            None => {
+                                // Channel closed, flush remaining events and exit
+                                if !batch.is_empty() {
+                                    info!("Channel closed, sending final batch of {} events", batch.len());
+                                    producer.send_events_batch_graceful(&batch).await;
+                                }
+                                info!("Kafka batch processor shutting down");
+                                break;
+                            }
+                        }
+                    }
+
+                    // Timeout-based flush
+                    _ = tokio::time::sleep_until(last_flush + batch_timeout) => {
+                        if !batch.is_empty() {
+                            let success = producer.send_events_batch_graceful(&batch).await;
+                            if success {
+                                consecutive_failures = 0;
+                            } else {
+                                consecutive_failures += 1;
+                                if consecutive_failures >= max_consecutive_failures {
+                                    warn!("Too many consecutive Kafka failures ({}), continuing to collect events but not sending",
+                                        consecutive_failures);
+                                }
+                            }
+                            batch.clear();
+                            last_flush = tokio::time::Instant::now();
+                        }
+                    }
+                }
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use time::OffsetDateTime;
+
+    #[tokio::test]
+    async fn test_kafka_producer_creation() {
+        let config = KafkaConfig {
+            brokers: vec!["localhost:9092".to_string()],
+            topic_prefix: "test".to_string(),
+            batch_size: 10,
+            batch_timeout_ms: 1000,
+            max_consecutive_failures: 5,
+        };
+
+        let producer = KafkaProducer::new(config);
+        assert!(producer.is_ok());
+    }
+
+    #[test]
+    fn test_event_serialization() {
+        let event = RoutingEventData {
+            event_id: "test-event-1".to_string(),
+            merchant_id: "merchant-123".to_string(),
+            request_id: "req-456".to_string(),
+            endpoint: "/routing/evaluate".to_string(),
+            method: "POST".to_string(),
+            request_payload: r#"{"test": "data"}"#.to_string(),
+            response_payload: r#"{"result": "success"}"#.to_string(),
+            status_code: 200,
+            processing_time_ms: 150,
+            gateway_selected: Some("stripe".to_string()),
+            routing_algorithm_id: Some("algo-789".to_string()),
+            error_message: None,
+            user_agent: Some("test-agent".to_string()),
+            ip_address: Some("127.0.0.1".to_string()),
+            created_at: OffsetDateTime::now_utc(),
+            sign_flag: 1,
+        };
+
+        let json = serde_json::to_string(&event);
+        assert!(json.is_ok());
+    }
+}
diff --git a/src/analytics/middleware.rs b/src/analytics/middleware.rs
new file mode 100644
index 00000000..a4372231
--- /dev/null
+++ b/src/analytics/middleware.rs
@@ -0,0 +1,165 @@
+use crate::analytics::RoutingEvent;
+use crate::tenant::GlobalAppState;
+use axum::{
+    body::Body,
+    extract::{Request, State},
+    middleware::Next,
+    response::Response,
+};
+use bytes::Bytes;
+use http_body_util::BodyExt;
+use std::{sync::Arc, time::Instant};
+use tracing::{error, warn};
+
+/// Analytics middleware to track routing events for specific endpoints
+pub async fn analytics_middleware(
+    State(global_app_state): State<Arc<GlobalAppState>>,
+    request: Request,
+    next: Next,
+) -> Response {
+    // Only track analytics for routing endpoints
+    let path = request.uri().path();
+    if !global_app_state.global_config.analytics.enabled || !should_track_endpoint(path) {
+        return next.run(request).await;
+    }
+
+    let start_time = Instant::now();
+
+    // Extract request information
+    let method = request.method().to_string();
+    let endpoint = path.to_string();
+
+    // Extract merchant ID from request headers or body (simplified for now)
+    let merchant_id = extract_merchant_id(&request).unwrap_or("public".to_string());
+
+    // Get the tenant app state to access analytics client
+    let tenant_app_state = match global_app_state.get_app_state_of_tenant(&merchant_id).await {
+        Ok(state) => state,
+        Err(_) => {
+            // If tenant not found, try with default "public" tenant
+            match global_app_state.get_app_state_of_tenant("public").await {
+                Ok(state) => state,
+                Err(_) => {
+                    // If analytics client is not available, just proceed without analytics
+                    return next.run(request).await;
+                }
+            }
+        }
+    };
+
+    // Create routing event
+    let mut routing_event = RoutingEvent::from_request(&request, merchant_id.clone());
+
+    // Extract request body for logging
+    let (request_parts, body) = request.into_parts();
+    let body_bytes = match body.collect().await {
+        Ok(collected) => collected.to_bytes(),
+        Err(_) => Bytes::new(),
+    };
+
+    let request_payload = String::from_utf8_lossy(&body_bytes).to_string();
+    routing_event = routing_event.with_request_payload(&request_payload);
+
+    // Reconstruct request with body
+    let request = Request::from_parts(request_parts, Body::from(body_bytes));
+
+    // Process the request
+    let response = next.run(request).await;
+
+    // Calculate processing time
+    let processing_time = start_time.elapsed().as_millis() as u32;
+
+    // Extract response information
+    let status_code = response.status().as_u16();
+
+    // Extract response body for logging. Note: This operation can lead to high memory usage
+    let (response_parts, body) = response.into_parts();
+    let body_bytes = match body.collect().await {
+        Ok(collected) => collected.to_bytes(),
+        Err(_) => Bytes::new(),
+    };
+
+    let response_payload = String::from_utf8_lossy(&body_bytes).to_string();
+
+    // Complete the routing event
+    routing_event = routing_event
+        .with_response_payload(&response_payload)
+        .with_status_code(status_code)
+        .with_processing_time(processing_time);
+
+    // Extract gateway information from response
+    if let Err(e) = routing_event.extract_gateway_from_response() {
+        warn!("Failed to extract gateway from response: {:?}", e);
+    }
+
+    // Extract error information if status indicates failure
+    if let Err(e) = routing_event.extract_error_from_response() {
+        warn!("Failed to extract error from response: {:?}", e);
+    }
+
+    // Send event to analytics (async, non-blocking)
+    if let Err(e) = tenant_app_state
+        .analytics_client
+        .track_routing_event(routing_event)
+        .await
+    {
+        error!("Failed to track routing event: {:?}", e);
+    }
+
+    // Reconstruct response
+    Response::from_parts(response_parts, Body::from(body_bytes))
+}
+
+/// Determine if an endpoint should be tracked for analytics
+fn should_track_endpoint(path: &str) -> bool {
+    matches!(path, "/routing/evaluate" | "/decide-gateway")
+}
+
+/// Extract merchant ID from request (simplified implementation)
+fn extract_merchant_id(request: &Request) -> Option<String> {
+    // Try to extract from headers first
+    if let Some(merchant_id) = request.headers().get("x-merchant-id") {
+        if let Ok(merchant_id_str) = merchant_id.to_str() {
+            return Some(merchant_id_str.to_string());
+        }
+    }
+
+    // Try x-tenant-id header as fallback
+    if let Some(tenant_id) = request.headers().get("x-tenant-id") {
+        if let Ok(tenant_id_str) = tenant_id.to_str() {
+            return Some(tenant_id_str.to_string());
+        }
+    }
+
+    // Default to "public" tenant
+    Some("public".to_string())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_should_track_endpoint() {
+        assert!(should_track_endpoint("/routing/evaluate"));
+        assert!(should_track_endpoint("/decide-gateway"));
+        assert!(!should_track_endpoint("/health"));
+        assert!(!should_track_endpoint("/rule/create"));
+    }
+
+    #[test]
+    fn test_extract_merchant_id_from_header() {
+        use axum::http::{HeaderMap, HeaderValue};
+
+        let mut headers = HeaderMap::new();
+        headers.insert("x-merchant-id", HeaderValue::from_static("merchant-123"));
+
+        let request = Request::builder()
+            .uri("/routing/evaluate")
+            .body(Body::empty())
+            .unwrap();
+
+        // Note: This test would need to be adjusted to work with the actual request structure
+        // For now, it's a placeholder to show the testing approach
+    }
+}
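Both request and response bodies are fully buffered above (as the in-code note
warns). A hypothetical variant, not part of this change, that caps the
captured copy while still forwarding the complete body:

```rust
// Sketch: cap the captured payload at an assumed 64 KiB; the full body is
// always reassembled and passed along untouched.
use axum::body::Body;
use bytes::Bytes;
use http_body_util::BodyExt;

const MAX_CAPTURE_BYTES: usize = 64 * 1024; // assumed cap, tune as needed

async fn collect_capped(body: Body) -> (String, Body) {
    let bytes = match body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => Bytes::new(),
    };
    // Truncate only the analytics copy, never the forwarded body.
    let captured =
        String::from_utf8_lossy(&bytes[..bytes.len().min(MAX_CAPTURE_BYTES)]).to_string();
    (captured, Body::from(bytes))
}
```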
diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs
new file mode 100644
index 00000000..d26bd4b4
--- /dev/null
+++ b/src/analytics/mod.rs
@@ -0,0 +1,11 @@
+pub mod client;
+pub mod events;
+pub mod kafka_producer;
+pub mod middleware;
+pub mod types;
+
+pub use client::AnalyticsClient;
+pub use events::RoutingEvent;
+pub use kafka_producer::KafkaProducer;
+pub use middleware::analytics_middleware;
+pub use types::*;
diff --git a/src/analytics/types.rs b/src/analytics/types.rs
new file mode 100644
index 00000000..df4bda6a
--- /dev/null
+++ b/src/analytics/types.rs
@@ -0,0 +1,114 @@
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct AnalyticsConfig {
+    pub enabled: bool,
+    pub kafka: KafkaConfig,
+    pub clickhouse: ClickhouseConfig,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct KafkaConfig {
+    pub brokers: Vec<String>,
+    pub topic_prefix: String,
+    pub batch_size: usize,
+    pub batch_timeout_ms: u64,
+    pub max_consecutive_failures: u32,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct ClickhouseConfig {
+    pub host: String,
+    pub username: String,
+    pub password: Option<String>,
+    pub database: String,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RoutingEventData {
+    pub event_id: String,
+    pub merchant_id: String,
+    pub request_id: String,
+    pub endpoint: String,
+    pub method: String,
+    pub request_payload: String,
+    pub response_payload: String,
+    pub status_code: u16,
+    pub processing_time_ms: u32,
+    pub gateway_selected: Option<String>,
+    pub routing_algorithm_id: Option<String>,
+    pub error_message: Option<String>,
+    pub user_agent: Option<String>,
+    pub ip_address: Option<String>,
+    #[serde(with = "clickhouse_datetime")]
+    pub created_at: OffsetDateTime,
+    pub sign_flag: i8,
+}
+
+impl Default for AnalyticsConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            kafka: KafkaConfig {
+                brokers: vec!["localhost:9092".to_string()],
+                topic_prefix: "decision-engine".to_string(),
+                batch_size: 100,
+                batch_timeout_ms: 1000,
+                max_consecutive_failures: 5,
+            },
+            clickhouse: ClickhouseConfig {
+                host: "http://localhost:8123".to_string(),
+                username: "analytics_user".to_string(),
+                password: Some("analytics_pass".to_string()),
+                database: "decision_engine_analytics".to_string(),
+            },
+        }
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum AnalyticsError {
+    #[error("Kafka error: {0}")]
+    Kafka(#[from] kafka::Error),
+    #[error("ClickHouse error: {0}")]
+    ClickHouse(String),
+    #[error("Serialization error: {0}")]
+    Serialization(#[from] serde_json::Error),
+    #[error("Configuration error: {0}")]
+    Configuration(String),
+}
+
+pub type AnalyticsResult<T> = Result<T, AnalyticsError>;
+
+// Custom datetime serialization for ClickHouse compatibility
+mod clickhouse_datetime {
+    use serde::{self, Deserialize, Deserializer, Serializer};
+    use time::format_description::well_known::Rfc3339;
+    use time::OffsetDateTime;
+
+    pub fn serialize<S>(date: &OffsetDateTime, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        // Format as Unix timestamp for ClickHouse compatibility
+        let timestamp = date.unix_timestamp();
+        serializer.serialize_i64(timestamp)
+    }
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<OffsetDateTime, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        // Accept either the Unix timestamp this module writes or an RFC3339
+        // string, so serialized events round-trip cleanly.
+        let value = serde_json::Value::deserialize(deserializer)?;
+        match value {
+            serde_json::Value::Number(n) => {
+                let timestamp = n
+                    .as_i64()
+                    .ok_or_else(|| serde::de::Error::custom("invalid Unix timestamp"))?;
+                OffsetDateTime::from_unix_timestamp(timestamp).map_err(serde::de::Error::custom)
+            }
+            serde_json::Value::String(s) => {
+                // Try parsing as Unix timestamp first
+                if let Ok(timestamp) = s.parse::<i64>() {
+                    return OffsetDateTime::from_unix_timestamp(timestamp)
+                        .map_err(serde::de::Error::custom);
+                }
+                // Fallback to RFC3339 parsing
+                OffsetDateTime::parse(&s, &Rfc3339).map_err(serde::de::Error::custom)
+            }
+            _ => Err(serde::de::Error::custom(
+                "expected Unix timestamp or RFC3339 string",
+            )),
+        }
+    }
+}
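Since the timestamp codec is the subtlest part of the wire format, a quick
sanity check (hypothetical test that could sit at the bottom of `types.rs`):

```rust
// Hypothetical test: created_at round-trips through the Unix-timestamp
// representation written by clickhouse_datetime::serialize.
#[cfg(test)]
mod datetime_tests {
    use super::*;

    #[derive(Serialize, Deserialize)]
    struct Stamped {
        #[serde(with = "super::clickhouse_datetime")]
        created_at: OffsetDateTime,
    }

    #[test]
    fn created_at_roundtrips_as_unix_timestamp() {
        let start = Stamped {
            created_at: OffsetDateTime::from_unix_timestamp(1_704_067_200).unwrap(),
        };
        let json = serde_json::to_string(&start).unwrap();
        assert_eq!(json, r#"{"created_at":1704067200}"#);

        let back: Stamped = serde_json::from_str(&json).unwrap();
        assert_eq!(back.created_at.unix_timestamp(), 1_704_067_200);
    }
}
```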
diff --git a/src/app.rs b/src/app.rs
index da215adf..f6265d6c 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -10,6 +10,7 @@ use tokio::signal::unix::{signal, SignalKind};
 use tower_http::trace as tower_trace;
 
 use crate::{
+    analytics::{analytics_middleware, AnalyticsClient},
     api_client::ApiClient,
     config::{self, GlobalConfig, TenantConfig},
     error, logger, routes, storage,
@@ -43,6 +44,7 @@ pub struct TenantAppState {
     pub redis_conn: Arc<RedisConnectionWrapper>,
     pub config: config::TenantConfig,
     pub api_client: ApiClient,
+    pub analytics_client: Arc<AnalyticsClient>,
 }
 
 #[allow(clippy::expect_used)]
@@ -69,11 +71,24 @@ impl TenantAppState {
         .await
         .expect("Failed to create Redis connection Pool");
 
+        // Initialize analytics client
+        let analytics_client = AnalyticsClient::new(global_config.analytics.clone())
+            .map_err(|e| {
+                logger::warn!("Failed to initialize analytics client: {:?}", e);
+                e
+            })
+            .unwrap_or_else(|_| {
+                // Fallback to disabled analytics client
+                let disabled_config = crate::analytics::AnalyticsConfig::default();
+                AnalyticsClient::new(disabled_config).unwrap()
+            });
+
         Ok(Self {
             db,
             redis_conn: Arc::new(RedisConnectionWrapper::new(redis_conn)),
             api_client,
             config: tenant_config,
+            analytics_client: Arc::new(analytics_client),
         })
     }
 }
@@ -183,21 +198,26 @@ where
         post(routes::update_gateway_score::update_gateway_score),
     );
 
-    let router = router.layer(
-        tower_trace::TraceLayer::new_for_http()
-            .make_span_with(|request: &Request<_>| utils::record_fields_from_header(request))
-            .on_request(tower_trace::DefaultOnRequest::new().level(tracing::Level::INFO))
-            .on_response(
-                tower_trace::DefaultOnResponse::new()
-                    .level(tracing::Level::INFO)
-                    .latency_unit(tower_http::LatencyUnit::Micros),
-            )
-            .on_failure(
-                tower_trace::DefaultOnFailure::new()
-                    .latency_unit(tower_http::LatencyUnit::Micros)
-                    .level(tracing::Level::ERROR),
-            ),
-    );
+    let router = router
+        .layer(axum::middleware::from_fn_with_state(
+            global_app_state.clone(),
+            analytics_middleware,
+        ))
+        .layer(
+            tower_trace::TraceLayer::new_for_http()
+                .make_span_with(|request: &Request<_>| utils::record_fields_from_header(request))
+                .on_request(tower_trace::DefaultOnRequest::new().level(tracing::Level::INFO))
+                .on_response(
+                    tower_trace::DefaultOnResponse::new()
+                        .level(tracing::Level::INFO)
+                        .latency_unit(tower_http::LatencyUnit::Micros),
+                )
+                .on_failure(
+                    tower_trace::DefaultOnFailure::new()
+                        .latency_unit(tower_http::LatencyUnit::Micros)
+                        .level(tracing::Level::ERROR),
+                ),
+        );
 
     let router = router
         .nest("/health", routes::health::serve())
diff --git a/src/config.rs b/src/config.rs
index 61f72496..fbdec7f3 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,5 +1,6 @@
 use crate::decider::network_decider;
 use crate::{
+    analytics::AnalyticsConfig,
     api_client::ApiClientConfig,
     crypto::secrets_manager::{
         secrets_interface::SecretManager, secrets_management::SecretsManagementConfig,
@@ -40,6 +41,8 @@ pub struct GlobalConfig {
     pub routing_config: Option<RoutingConfig>,
     #[serde(default)]
     pub debit_routing_config: network_decider::types::DebitRoutingConfig,
+    #[serde(default)]
+    pub analytics: AnalyticsConfig,
 }
 
 #[derive(Clone, Debug)]
diff --git a/src/decider/gatewaydecider/flows.rs b/src/decider/gatewaydecider/flows.rs
index 26660fbe..edb341b2 100644
--- a/src/decider/gatewaydecider/flows.rs
+++ b/src/decider/gatewaydecider/flows.rs
@@ -657,7 +657,9 @@ pub async fn runDeciderFlow(
         is_scheduled_outage: decider_flow.writer.isScheduledOutage,
         is_dynamic_mga_enabled: decider_flow.writer.is_dynamic_mga_enabled,
         gateway_mga_id_map: Some(gatewayMgaIdMap),
-        is_rust_based_decider: deciderParams.dpShouldConsumeResult.unwrap_or(false),
+        is_rust_based_decider: deciderParams
+            .dpShouldConsumeResult
+            .unwrap_or(false),
     }),
     None => Err((
         decider_flow.writer.debugFilterList.clone(),
diff --git a/src/lib.rs b/src/lib.rs
index 564b2735..47783c4c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod analytics;
 pub mod api_client;
 pub mod app;
 pub mod config;
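One closing illustration: the Cargo.toml change above pulls in the
`clickhouse` client crate (with `time` and `uuid` features) even though no
reader code lands in this PR. A hedged sketch of how stored events could be
queried from Rust using the `ClickhouseConfig` added in `types.rs`:

```rust
// Sketch only (no reader ships in this PR): fetch per-endpoint totals with
// the `clickhouse` crate added to Cargo.toml.
use clickhouse::{Client, Row};
use serde::Deserialize;

use crate::analytics::ClickhouseConfig;

#[derive(Debug, Row, Deserialize)]
struct EndpointCount {
    endpoint: String,
    total: i64,
}

async fn endpoint_counts(
    cfg: &ClickhouseConfig,
) -> Result<Vec<EndpointCount>, clickhouse::error::Error> {
    let client = Client::default()
        .with_url(&cfg.host)
        .with_user(&cfg.username)
        .with_password(cfg.password.clone().unwrap_or_default())
        .with_database(&cfg.database);

    // sum(sign_flag) honors CollapsingMergeTree semantics for live rows.
    client
        .query("SELECT endpoint, sum(sign_flag) AS total FROM routing_events GROUP BY endpoint")
        .fetch_all::<EndpointCount>()
        .await
}
```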