feat(llma): Add OpenTelemetry ingestion support for LLMA #41846
```python
        error=str(e),
    )
    return Response(
        {"error": str(e)},
```
Check warning: Code scanning / CodeQL
Information exposure through an exception (Medium): Stack trace information

Copilot Autofix (AI, 3 days ago):
General fix:
Avoid returning exception string representations directly to the user in API responses. Instead, return a generic error message or only error details that are safe for user consumption. Optionally, log the detailed error on the server with the request context for debugging, as is already done.
Best way to fix this code:
Change the except ValidationError as e block in otel_traces_endpoint so that the API response contains a generic error (e.g., "Validation failed") and, optionally, a list of error messages if and only if these are stripped of sensitive detail and intended for user consumption. Given we cannot be sure, the safest is to remove str(e) and only say "Validation failed". The server already logs the full error.
Region to change:
File: products/llm_analytics/backend/api/otel/ingestion.py, in the block for except ValidationError as e.
Required changes:

- Replace `{"error": str(e)}` in the API response with something like `{"error": "Validation failed"}`.
- Ensure the logger still records the full exception for developers.
```diff
@@ -252,7 +252,7 @@
         error=str(e),
     )
     return Response(
-        {"error": str(e)},
+        {"error": "Validation failed"},
         status=status.HTTP_400_BAD_REQUEST,
     )
```
```python
        error=str(e),
    )
    return Response(
        {"error": str(e)},
```
Check warning: Code scanning / CodeQL
Information exposure through an exception (Medium): Stack trace information

Copilot Autofix (AI, 3 days ago):
To fix the issue, we should not include the raw str(e) from the exception directly in the API response to external clients. Instead, return a generic error message such as "Validation failed" (already present in the similar block above at line 536). Optionally, if the exception's message is known to contain only user input/field issues and has been sanitized, we could expose field-level errors, but by default just log the exception and return a generic message to the user.
Thus, in the block at line 563, replace `{"error": str(e)}` with `{"error": "Validation failed"}`. Alternatively, if `e.detail` contains only field errors and is safe, you could return those, but unless the format is strictly controlled and cannot include stack traces, it's safest to exclude them.

No additional methods or imports are needed, as the logger already collects the details server-side.
```diff
@@ -560,7 +560,7 @@
         error=str(e),
     )
     return Response(
-        {"error": str(e)},
+        {"error": "Validation failed"},
         status=status.HTTP_400_BAD_REQUEST,
     )
```
Implements the core infrastructure for ingesting OpenTelemetry traces and converting them to PostHog AI events for LLM Analytics.

## TypeScript (Plugin-server)

**Location**: `plugin-server/src/llm-analytics/otel/`

- **validation.ts**: Industry-standard OTel limits and validation functions
  - Max 1000 spans per request (aligns with OTel SDK defaults)
  - Max 100 KB per attribute value (generous for LLM content)
  - Max 128 attributes/events/links per span
- **types.ts**: TypeScript types for OTel structures and AI events
  - OTelSpan, SpanStatus, SpanEvent, etc.
  - AIEventProperties mapping
  - Transformer version tracking (v1.0.0)
- **conventions/posthog-native.ts**: PostHog-native attributes (`posthog.ai.*`)
  - Highest priority in waterfall pattern
  - Direct mapping to PostHog AI event properties
- **conventions/genai.ts**: GenAI semantic conventions (`gen_ai.*`)
  - OpenTelemetry standard for LLM observability
  - Fallback when PostHog-native attributes are not present
  - Supports prompt/completion in multiple formats
- **transformer.ts**: Core span → AI event transformation logic
  - Waterfall pattern: PostHog > GenAI > OTel built-ins
  - Validates spans against limits
  - Calculates latency, determines event type
  - Handles baggage for session context
  - Maps to $ai_generation, $ai_span, $ai_trace events

## Python (API Endpoint)

**Location**: `products/llm_analytics/backend/api/otel/`

- **ingestion.py**: OTLP/HTTP endpoint stub
  - Endpoint: POST /api/projects/:project_id/ai/otel/v1/traces
  - Auth: PersonalAPIKeyAuthentication
  - Content-Type: application/x-protobuf
  - Returns 200 with placeholder response (parser TODO)

**URL Routing**: `posthog/urls.py`

- Registered at `/api/projects/<int:project_id>/ai/otel/v1/traces`
- CSRF exempt (external OTel exporters don't have CSRF tokens)

## Next Steps

1. Implement protobuf parser (opentelemetry-proto package)
2. Connect transformer to capture pipeline
3. Add unit tests for transformer
4. Add integration tests for endpoint
5. Test end-to-end with OTel SDK

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Step 1: Parse OTLP/HTTP protobuf payloads

- Add opentelemetry-proto==1.38.0 dependency
- Create parser.py with full OTLP ExportTraceServiceRequest parsing
  - Parses spans, resource attributes, instrumentation scope
  - Handles all OTel value types (string, int, double, bool, array, kvlist, bytes)
  - Parses baggage from HTTP headers for session context
- Integrate parser into ingestion.py endpoint
- Add validation against industry-standard limits
  - Check span count, attribute count/size, event count, link count
  - Provide actionable error messages with limit guidance

Next: Transform parsed spans to PostHog AI events (Step 2)
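The value-type handling described above can be sketched against the OTLP/JSON encoding of `AnyValue`. This is a minimal illustration, not PostHog's actual parser.py; the function name is hypothetical:

```python
# Sketch: unwrap an OTLP AnyValue (JSON encoding) into a native Python value.
# Keys follow the OTLP/JSON encoding; recursion covers arrays and kvlists.
def any_value_to_python(value: dict):
    if "stringValue" in value:
        return value["stringValue"]
    if "intValue" in value:
        return int(value["intValue"])  # OTLP/JSON carries int64 as a string
    if "doubleValue" in value:
        return value["doubleValue"]
    if "boolValue" in value:
        return value["boolValue"]
    if "bytesValue" in value:
        return value["bytesValue"]  # base64-encoded string in OTLP/JSON
    if "arrayValue" in value:
        return [any_value_to_python(v) for v in value["arrayValue"].get("values", [])]
    if "kvlistValue" in value:
        return {
            kv["key"]: any_value_to_python(kv["value"])
            for kv in value["kvlistValue"].get("values", [])
        }
    return None  # empty AnyValue
```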
Step 2: Transform parsed OTel spans to PostHog AI events

- Create conventions extractors:
  - posthog_native.py: Extract posthog.ai.* attributes (highest priority)
  - genai.py: Extract gen_ai.* attributes (fallback)
- Create transformer.py with waterfall pattern:
  - Merge attributes with precedence: PostHog > GenAI > OTel built-ins
  - Build AI event properties with proper mapping
  - Determine event type ($ai_generation, $ai_embedding, $ai_span, $ai_trace)
  - Calculate timestamps and latency from nano timestamps
  - Extract distinct_id from resource/baggage with 'anonymous' fallback
- Integrate transformer into ingestion.py endpoint:
  - Transform all spans to AI events
  - Log transformation results

Events are ready for capture pipeline (Step 3).

Transformation features:

- Supports PostHog native and GenAI semantic conventions
- Handles token usage, cost, model parameters
- Preserves OTel metadata (span kind, status, scope)
- Maps content (input/output, prompt/completion)
- Adds unmapped attributes with 'otel.' prefix
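The waterfall precedence above amounts to a dict merge where later updates win. A minimal sketch, with illustrative names rather than the actual transformer.py API:

```python
# Sketch of the waterfall pattern: PostHog-native attributes win over GenAI
# semantic conventions, which win over OTel built-ins. Dict update order
# encodes the precedence (the last update wins).
def merge_with_precedence(posthog_attrs: dict, genai_attrs: dict, otel_attrs: dict) -> dict:
    merged = dict(otel_attrs)      # lowest priority: OTel built-ins
    merged.update(genai_attrs)     # middle priority: gen_ai.* conventions
    merged.update(posthog_attrs)   # highest priority: posthog.ai.* attributes
    return merged
```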
Step 3: Send transformed events to capture-rs for ingestion

- Implement capture_events() using capture_batch_internal:
  - Submit events concurrently for better performance
  - Use team's API token for authentication
  - Set event_source='otel_traces_ingestion' for observability
  - Disable person processing (AI events don't need it)
  - Wait for all futures and log any capture errors
- Integrate capture into ingestion endpoint:
  - Call capture_events() after transformation
  - Add logging for capture success
  - Update response message to 'Traces ingested successfully'

End-to-end flow complete: 1. Parse OTLP protobuf → 2. Transform to AI events → 3. Capture to PostHog

Events are now fully ingested and will appear in PostHog as:

- $ai_generation (chat/completion)
- $ai_embedding (embeddings)
- $ai_span (generic LLM operations)
- $ai_trace (root spans)

Next: Testing (unit tests, integration tests, e2e with real OTel SDK)
Adds local dev support for OpenTelemetry logs ingestion:

- Expose capture-logs service on port 4318 in docker-compose
- Add Django proxy view at /i/v1/logs that forwards to capture-logs:4318/v1/logs
- Enables OpenAI instrumentation to send message content as OTel logs

This allows OpenTelemetry SDKs to send logs to /i/v1/logs in local dev, which matches the production endpoint pattern used by the PostHog logs product.
Adds logs ingestion to complement existing traces ingestion:

- Add logs_parser.py for parsing OTLP LogRecord protobuf messages
- Add logs_transformer.py to convert logs to PostHog AI events
- Add otel_logs_endpoint in ingestion.py with validation
- Add URL route at /api/projects/:id/ai/otel/v1/logs
- Add OTEL_QUICKSTART.md documentation

This enables full OTEL ingestion (traces + logs) for LLM Analytics, allowing users to send data from any OTEL-instrumented framework (Pydantic AI, LangChain, LlamaIndex, etc.) directly to PostHog.
The explicit opentelemetry-proto==1.38.0 dependency conflicts with opentelemetry-exporter-otlp-proto-grpc==1.33.1 (already in master), which requires proto==1.33.1. Removing the explicit pin lets the proto package be resolved as a transitive dependency at 1.33.1, which is compatible with our OTLP parser implementation.
…tructure

Changes authentication from personal API keys to project tokens and improves data structure quality to match SDK-based traces.

## Authentication Changes

- Replace PersonalAPIKeyAuthentication with ProjectTokenAuthentication
- Use project tokens (phc_...) instead of personal API keys (phx_...)
- Support token in Authorization header or ?token= query parameter
- Follows same pattern as logs ingestion for consistency

## Data Structure Improvements

- Keep structured data as native JSON (arrays/objects) instead of JSON strings
- Add $ai_span_name extraction from span.name field
- Add $ai_tools extraction from llm.request.functions.* attributes
- Parse tool definitions into structured array format matching SDK

## GenAI Conventions

- Add llm.request.functions.* attribute mapping for tool definitions
- Improve attribute extraction to handle nested tool parameters

## Technical Details

These changes ensure OTEL-captured traces have the same data quality as SDK-captured traces:

- $ai_input: Structured array (not JSON string)
- $ai_output_choices: Structured array (not JSON string)
- $ai_tools: Structured array with proper schema (name, description, input_schema)
- $ai_span_name: Captured from span name

This makes OTEL ingestion a first-class alternative to SDK instrumentation with identical data structure and UI rendering.
Only the traces endpoint is needed for AI instrumentation. Removed:

- otel_logs_endpoint function and helpers
- logs_parser.py and logs_transformer.py
- logs endpoint URL route
- log-related OTEL_LIMITS
This reverts commit 3b600bd.
The v2 OpenAI instrumentation (opentelemetry-instrumentation-openai-v2) sends message content as separate log records via logger.emit(), while v1 sends everything as span attributes. To meet users where they are, we need to support both:

- v1: Uses the traces endpoint only (span attributes)
- v2: Uses both the traces (metadata) and logs (message content) endpoints

Updated the module docstring to clarify the difference and requirements.
The v2 instrumentation sends structured log events with bodies like:
- gen_ai.user.message: {"content": "..."}
- gen_ai.assistant.message: {"content": "..."} or {"tool_calls": [...]}
- gen_ai.choice: {"index": 0, "finish_reason": "stop", "message": {...}}
Updated logs_transformer to properly extract:
- Message content from body.content
- Tool calls from body.tool_calls
- Output content and metadata from choice events
- Role and finish_reason from structured bodies
This ensures v2 message content appears in PostHog traces.
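The extraction described above can be sketched as follows. The event body shapes are taken from the list in this commit message; the function name and return shape are assumptions, not the actual logs_transformer.py API:

```python
# Sketch: pull message content out of a v2 structured log body, per the
# gen_ai.* event shapes listed above. Illustrative only.
def extract_from_log_body(event_name: str, body: dict) -> dict:
    out: dict = {}
    if event_name in ("gen_ai.user.message", "gen_ai.assistant.message"):
        if "content" in body:
            out["content"] = body["content"]
        if "tool_calls" in body:
            out["tool_calls"] = body["tool_calls"]
    elif event_name == "gen_ai.choice":
        # Choice events carry output content plus metadata
        out["finish_reason"] = body.get("finish_reason")
        message = body.get("message", {})
        out["content"] = message.get("content")
        out["role"] = message.get("role", "assistant")
    return out
```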
Fix property names to match the documented schema:

- $ai_error_message → $ai_error
- $ai_cache_read_tokens → $ai_cache_read_input_tokens
- $ai_cache_write_tokens → $ai_cache_creation_input_tokens

Also:

- Remove undocumented properties ($ai_message_content, $ai_output_content, etc.)
- Properly structure $ai_input and $ai_output_choices as arrays
- Add Redis-based event merger to combine v2 trace and log events
- Support multiple log event accumulation (user + assistant messages)
- Add bidirectional merge logic (logs-first or traces-first)
…ition Problem: v2 instrumentation sends multiple log events (user + assistant messages) in the same HTTP request. Processing them sequentially caused a race condition where: 1. Log 1 (user message) caches 2. Trace arrives (parallel thread), merges with Log 1, sends event 3. Log 2 (assistant message) arrives, finds no trace (already consumed), caches forever Result: Events missing $ai_output_choices from assistant messages Solution: Group all logs by (trace_id, span_id) and accumulate their properties atomically BEFORE calling the event merger. This ensures all log content is merged together in a single operation, preventing the trace from consuming partial logs. Testing: Verified with real OpenAI API calls that v2 events now correctly contain both $ai_input (from user message logs) and $ai_output_choices (from assistant logs). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
v1 OpenTelemetry instrumentation puts complete data (prompts, completions, and metadata) in span attributes (gen_ai.prompt, gen_ai.completion), unlike v2 which separates metadata (traces) and content (logs). Previously, v1 spans were cached in Redis waiting for logs that never arrive, causing events to expire without being sent. This change detects v1 spans by checking for gen_ai.prompt/completion attributes and skips the event merger, sending v1 events immediately. v2 spans continue to use the event merger for bidirectional merging with logs.
The previous v1 detection was checking for 'gen_ai.prompt' and 'gen_ai.completion' in merged_attrs, but v1 actually sends indexed attributes like 'gen_ai.prompt.0.content' which get extracted as 'prompt' and 'completion' (without the 'gen_ai.' prefix). Updated detection to check for 'prompt' or 'completion' in merged_attrs instead, which correctly identifies v1 spans and skips the event merger. Verified with test: v1 events now appear in database with complete $ai_input and $ai_output_choices arrays.
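The corrected detection reduces to a simple key check on the merged attributes. A sketch of the idea, not the exact code:

```python
# Sketch: after extraction, indexed v1 attributes like
# 'gen_ai.prompt.0.content' surface as 'prompt'/'completion' keys (without
# the 'gen_ai.' prefix), so that is what v1 detection checks for.
def is_v1_span(merged_attrs: dict) -> bool:
    return "prompt" in merged_attrs or "completion" in merged_attrs
```

v1 spans detected this way skip the event merger and are sent immediately; v2 spans continue through the Redis-backed merge with logs.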
Added detailed documentation covering:

- Architecture and data flow diagrams
- v1 vs v2 instrumentation differences
- Component descriptions and responsibilities
- Event merger Redis cache mechanics
- PostHog AI event schema specification
- Testing and troubleshooting guides
- Recent bug fixes and solutions

This README serves as the primary reference for developers working on or debugging the OTEL ingestion pipeline.
Rewrote README to focus on current implementation and design decisions rather than development-specific context:

- Removed references to specific bugs encountered during development
- Removed troubleshooting for transient development issues
- Removed references to temporary test scripts
- Added Design Decisions section explaining architectural choices
- Added Extending the System section with examples
- Reorganized content to flow from overview to architecture to usage
- Focused on what the system does and why, not historical problems

The README now stands alone as documentation for the current codebase.
…mentation

Completely rewrote test suite to match current Redis-backed event merger API:

- Removed references to _event_cache (no longer exists)
- Changed send_immediately parameter to is_trace boolean
- Added mock_redis fixture using unittest.mock
- Removed blocking/timeout tests (obsolete with non-blocking design)
- Added test for multiple log accumulation (critical v2 feature)
- Added test for Redis error fallback
- Added test for TTL configuration
- Fixed test expectations to match actual merge behavior

All 7 tests now pass successfully.
The TypeScript OTEL transformer in plugin-server was never used for ingestion. All OTEL transformation happens in the Python backend via the Django API endpoints. Plugin-server only does post-processing (cost calculation, normalization) after events are already transformed and sent through Kafka. This removes dead code that was diverging from the actual Python implementation.
The detailed README in backend/api/otel/ is sufficient documentation.
- Remove incorrect claim about Redis transactions (WATCH/MULTI/EXEC); the event_merger uses simple get/setex/delete operations
- Fix test path from test/ subdirectory to correct location
- Update performance characteristics to reflect actual implementation
Show complete flow from OTLP ingestion through to ClickHouse:

- OTEL transformation (Python)
- PostHog capture API
- Kafka (events_plugin_ingestion topic)
- Plugin-server post-processing (cost calculation, normalization)
- ClickHouse storage

This clarifies that OTEL transformation happens in Python, while plugin-server only handles post-processing after events are already in the standard PostHog ingestion pipeline.
Fixes bug where tool messages were missing from OTEL v2 conversation traces.

Changes:

- Add handler for gen_ai.tool.message events in logs_transformer.py
- Fix assistant messages to go to $ai_input (conversation history) instead of $ai_output_choices
- Implement message array accumulation in ingestion.py for multiple log events per span
- Add unit tests for message accumulation and tool message handling

Before: OTEL v2 traces had incorrect message counts (2→4→5→7→8→10) with missing tool messages
After: Correct message counts (2→4→6→8→10→12) with all tool messages properly captured

Tool messages now include:

- role: "tool"
- content: tool execution result
- tool_call_id: links response to tool call
Add test_tool_messages_accumulate to verify:

- gen_ai.tool.message events are properly parsed
- Tool messages include role, content, and tool_call_id
- Tool messages accumulate in $ai_input with conversation history
- Message order is preserved (user → assistant with tool_calls → tool → final response)
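The accumulation behavior these tests verify can be sketched as follows. The function name and input shape are illustrative assumptions, not the actual ingestion.py code:

```python
# Sketch: accumulate multiple log events for one span into a single
# $ai_input conversation history, preserving order (user -> assistant with
# tool_calls -> tool -> final response).
def accumulate_messages(log_events: list) -> dict:
    properties: dict = {"$ai_input": []}
    for event in log_events:
        msg = {"role": event["role"], "content": event.get("content")}
        if event.get("tool_call_id"):
            # Tool responses link back to the tool call that produced them
            msg["tool_call_id"] = event["tool_call_id"]
        if event.get("tool_calls"):
            msg["tool_calls"] = event["tool_calls"]
        properties["$ai_input"].append(msg)
    return properties
```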
- Detect Mastra by instrumentation scope name (@mastra/otel)
- Treat Mastra as a v1 framework (all attributes in spans, no log merging)
- Mark v1 framework root spans as $ai_span instead of $ai_trace to fix tree hierarchy
- Add provider transformer pattern for framework-specific data transformations
- Filter out raw input/output attributes to prevent duplicate otel.input/otel.output

Fixes a tree display issue where Mastra generations weren't appearing as children under the trace. Root spans from v1 frameworks must be $ai_span (not $ai_trace) since TraceQueryRunner filters out $ai_trace events from the events array.
- Document provider transformers pattern for framework-specific data transformations
- Add Mastra to architecture diagram and v1 instrumentation section
- Explain v1 framework detection via instrumentation scope name
- Document event type determination for v1 frameworks ($ai_span for root spans)
- Add section on adding new provider transformers
- Update design decisions with provider transformers and v1 event type logic
19 files reviewed, 5 comments
posthog/urls.py
Outdated
```python
@csrf_exempt
def proxy_logs_to_capture_service(request: HttpRequest) -> HttpResponse:
    """
    Proxy OTLP logs to the capture-logs Rust service.

    This allows OpenTelemetry SDKs to send logs to /i/v1/logs in local dev,
    which then forwards to the capture-logs service.

    Note: Using Docker container IP directly since Django runs on host.
    """
    import requests

    # Forward to capture-logs service (exposed on host port 4318)
    capture_logs_url = "http://localhost:4318/v1/logs"

    try:
        # Forward the request with all headers and body
        response = requests.post(
            capture_logs_url,
            data=request.body,
            headers={
                "Content-Type": request.META.get("CONTENT_TYPE", ""),
                "Authorization": request.META.get("HTTP_AUTHORIZATION", ""),
            },
            timeout=10,
        )

        # Return the response from capture-logs
        return HttpResponse(
            response.content,
            status=response.status_code,
            content_type=response.headers.get("Content-Type", "application/json"),
        )
    except Exception:
        logger.exception("Error proxying logs to capture service")
        return HttpResponse(
            b'{"error": "Failed to forward logs"}',
            status=500,
            content_type="application/json",
        )
```
**logic:** `proxy_logs_to_capture_service` is defined in this file but appears to be development/testing infrastructure. It forwards OTLP logs to a local capture-logs service, yet is exposed as a production endpoint at `/i/v1/logs`, which could be accessed in production environments.

Issues:

- Hardcoded `localhost:4318` will fail in production
- No authentication check on this endpoint
- Uses synchronous `requests.post()`, which blocks Django workers
- Missing `@csrf_exempt` decorator (though it may inherit from URL config)

Consider moving this to a dev-only utility or documenting that this endpoint should not be exposed in production.
products/llm_analytics/backend/api/otel/ingestion.py

```python
from collections import defaultdict

logs_by_span = defaultdict(list)

for log_record in logs:
    trace_id = log_record.get("trace_id", "")
    span_id = log_record.get("span_id", "")
    if trace_id and span_id:
        logs_by_span[(trace_id, span_id)].append(log_record)
    else:
        # No trace/span ID - process individually
        logs_by_span[(None, None)].append(log_record)
```
**logic:** Logs without `trace_id` or `span_id` are grouped under the `(None, None)` key and processed individually. However, this creates a potential issue where multiple unrelated log events accumulate together in `logs_by_span[(None, None)]`.

If you have multiple standalone log events in the same request (not associated with any trace), they'll all be accumulated into the same properties dict, which will cause property overwrites and incorrect merging.

Consider processing these immediately without grouping:

```python
if trace_id and span_id:
    logs_by_span[(trace_id, span_id)].append(log_record)
else:
    # Process standalone logs immediately without grouping
    event = transform_log_to_ai_event(log_record, resource, scope)
    if event is not None:
        events.append(event)
```
products/llm_analytics/backend/api/otel/event_merger.py

```python
logs_json = redis_client.get(logs_cache_key)

if logs_json:
    # Another log already cached - accumulate
    existing_logs = json.loads(logs_json)
    merged_logs = {**existing_logs, **properties}  # Later log props override

    # Re-cache accumulated logs
    redis_client.setex(logs_cache_key, _CACHE_TTL, json.dumps(merged_logs))

    logger.info(
        "event_merger_accumulate: Accumulated log properties",
        extra={"trace_id": trace_id, "span_id": span_id},
    )

    # Check if trace is ready
    trace_json = redis_client.get(trace_cache_key)
    if trace_json:
        # Trace is ready - merge and send
        trace_properties = json.loads(trace_json)
        final_merged = {**merged_logs, **trace_properties}  # Trace props override

        # Clean up both caches
        redis_client.delete(logs_cache_key)
        redis_client.delete(trace_cache_key)

        logger.info(
            "event_merger_success: Merged accumulated logs+trace",
            extra={"trace_id": trace_id, "span_id": span_id},
        )

        return final_merged

    # Trace not ready yet - wait for it
    return None
```
**logic:** Race condition in the log accumulation logic: between checking if logs exist (line 81), merging them (line 86), re-caching (line 89), and then checking for the trace (line 97), another concurrent request could have already merged the logs with a trace and deleted the cache.

This could result in:

1. Line 81: Log exists, retrieve it
2. [Another thread: Trace arrives, merges with logs, deletes both caches]
3. Line 89: Re-cache merged logs (recreates deleted cache)
4. Line 97: Trace is gone (returns None)
5. Result: Merged logs cached but trace already consumed = orphaned cache entry

Consider using Redis transactions (WATCH/MULTI/EXEC) or a Lua script for atomic read-modify-write operations.
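One way to structure the atomic read-modify-write is sketched below. In production the whole body would live in a server-side Lua script (Redis executes EVAL atomically, so the check-accumulate-check-trace-delete sequence cannot interleave with another request); here the same logic is written in Python against any client exposing get/setex/delete so the flow is easy to follow. The names (`_CACHE_TTL`, key formats) mirror the snippet under review but are assumptions:

```python
import json

_CACHE_TTL = 300  # assumed TTL in seconds

def accumulate_or_merge(client, logs_key: str, trace_key: str, properties: dict):
    """Accumulate log properties; if the trace is cached, merge and clean up.

    Must execute atomically (e.g. as one Lua script) to avoid the orphaned
    cache entry described in the review. Note the trace check happens BEFORE
    re-caching, which also closes the recreate-after-delete window.
    """
    logs_json = client.get(logs_key)
    existing = json.loads(logs_json) if logs_json else {}
    merged_logs = {**existing, **properties}  # later log props override

    trace_json = client.get(trace_key)
    if trace_json:
        # Trace already cached: merge, delete both keys, emit the event.
        final = {**merged_logs, **json.loads(trace_json)}  # trace overrides
        client.delete(logs_key)
        client.delete(trace_key)
        return final

    # Trace not ready: cache accumulated logs and wait.
    client.setex(logs_key, _CACHE_TTL, json.dumps(merged_logs))
    return None
```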
products/llm_analytics/backend/api/otel/ingestion.py

```python
# Submit events to capture pipeline
futures = capture_batch_internal(
    events=events,
    event_source="otel_traces_ingestion",
    token=team.api_token,
    process_person_profile=False,  # AI events don't need person processing
)

# Wait for all futures to complete and check for errors
errors = []
for i, future in enumerate(futures):
    try:
        response = future.result()
        if response.status_code not in (200, 201):
            errors.append(f"Event {i}: HTTP {response.status_code}")
    except Exception as e:
        errors.append(f"Event {i}: {str(e)}")

if errors:
    logger.warning(
        "otel_traces_capture_errors",
        team_id=team.id,
        error_count=len(errors),
        errors=errors[:10],  # Log first 10 errors
    )
```
style: The capture_events function waits for all capture futures synchronously using .result(), blocking the ingestion endpoint until all events are written. This negates the async benefits of capture_batch_internal and increases p99 latency.
For high-throughput scenarios with large batches, this could cause request timeouts. Consider fire-and-forget pattern or background task processing for capture errors.
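The fire-and-forget pattern the review suggests can be sketched with done-callbacks: instead of blocking on `future.result()`, attach a callback that logs failures and return immediately. That `capture_batch_internal` returns `Future` objects is taken from the snippet above; the callback wiring here is an assumption, not PostHog's code:

```python
import logging
from concurrent.futures import Future

logger = logging.getLogger(__name__)

def log_capture_errors(futures: list) -> None:
    """Attach non-blocking error logging to capture futures."""
    def _log_if_failed(future: Future, index: int) -> None:
        try:
            response = future.result()
            if response.status_code not in (200, 201):
                logger.warning("capture failed: event %d -> HTTP %d", index, response.status_code)
        except Exception:
            logger.exception("capture failed: event %d", index)

    for i, future in enumerate(futures):
        # The callback fires on the executor thread once the future resolves;
        # the request thread never blocks on .result().
        future.add_done_callback(lambda f, i=i: _log_if_failed(f, i))
```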
products/llm_analytics/backend/api/otel/transformer.py

```python
try:
    parsed = json.loads(content_input)
    properties["$ai_input"] = (
        parsed if isinstance(parsed, list) else [{"role": "user", "content": content_input}]
    )
except (json.JSONDecodeError, TypeError):
    properties["$ai_input"] = [{"role": "user", "content": content_input}]
else:
    properties["$ai_input"] = [{"role": "user", "content": str(content_input)}]
```
logic: JSON parsing of content_input with fallback creates ambiguous behavior. If content_input is a JSON string "[{...}]", it's parsed to a list. But if parsing fails, the string is wrapped as [{"role": "user", "content": content_input}].
This means a valid JSON array string like "[\"hello\", \"world\"]" would be parsed to ["hello", "world"] (not message objects), which will fail validation downstream. The fallback should ensure message object structure.
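The normalization the review asks for can be sketched like this: whatever shape `content_input` arrives in, every element of `$ai_input` ends up as a `{"role": ..., "content": ...}` message object. Illustrative only, not the actual transformer.py fix:

```python
import json

def normalize_ai_input(content_input):
    """Coerce content_input into a list of message objects."""
    if isinstance(content_input, str):
        try:
            parsed = json.loads(content_input)
        except (json.JSONDecodeError, TypeError):
            return [{"role": "user", "content": content_input}]
        if isinstance(parsed, list):
            # Wrap bare strings so '["hello", "world"]' still yields
            # message objects instead of a list of plain strings.
            return [
                item if isinstance(item, dict) and "role" in item
                else {"role": "user", "content": item}
                for item in parsed
            ]
        return [{"role": "user", "content": content_input}]
    return [{"role": "user", "content": str(content_input)}]
```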
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
if merged is not None:
    # Ready to send - create event
    event_type = determine_event_type(span_logs[0], span_logs[0].get("attributes", {}))
    timestamp = calculate_timestamp(span_logs[0])
    distinct_id = extract_distinct_id(resource, span_logs[0].get("attributes", {}))
```
Baggage-based distinct_id lost when logs arrive after trace
For v2 ingestion where the trace request arrives first, the span path caches properties and returns None; the eventual event is built in the logs path with extract_distinct_id(resource, span_logs[0].get("attributes", {})) only. Any user context supplied in the trace’s baggage header is never persisted or reused, so merged events become anonymous unless the log payload repeats the distinct ID. This breaks user association for the common trace-first order; the baggage-derived distinct_id needs to be propagated into the cached properties or reused when logs complete the merge.
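One remedy is to persist the baggage-derived `distinct_id` alongside the cached span properties so the logs path can reuse it when it completes the merge. A minimal sketch, with a dict standing in for the Redis cache (function and key names are illustrative):

```python
# In-memory stand-in for the Redis cache, keyed by span id.
_span_cache: dict = {}


def cache_span(span_id, properties, distinct_id):
    """Trace-first path: stash the properties AND the baggage distinct_id."""
    _span_cache[span_id] = {"properties": properties, "distinct_id": distinct_id}


def merge_logs(span_id, log_distinct_id, log_properties):
    """Logs path: prefer the log's distinct_id, fall back to the cached one."""
    cached = _span_cache.pop(span_id, None)
    if cached is None:
        return None
    return {
        "distinct_id": log_distinct_id or cached["distinct_id"],
        "properties": {**cached["properties"], **log_properties},
    }
```

This keeps trace-first merged events attributed to the user from the trace's baggage header even when the log payload omits the distinct ID.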
…ern detection

- Add OtelInstrumentationPattern enum (V1_ATTRIBUTES, V2_TRACES_AND_LOGS)
- Providers declare pattern via get_instrumentation_pattern() method
- Add detect_provider() for centralized provider detection
- Remove hardcoded @mastra/otel checks from transformer.py
- Update README with Provider Reference section and pattern documentation
- Add test_ingestion_parity.py for v1/v2 parity testing
- Document Mastra behavior (no conversation history accumulation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
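The pattern declaration described in this commit might be shaped roughly like the following (a sketch: the enum members come from the commit message, but the string values, method bodies, and scope-name check are assumptions):

```python
from enum import Enum


class OtelInstrumentationPattern(Enum):
    V1_ATTRIBUTES = "v1_attributes"              # data carried in span attributes
    V2_TRACES_AND_LOGS = "v2_traces_and_logs"    # metadata in spans, content in logs


class ProviderTransformer:
    def can_handle(self, scope_name: str) -> bool:
        raise NotImplementedError

    def get_instrumentation_pattern(self) -> OtelInstrumentationPattern:
        raise NotImplementedError


class MastraTransformer(ProviderTransformer):
    def can_handle(self, scope_name: str) -> bool:
        # Illustrative check; replaces hardcoded @mastra/otel branches.
        return scope_name.startswith("@mastra/")

    def get_instrumentation_pattern(self) -> OtelInstrumentationPattern:
        return OtelInstrumentationPattern.V2_TRACES_AND_LOGS


def detect_provider(scope_name: str, providers: list):
    """Centralized detection: first provider that claims the scope wins."""
    return next((p for p in providers if p.can_handle(scope_name)), None)
```

Each provider declares its own pattern, so the transformer no longer needs provider-specific conditionals.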
- Remove unused proxy function and /i/v1/logs route
- Port 4318 was already removed from docker-compose, making this dead code
- LLM Analytics OTEL ingestion now uses /api/projects/:id/ai/otel/logs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
- Remove unused stringify_content() functions from transformer.py and logs_transformer.py
- Remove unused span_uses_known_conventions() function from transformer.py
- Remove debug logging statement from logs_transformer.py
- Change verbose logger.info() calls to logger.debug() for per-request operational logs
- Remove verbose transformation logging from genai.py and mastra.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Remove debug logs to minimize PR scope. Keep only warning, error, and exception logs, which are appropriate for production monitoring.

Removed 130 lines of debug logging code:

- event_merger.py: Remove all debug logs, keep exception logging
- ingestion.py: Remove debug logs for received/parsed/transformed/captured
- transformer.py: Remove structlog import and debug log
- conventions/genai.py: Remove structlog import and provider detection debug log

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Embedding operations don't have associated log events (no prompt/completion), so they were getting cached in Redis waiting for logs that never arrive. Now detect embedding spans by operation_name and send them immediately, preventing $ai_embedding events from being silently lost.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
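The embedding short-circuit can be sketched as follows (assuming the GenAI semantic-convention attribute `gen_ai.operation.name`; the routing helper and its return values are illustrative):

```python
def is_embedding_span(attributes: dict) -> bool:
    """Embedding spans never get prompt/completion log events,
    so they must not wait in the cache for a merge."""
    return attributes.get("gen_ai.operation.name") == "embeddings"


def route_span(attributes: dict) -> str:
    # Send embeddings immediately; everything else waits for its logs.
    return "send_immediately" if is_embedding_span(attributes) else "cache_for_logs"
```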
…last-seen
Previously, parse_otlp_request and parse_otlp_logs_request would overwrite
resource_attrs/scope_info inside loops, causing all spans/logs in a request
with multiple resource_spans/resource_logs to emit with only the final
resource/scope attached.
This fix changes the parsers to return per-item context:
- parse_otlp_request returns list of {span, resource, scope}
- parse_otlp_logs_request returns list of {log, resource, scope}
Updated transform_spans_to_ai_events and transform_logs_to_ai_events to
extract resource/scope per item, ensuring correct service.name, distinct_id,
and other resource-derived attributes.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
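The per-item return format described above can be illustrated like this (a sketch; the snake_case payload keys follow the commit's wording, and this is not the actual parser):

```python
def parse_otlp_request(payload: dict) -> list:
    """Flatten resource_spans into per-span items that each carry their
    own resource and scope, instead of overwriting shared variables
    across loop iterations (which attached only the last-seen context)."""
    items = []
    for resource_span in payload.get("resource_spans", []):
        resource = resource_span.get("resource", {})
        for scope_span in resource_span.get("scope_spans", []):
            scope = scope_span.get("scope", {})
            for span in scope_span.get("spans", []):
                items.append({"span": span, "resource": resource, "scope": scope})
    return items
```

With per-item context, a request containing two resource_spans yields spans paired with their own service.name and other resource attributes.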
…arser format

- Added embedding operation detection to pattern detection list
- Documented per-item return format for parser.py and logs_parser.py
- Clarified that each item carries its own resource/scope context

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Problem
LLM Analytics currently only supports PostHog SDK ingestion. Customers using OpenTelemetry-based frameworks (OpenAI instrumentation, LangChain, Mastra, etc.) cannot send traces to PostHog without switching SDKs or writing custom integration code.
Changes
Foundation:
- /api/projects/:id/ai/otel/traces
- /api/projects/:id/ai/otel/logs

Instrumentation Pattern Detection:
- `OtelInstrumentationPattern` enum (`V1_ATTRIBUTES`, `V2_TRACES_AND_LOGS`) for future extensibility
- `get_instrumentation_pattern()` method
- `detect_provider()` function for provider detection

V1 Instrumentation Support (data in span attributes):
V2 Instrumentation Support (metadata in spans, content in logs):
Provider Transformers:
- `ProviderTransformer` class with `can_handle()`, `transform_prompt()`, `transform_completion()`, `get_instrumentation_pattern()`
- `MastraTransformer`: handles Mastra's custom JSON-wrapped message format `{"messages": [{"role": "user", "content": [...]}]}` and output format `{"files": [], "text": "...", "warnings": [], ...}`; each `agent.generate()` call is independent

Tool Message Handling:
- `gen_ai.tool.message` log events in v2 ingestion

Documentation:
How did you test this code?
Automated:
- v1/v2 parity tests (`test_ingestion_parity.py`)

Manual:
Testing with llm-analytics-apps:
To test this PR with various OTEL providers, use the feature branch from PostHog/llm-analytics-apps#24:
Node.js OTEL providers:
Python OTEL providers:
The llm-analytics-apps branch includes:
Changelog: (features only)

Is this feature complete?
No - behind feature flag, not ready for public announcement yet.