Skip to content

Commit cae5807

Browse files
andrewm4894 and claude
committed
fix(otel): Add Mastra OTEL ingestion support with provider transformers
- Detect Mastra by instrumentation scope name (@mastra/otel)
- Treat Mastra as v1 framework (all attributes in spans, no log merging)
- Mark v1 framework root spans as $ai_span instead of $ai_trace to fix tree hierarchy
- Add provider transformer pattern for framework-specific data transformations
- Filter out raw input/output attributes to prevent duplicate otel.input/otel.output

Fixes tree display issue where Mastra generations weren't appearing as children under the trace. Root spans from v1 frameworks must be $ai_span (not $ai_trace) since TraceQueryRunner filters out $ai_trace events from the events array.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 2f47460 commit cae5807

File tree

6 files changed

+479
-8
lines changed

6 files changed

+479
-8
lines changed

products/llm_analytics/backend/api/otel/conventions/genai.py

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44
Implements the GenAI semantic conventions (gen_ai.*) as fallback
55
when PostHog-native attributes are not present.
66
7+
Supports provider-specific transformations for frameworks like Mastra
8+
that use custom OTEL formats.
9+
710
Reference: https://opentelemetry.io/docs/specs/semconv/gen-ai/
811
"""
912

1013
from collections import defaultdict
1114
from typing import Any
1215

16+
from .providers import PROVIDER_TRANSFORMERS
17+
1318

1419
def has_genai_attributes(span: dict[str, Any]) -> bool:
1520
"""Check if span uses GenAI semantic conventions."""
@@ -60,16 +65,44 @@ def _extract_indexed_messages(attributes: dict[str, Any], prefix: str) -> list[d
6065
return messages if messages else None
6166

6267

63-
def extract_genai_attributes(span: dict[str, Any]) -> dict[str, Any]:
68+
def extract_genai_attributes(span: dict[str, Any], scope: dict[str, Any] | None = None) -> dict[str, Any]:
6469
"""
6570
Extract GenAI semantic convention attributes from span.
6671
6772
GenAI conventions use `gen_ai.*` prefix and are fallback
6873
when PostHog-native attributes are not present.
74+
75+
Supports provider-specific transformations for frameworks that use
76+
custom OTEL formats (e.g., Mastra).
77+
78+
Args:
79+
span: Parsed OTEL span
80+
scope: Instrumentation scope info (for provider detection)
81+
82+
Returns:
83+
Extracted attributes dict
6984
"""
85+
import structlog
86+
87+
logger = structlog.get_logger(__name__)
7088
attributes = span.get("attributes", {})
89+
scope = scope or {}
7190
result: dict[str, Any] = {}
7291

92+
# Detect provider-specific transformer
93+
provider_transformer = None
94+
for transformer_class in PROVIDER_TRANSFORMERS:
95+
transformer = transformer_class()
96+
if transformer.can_handle(span, scope):
97+
provider_transformer = transformer
98+
logger.info(
99+
"provider_transformer_detected",
100+
provider=transformer.get_provider_name(),
101+
scope_name=scope.get("name"),
102+
span_name=span.get("name"),
103+
)
104+
break
105+
73106
# Model (prefer request, fallback to response, then system)
74107
model = (
75108
attributes.get("gen_ai.request.model")
@@ -100,14 +133,62 @@ def extract_genai_attributes(span: dict[str, Any]) -> dict[str, Any]:
100133
result["prompt"] = prompts
101134
# Fallback to direct gen_ai.prompt attribute
102135
elif (prompt := attributes.get("gen_ai.prompt")) is not None:
103-
result["prompt"] = prompt
136+
# Try provider-specific transformation
137+
if provider_transformer:
138+
logger.info(
139+
"provider_transform_prompt_attempt",
140+
provider=provider_transformer.get_provider_name(),
141+
prompt_type=type(prompt).__name__,
142+
prompt_length=len(str(prompt)) if prompt else 0,
143+
)
144+
transformed = provider_transformer.transform_prompt(prompt)
145+
if transformed is not None:
146+
logger.info(
147+
"provider_transform_prompt_success",
148+
provider=provider_transformer.get_provider_name(),
149+
result_type=type(transformed).__name__,
150+
result_length=len(transformed) if isinstance(transformed, list) else 0,
151+
)
152+
result["prompt"] = transformed
153+
else:
154+
logger.info(
155+
"provider_transform_prompt_none",
156+
provider=provider_transformer.get_provider_name(),
157+
)
158+
result["prompt"] = prompt
159+
else:
160+
result["prompt"] = prompt
104161

105162
completions = _extract_indexed_messages(attributes, "gen_ai.completion")
106163
if completions:
107164
result["completion"] = completions
108165
# Fallback to direct gen_ai.completion attribute
109166
elif (completion := attributes.get("gen_ai.completion")) is not None:
110-
result["completion"] = completion
167+
# Try provider-specific transformation
168+
if provider_transformer:
169+
logger.info(
170+
"provider_transform_completion_attempt",
171+
provider=provider_transformer.get_provider_name(),
172+
completion_type=type(completion).__name__,
173+
completion_length=len(str(completion)) if completion else 0,
174+
)
175+
transformed = provider_transformer.transform_completion(completion)
176+
if transformed is not None:
177+
logger.info(
178+
"provider_transform_completion_success",
179+
provider=provider_transformer.get_provider_name(),
180+
result_type=type(transformed).__name__,
181+
result_length=len(transformed) if isinstance(transformed, list) else 0,
182+
)
183+
result["completion"] = transformed
184+
else:
185+
logger.info(
186+
"provider_transform_completion_none",
187+
provider=provider_transformer.get_provider_name(),
188+
)
189+
result["completion"] = completion
190+
else:
191+
result["completion"] = completion
111192

112193
# Model parameters
113194
if (temperature := attributes.get("gen_ai.request.temperature")) is not None:
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
"""
Provider-specific OTEL transformers.

Each provider (Mastra, Langchain, LlamaIndex, etc.) handles its own
OTEL format quirks and normalizes the data to PostHog's format.
"""

from .base import ProviderTransformer
from .mastra import MastraTransformer

# Registry of every available provider transformer, checked in order.
# Register new providers here as they are implemented
# (future: LangchainTransformer, LlamaIndexTransformer, ...).
PROVIDER_TRANSFORMERS: list[type[ProviderTransformer]] = [
    MastraTransformer,
]

__all__ = [
    "ProviderTransformer",
    "MastraTransformer",
    "PROVIDER_TRANSFORMERS",
]
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
"""
Base provider transformer interface.

Provider transformers handle framework/library-specific OTEL formats
and normalize them to PostHog's standard format.
"""

from abc import ABC, abstractmethod
from typing import Any


class ProviderTransformer(ABC):
    """
    Abstract base for provider-specific OTEL transformers.

    A provider (Mastra, Langchain, LlamaIndex, ...) implements a subclass
    to recognize its own spans and rewrite its custom prompt/completion
    encodings into PostHog's standard shape.
    """

    @abstractmethod
    def can_handle(self, span: dict[str, Any], scope: dict[str, Any]) -> bool:
        """
        Report whether this transformer recognizes the given span.

        Args:
            span: Parsed OTEL span
            scope: Instrumentation scope info

        Returns:
            True if this transformer recognizes and can handle this span
        """

    @abstractmethod
    def transform_prompt(self, prompt: Any) -> Any:
        """
        Normalize a provider-specific prompt value.

        Args:
            prompt: Raw prompt value from the gen_ai.prompt attribute

        Returns:
            Normalized prompt (list of message dicts, string, or None
            when no transformation is needed)
        """

    @abstractmethod
    def transform_completion(self, completion: Any) -> Any:
        """
        Normalize a provider-specific completion value.

        Args:
            completion: Raw completion value from the gen_ai.completion attribute

        Returns:
            Normalized completion (list of message dicts, string, or None
            when no transformation is needed)
        """

    def get_provider_name(self) -> str:
        """
        Human-readable provider name for logging/debugging.

        Derived from the subclass name with the "Transformer" suffix removed.
        """
        return type(self).__name__.replace("Transformer", "")
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
"""
Mastra provider transformer.

Handles Mastra's OTEL format which wraps messages in custom structures:
- Input: {"messages": [{"role": "user", "content": [...]}]}
- Output: {"files": [], "text": "...", "warnings": [], ...}
"""

import json
from typing import Any

from .base import ProviderTransformer


class MastraTransformer(ProviderTransformer):
    """
    Transform Mastra's OTEL format to PostHog standard format.

    Mastra uses the @mastra/otel instrumentation scope and wraps messages
    in custom structures that need unwrapping.
    """

    def can_handle(self, span: dict[str, Any], scope: dict[str, Any]) -> bool:
        """
        Detect Mastra spans.

        Primary signal: Mastra sets scope.name to "@mastra/otel" in its
        span converter. Fallback: any "mastra."-prefixed span attribute.
        """
        if scope.get("name", "") == "@mastra/otel":
            return True

        attributes = span.get("attributes", {})
        return any(key.startswith("mastra.") for key in attributes)

    def transform_prompt(self, prompt: Any) -> Any:
        """
        Unwrap Mastra's input format into a list of message dicts.

        Mastra serializes input as JSON: {"messages": [{"role": ..., "content": ...}]}
        where "content" may be an array of parts like {"type": "text", "text": "..."}.

        Returns:
            Normalized message list, or None when the value is not in
            Mastra's format (the caller then keeps the raw value).
        """
        # Non-string values are already structured — nothing to unwrap.
        if not isinstance(prompt, str):
            return None

        # Narrow try: only the JSON parse can legitimately fail here.
        try:
            parsed = json.loads(prompt)
        except (json.JSONDecodeError, TypeError):
            return None

        # Must match Mastra's input shape: {"messages": [...]}
        if not isinstance(parsed, dict) or "messages" not in parsed:
            return None
        messages = parsed["messages"]
        if not isinstance(messages, list):
            return None

        result = []
        for msg in messages:
            # Skip malformed entries rather than failing the whole span.
            if not isinstance(msg, dict) or "role" not in msg:
                continue

            content = msg.get("content")
            if isinstance(content, list):
                # Flatten Mastra's content-part array, keeping only text
                # parts: [{"type": "text", "text": "..."}]
                text_parts = [
                    item["text"]
                    for item in content
                    if isinstance(item, dict) and item.get("type") == "text" and "text" in item
                ]
                if text_parts:
                    result.append({"role": msg["role"], "content": " ".join(text_parts)})
                else:
                    # No extractable text (e.g. non-text parts) — keep as-is.
                    result.append(msg)
            else:
                # Already a standard-format message.
                result.append(msg)

        return result if result else None

    def transform_completion(self, completion: Any) -> Any:
        """
        Unwrap Mastra's output format into an assistant message.

        Mastra serializes output as JSON: {"files": [], "text": "...", ...};
        only the "text" field is surfaced.

        Returns:
            [{"role": "assistant", "content": <text>}], or None when the
            value is not in Mastra's format (the caller then keeps the raw value).
        """
        # Non-string values are already structured — nothing to unwrap.
        if not isinstance(completion, str):
            return None

        try:
            parsed = json.loads(completion)
        except (json.JSONDecodeError, TypeError):
            return None

        # Must match Mastra's output shape: {"text": "...", ...}
        if not isinstance(parsed, dict) or "text" not in parsed:
            return None

        return [{"role": "assistant", "content": parsed["text"]}]

0 commit comments

Comments
 (0)