Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ce5f3ff
feat(llma): Add OpenTelemetry traces ingestion foundation
andrewm4894 Nov 12, 2025
285563d
feat(llma): Implement OTLP protobuf parser for OTel traces ingestion
andrewm4894 Nov 12, 2025
fd62595
feat(llma): Implement span-to-AI-event transformer for OTel traces
andrewm4894 Nov 12, 2025
c9b0e0f
feat(llma): Route OTel AI events to PostHog capture pipeline
andrewm4894 Nov 12, 2025
0806fef
feat(llma): add Django proxy for OTel logs ingestion at /i/v1/logs
andrewm4894 Nov 12, 2025
8cca85f
feat(llm-analytics): implement OTEL logs ingestion endpoint
andrewm4894 Nov 20, 2025
1076cfa
fix(llma): Remove conflicting opentelemetry-proto version pin
andrewm4894 Nov 20, 2025
eaf55ef
feat(llm-analytics): Improve OTEL ingestion authentication and data s…
andrewm4894 Nov 20, 2025
a0fad6c
Remove unused OTEL logs endpoint
andrewm4894 Nov 20, 2025
0061052
Revert "Remove unused OTEL logs endpoint"
andrewm4894 Nov 20, 2025
d014111
Restore OTEL logs endpoint to support both v1 and v2 instrumentation
andrewm4894 Nov 20, 2025
bf048c6
Fix v2 log event parsing to extract message content
andrewm4894 Nov 20, 2025
e311ee9
fix(otel): align transformers with PostHog LLM Analytics schema
andrewm4894 Nov 21, 2025
c3211d1
fix(otel): accumulate all v2 logs before merging to prevent race cond…
andrewm4894 Nov 21, 2025
b3d58f3
fix(llm-analytics): Skip event merger for v1 OTEL spans
andrewm4894 Nov 21, 2025
28e12d3
fix(llm-analytics): Fix v1 OTEL span detection
andrewm4894 Nov 21, 2025
3093228
docs(llm-analytics): Add comprehensive README for OTEL ingestion
andrewm4894 Nov 21, 2025
1d6759b
docs(llm-analytics): Refactor OTEL README to be standalone
andrewm4894 Nov 21, 2025
239b146
fix(llm-analytics): Rewrite event_merger tests for Redis-backed imple…
andrewm4894 Nov 21, 2025
ce69feb
chore(llm-analytics): Remove unused TypeScript OTEL transformer
andrewm4894 Nov 21, 2025
d8374fa
docs(llm-analytics): Remove OTEL_QUICKSTART.md
andrewm4894 Nov 21, 2025
d16f5a2
docs(llm-analytics): Fix README inaccuracies
andrewm4894 Nov 21, 2025
4a69244
docs(llm-analytics): Expand architecture diagram to show full event flow
andrewm4894 Nov 21, 2025
05380d0
fix(otel): Add gen_ai.tool.message handler for OTEL v2 ingestion
andrewm4894 Nov 21, 2025
fbf810b
test(otel): Add test coverage for tool message handling
andrewm4894 Nov 21, 2025
36cf748
fix(otel): Add Mastra OTEL ingestion support with provider transformers
andrewm4894 Nov 21, 2025
e9f3cff
docs(otel): Update README with provider transformers and Mastra support
andrewm4894 Nov 21, 2025
1e0264d
refactor(otel): Add OtelInstrumentationPattern enum for provider patt…
andrewm4894 Nov 27, 2025
dc7eb6b
chore(otel): Remove legacy proxy_logs_to_capture_service
andrewm4894 Nov 27, 2025
3b12436
chore(otel): Remove dead code and reduce verbose logging
andrewm4894 Nov 27, 2025
e694228
refactor(otel): Remove verbose debug logging
andrewm4894 Nov 27, 2025
6f0bf26
fix(otel): Send embedding spans immediately without waiting for logs
andrewm4894 Nov 27, 2025
3c7822c
fix(otel): Preserve per-span resource/scope instead of flattening to …
andrewm4894 Nov 27, 2025
80f1a80
docs(otel): Update README with embedding span handling and per-item p…
andrewm4894 Nov 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions posthog/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from posthog.temporal.codec_server import decode_payloads

from products.early_access_features.backend.api import early_access_features
from products.llm_analytics.backend.api.otel.ingestion import otel_logs_endpoint, otel_traces_endpoint

from .utils import opt_slash_path, render_template
from .views import (
Expand Down Expand Up @@ -168,6 +169,10 @@ def authorize_and_redirect(request: HttpRequest) -> HttpResponse:
# ee
*ee_urlpatterns,
# api
# OpenTelemetry traces ingestion for LLM Analytics
path("api/projects/<int:project_id>/ai/otel/traces", csrf_exempt(otel_traces_endpoint)),
# OpenTelemetry logs ingestion for LLM Analytics
path("api/projects/<int:project_id>/ai/otel/logs", csrf_exempt(otel_logs_endpoint)),
path("api/environments/<int:team_id>/progress/", progress),
path("api/environments/<int:team_id>/query/<str:query_uuid>/progress/", progress),
path("api/environments/<int:team_id>/query/<str:query_uuid>/progress", progress),
Expand Down
463 changes: 463 additions & 0 deletions products/llm_analytics/backend/api/otel/README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions products/llm_analytics/backend/api/otel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# OpenTelemetry traces ingestion for PostHog LLM Analytics
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
OpenTelemetry semantic conventions for LLM traces.

Supports:
- PostHog native: posthog.ai.* attributes (highest priority)
- GenAI: gen_ai.* attributes (fallback)
"""
187 changes: 187 additions & 0 deletions products/llm_analytics/backend/api/otel/conventions/genai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
GenAI semantic conventions for OpenTelemetry.

Implements the GenAI semantic conventions (gen_ai.*) as fallback
when PostHog-native attributes are not present.

Supports provider-specific transformations for frameworks like Mastra
that use custom OTEL formats.

Reference: https://opentelemetry.io/docs/specs/semconv/gen-ai/
"""

from collections import defaultdict
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from .providers.base import ProviderTransformer


def has_genai_attributes(span: dict[str, Any]) -> bool:
    """Return True when any attribute on *span* follows the GenAI (gen_ai.*) convention."""
    span_attrs = span.get("attributes", {})
    return any(name.startswith("gen_ai.") for name in span_attrs)


def _extract_indexed_messages(attributes: dict[str, Any], prefix: str) -> list[dict[str, Any]] | None:
"""
Extract indexed message attributes like gen_ai.prompt.{N}.{field} into a list of message dicts.

Args:
attributes: Span attributes dictionary
prefix: Message prefix (e.g., "gen_ai.prompt" or "gen_ai.completion")

Returns:
List of message dicts with role, content, etc., or None if no messages found
"""
# Group attributes by index
messages_by_index: dict[int, dict[str, Any]] = defaultdict(dict)

for key, value in attributes.items():
if not key.startswith(f"{prefix}."):
continue

# Parse: gen_ai.prompt.0.role -> index=0, field=role
parts = key[len(prefix) + 1 :].split(".", 1)
if len(parts) != 2:
continue

try:
index = int(parts[0])
field = parts[1]
messages_by_index[index][field] = value
except (ValueError, IndexError):
continue

if not messages_by_index:
return None

# Convert to sorted list of messages
messages = []
for index in sorted(messages_by_index.keys()):
msg = messages_by_index[index]
if msg: # Only include non-empty messages
messages.append(msg)

return messages if messages else None


def detect_provider(span: dict[str, Any], scope: dict[str, Any] | None = None) -> "ProviderTransformer | None":
    """
    Detect which provider transformer handles this span.

    Iterates the registered transformers in order and returns the first one
    whose ``can_handle`` accepts the span.

    Args:
        span: Parsed OTEL span
        scope: Instrumentation scope info

    Returns:
        Matching ProviderTransformer instance, or None if no provider matches
    """
    # Imported lazily to avoid a circular import with the providers package.
    from .providers import PROVIDER_TRANSFORMERS

    scope_info = scope if scope is not None else {}
    for transformer_cls in PROVIDER_TRANSFORMERS:
        candidate = transformer_cls()
        if candidate.can_handle(span, scope_info):
            return candidate
    return None


# Scalar attributes copied through verbatim, as (source attribute, result key)
# pairs. Kept in two groups so the result dict preserves the original
# insertion order around the prompt/completion content.
_GENAI_PRE_CONTENT_ATTRS: tuple[tuple[str, str], ...] = (
    # Operation name
    ("gen_ai.operation.name", "operation_name"),
    # Token usage
    ("gen_ai.usage.input_tokens", "input_tokens"),
    ("gen_ai.usage.output_tokens", "output_tokens"),
)

_GENAI_POST_CONTENT_ATTRS: tuple[tuple[str, str], ...] = (
    # Model parameters
    ("gen_ai.request.temperature", "temperature"),
    ("gen_ai.request.max_tokens", "max_tokens"),
    ("gen_ai.request.top_p", "top_p"),
    ("gen_ai.request.frequency_penalty", "frequency_penalty"),
    ("gen_ai.request.presence_penalty", "presence_penalty"),
    # Response metadata
    ("gen_ai.response.finish_reasons", "finish_reasons"),
    ("gen_ai.response.id", "response_id"),
)


def _transform_content(raw, transform_fn):
    """
    Apply a provider-specific transform to raw content.

    Falls back to the raw value when no transformer is available or when the
    transformer declines (returns None).
    """
    if transform_fn is None:
        return raw
    transformed = transform_fn(raw)
    return transformed if transformed is not None else raw


def extract_genai_attributes(span: dict[str, Any], scope: dict[str, Any] | None = None) -> dict[str, Any]:
    """
    Extract GenAI semantic convention attributes from span.

    GenAI conventions use `gen_ai.*` prefix and are fallback
    when PostHog-native attributes are not present.

    Supports provider-specific transformations for frameworks that use
    custom OTEL formats (e.g., Mastra).

    Args:
        span: Parsed OTEL span
        scope: Instrumentation scope info (for provider detection)

    Returns:
        Extracted attributes dict
    """
    attributes = span.get("attributes", {})
    scope = scope or {}
    result: dict[str, Any] = {}

    # Detect provider-specific transformer up front; it only affects how
    # direct (non-indexed) prompt/completion payloads are interpreted below.
    provider_transformer = detect_provider(span, scope)

    # Model (prefer request model, then response model, then bare gen_ai.model)
    model = (
        attributes.get("gen_ai.request.model")
        or attributes.get("gen_ai.response.model")
        or attributes.get("gen_ai.model")
    )
    if model is not None:
        result["model"] = model

    # Provider (from gen_ai.system)
    if (system := attributes.get("gen_ai.system")) is not None:
        result["provider"] = system

    # Operation name and token usage
    for attr_key, result_key in _GENAI_PRE_CONTENT_ATTRS:
        if (value := attributes.get(attr_key)) is not None:
            result[result_key] = value

    # Content: indexed messages (gen_ai.prompt.0.role, ...) take precedence;
    # otherwise fall back to the direct attribute, run through the provider
    # transformer when one matched.
    prompts = _extract_indexed_messages(attributes, "gen_ai.prompt")
    if prompts:
        result["prompt"] = prompts
    elif (prompt := attributes.get("gen_ai.prompt")) is not None:
        result["prompt"] = _transform_content(
            prompt,
            provider_transformer.transform_prompt if provider_transformer else None,
        )

    completions = _extract_indexed_messages(attributes, "gen_ai.completion")
    if completions:
        result["completion"] = completions
    elif (completion := attributes.get("gen_ai.completion")) is not None:
        result["completion"] = _transform_content(
            completion,
            provider_transformer.transform_completion if provider_transformer else None,
        )

    # Model parameters and response metadata
    for attr_key, result_key in _GENAI_POST_CONTENT_ATTRS:
        if (value := attributes.get(attr_key)) is not None:
            result[result_key] = value

    return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
PostHog-native OpenTelemetry conventions.

Attributes with `posthog.ai.*` prefix have highest priority in the waterfall.
"""

from typing import Any


def has_posthog_attributes(span: dict[str, Any]) -> bool:
    """Return True when any attribute on *span* uses the PostHog-native posthog.ai.* prefix."""
    span_attrs = span.get("attributes", {})
    return any(name.startswith("posthog.ai.") for name in span_attrs)


def extract_posthog_native_attributes(span: dict[str, Any]) -> dict[str, Any]:
    """
    Extract PostHog-native attributes from span.

    PostHog-native convention uses `posthog.ai.*` prefix.
    This takes highest priority in the waterfall pattern.

    Each recognized `posthog.ai.<key>` attribute is copied into the result
    under `<key>`; attributes that are absent (None) are omitted.
    """
    # Recognized keys, listed in output order; the result key is identical to
    # the attribute suffix after the "posthog.ai." prefix.
    recognized_keys = (
        # Core identifiers
        "model",
        "provider",
        "trace_id",
        "span_id",
        "parent_id",
        "session_id",
        "generation_id",
        # Token usage
        "input_tokens",
        "output_tokens",
        "cache_read_tokens",
        "cache_write_tokens",
        # Cost
        "input_cost_usd",
        "output_cost_usd",
        "total_cost_usd",
        # Operation
        "operation_name",
        # Content
        "input",
        "output",
        # Model parameters
        "temperature",
        "max_tokens",
        "stream",
        # Error tracking
        "is_error",
        "error_message",
    )

    span_attrs = span.get("attributes", {})
    extracted: dict[str, Any] = {}
    for key in recognized_keys:
        value = span_attrs.get(f"posthog.ai.{key}")
        if value is not None:
            extracted[key] = value
    return extracted
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""
Provider-specific OTEL transformers.

Each provider (Mastra, Langchain, LlamaIndex, etc.) handles their
specific OTEL format quirks and normalizes to PostHog format.
"""

from .base import OtelInstrumentationPattern, ProviderTransformer
from .mastra import MastraTransformer

# Registry of all available provider transformers
# Add new providers here as they're implemented
PROVIDER_TRANSFORMERS: list[type[ProviderTransformer]] = [
MastraTransformer,
# Future: LangchainTransformer, LlamaIndexTransformer, etc.
]

__all__ = [
"OtelInstrumentationPattern",
"ProviderTransformer",
"MastraTransformer",
"PROVIDER_TRANSFORMERS",
]
Loading
Loading