Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions snuba/datasets/deletion_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dataclasses import dataclass, field
from typing import Dict, List, Sequence

from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

MAX_ROWS_TO_DELETE_DEFAULT = 100000


Expand All @@ -14,3 +16,38 @@ class DeletionSettings:
allowed_columns: Sequence[str] = field(default_factory=list)
max_rows_to_delete: int = MAX_ROWS_TO_DELETE_DEFAULT
allowed_attributes_by_item_type: Dict[str, List[str]] = field(default_factory=dict)


def get_trace_item_type_name(item_type: int) -> str:
    """
    Get the string name for a TraceItemType enum value.

    Uses the protobuf enum's Name() method and strips the "TRACE_ITEM_TYPE_" prefix,
    then converts to lowercase to match storage configuration naming conventions.

    Args:
        item_type: The integer value of the TraceItemType enum

    Returns:
        The string name used in storage configurations (e.g., "occurrence", "span")

    Raises:
        ValueError: If the item_type is not a valid TraceItemType enum value,
            or if a valid value's name does not carry the expected prefix
    """
    try:
        # Get the full protobuf enum name (e.g., "TRACE_ITEM_TYPE_SPAN").
        # Name() raises ValueError for values outside the enum. Keep the try
        # narrow: previously the format-check ValueError below was raised
        # inside this try and got swallowed by this except, re-labelling an
        # "unexpected name format" failure as an "unknown value" one.
        # Cast to TraceItemType.ValueType to satisfy type checker
        full_name = TraceItemType.Name(item_type)  # type: ignore[arg-type]
    except ValueError as e:
        # This happens when item_type is not a valid enum value
        raise ValueError(
            f"Unknown TraceItemType value: {item_type}. "
            "Must be a valid TraceItemType enum value."
        ) from e

    # Strip the "TRACE_ITEM_TYPE_" prefix and convert to lowercase
    prefix = "TRACE_ITEM_TYPE_"
    if not full_name.startswith(prefix):
        # Defensive: every TraceItemType name is expected to carry the prefix.
        raise ValueError(f"Unexpected TraceItemType name format: {full_name}")

    return full_name[len(prefix) :].lower()
65 changes: 61 additions & 4 deletions snuba/lw_deletions/formatters.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Mapping, MutableMapping, Sequence, Type
from typing import List, Mapping, MutableMapping, Sequence, Type

from snuba.datasets.storages.storage_key import StorageKey
from snuba.utils.hashes import fnv_1a
from snuba.web.bulk_delete_query import DeleteQueryMessage
from snuba.web.delete_query import ConditionsType

Expand Down Expand Up @@ -54,14 +55,70 @@ def format(self, messages: Sequence[DeleteQueryMessage]) -> Sequence[ConditionsT
]


class EAPItemsFormatter(Formatter):
    """
    Formatter for the eap_items storage.

    Resolves each entry of a message's attribute_conditions to the physical
    column that stores it:

    - int/bool attributes live in single (unbucketed) map columns, e.g.
      attributes_int['key'] or attributes_bool['key']
    - string/float attributes are hash-bucketed, e.g.
      attributes_string_0['key'] or attributes_float_23['key']
    """

    # Number of attribute buckets used in eap_items storage.
    # TODO: wire this in from the canonical storage configuration. If the
    # storage's num_attribute_buckets ever changes, callers must inject the
    # new value (see __init__) or bucket indices will be computed against the
    # wrong modulus, silently targeting the wrong columns.
    NUM_ATTRIBUTE_BUCKETS = 40

    def __init__(self, num_attribute_buckets: int = NUM_ATTRIBUTE_BUCKETS) -> None:
        # Allow the canonical bucket count to be injected; the default keeps
        # existing zero-argument construction (e.g. via STORAGE_FORMATTER)
        # working unchanged.
        super().__init__()
        self.num_attribute_buckets = num_attribute_buckets

    def format(self, messages: Sequence[DeleteQueryMessage]) -> Sequence[ConditionsType]:
        """
        For eap_items storage, we need to resolve attribute_conditions to their
        appropriate column names based on type:
        - int/bool: Single columns (no bucketing) like attributes_int['key'] or attributes_bool['key']
        - string/float: Hash-bucketed columns like attributes_string_0['key'] or attributes_float_23['key']

        For example, if attribute_conditions has {"group_id": [123]}, we need to:
        1. Determine the type based on the values (int in this case)
        2. Since int doesn't use bucketing, add it as attributes_int['group_id'] = [123]
        """
        formatted_conditions: List[ConditionsType] = []

        for message in messages:
            conditions = dict(message["conditions"])

            # Process attribute_conditions if present
            if "attribute_conditions" in message and message["attribute_conditions"]:
                attribute_conditions = message["attribute_conditions"]

                # For each attribute, determine its type and bucket (if applicable)
                for attr_name, attr_values in attribute_conditions.items():
                    # NOTE(review): an attribute with no values should not
                    # normally occur; skipping silently preserves existing
                    # behavior, but consider logging if it does happen.
                    if not attr_values:
                        continue

                    # Determine the attribute type from the first value.
                    # All values in the list are assumed to share one type.
                    first_value = attr_values[0]
                    if isinstance(first_value, bool):
                        # Check bool before int since bool is a subclass of int in Python
                        attr_type = "bool"
                    elif isinstance(first_value, int):
                        attr_type = "int"
                    elif isinstance(first_value, float):
                        attr_type = "float"
                    else:
                        # Default to string for str and any other type
                        attr_type = "string"

                    # Only string and float attributes use bucketing;
                    # int and bool attributes are stored in single columns.
                    if attr_type in ("int", "bool"):
                        bucketed_column = f"attributes_{attr_type}['{attr_name}']"
                    else:
                        # Bucket index is the fnv_1a hash of the attribute name
                        # modulo the configured bucket count.
                        bucket_idx = (
                            fnv_1a(attr_name.encode("utf-8")) % self.num_attribute_buckets
                        )
                        bucketed_column = f"attributes_{attr_type}_{bucket_idx}['{attr_name}']"

                    conditions[bucketed_column] = attr_values

            formatted_conditions.append(conditions)

        return formatted_conditions


# Maps a storage key to the Formatter class used to translate bulk-delete
# messages into ClickHouse delete conditions for that storage.
STORAGE_FORMATTER: Mapping[str, Type[Formatter]] = {
    StorageKey.SEARCH_ISSUES.value: SearchIssuesFormatter,
    StorageKey.EAP_ITEMS.value: EAPItemsFormatter,
}
65 changes: 37 additions & 28 deletions snuba/web/bulk_delete_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from snuba.attribution.attribution_info import AttributionInfo
from snuba.clickhouse.columns import ColumnSet
from snuba.clickhouse.query import Query
from snuba.datasets.deletion_settings import DeletionSettings
from snuba.datasets.deletion_settings import DeletionSettings, get_trace_item_type_name
from snuba.datasets.storage import WritableTableStorage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.conditions import combine_or_conditions
Expand All @@ -31,7 +31,7 @@
from snuba.query.exceptions import InvalidQueryException, NoRowsToDeleteException
from snuba.query.expressions import Expression
from snuba.reader import Result
from snuba.state import get_str_config
from snuba.state import get_int_config, get_str_config
from snuba.utils.metrics.util import with_span
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.schemas import ColumnValidator, InvalidColumnType
Expand All @@ -56,11 +56,13 @@ class AttributeConditions:
attributes: Dict[str, List[Any]]


class _DeleteQueryMessageRequired(TypedDict):
    """Fields present on every delete-query message."""

    rows_to_delete: int
    storage_name: str
    conditions: ConditionsType
    tenant_ids: Mapping[str, str | int]


class DeleteQueryMessage(_DeleteQueryMessageRequired, total=False):
    """
    A bulk-delete request produced to the delete topic.

    The attribute_conditions fields are only present for attribute-based
    deletes, so only they are optional (via the total=False subclass);
    using total=False on the whole TypedDict would incorrectly mark the
    core fields above as optional too. When present, the values are never
    None (the producer only sets them when conditions exist), so they are
    not annotated Optional.
    """

    # Mapping of attribute name -> list of values to match for deletion.
    attribute_conditions: Dict[str, List[Any]]
    # Integer TraceItemType enum value the attributes belong to.
    attribute_conditions_item_type: int


PRODUCER_MAP: MutableMapping[str, Producer] = {}
Expand Down Expand Up @@ -155,38 +157,36 @@ def _validate_attribute_conditions(
InvalidQueryException: If no attributes are configured for the item_type,
or if any requested attributes are not allowed
"""
# Get the string name for the item_type from the configuration
# The configuration uses string names (e.g., "occurrence") as keys
allowed_attrs_config = delete_settings.allowed_attributes_by_item_type

if not allowed_attrs_config:
raise InvalidQueryException("No attribute-based deletions configured for this storage")

# Check if the item_type has any allowed attributes configured
# Since the config uses string names and we're given an integer, we need to find the matching config
# For now, we'll check all configured item types and validate against any that match

# Try to find a matching configuration by item_type name
matching_allowed_attrs = None
for configured_item_type, allowed_attrs in allowed_attrs_config.items():
# For this initial implementation, we'll use the string key directly
# In the future, we might need a mapping from item_type int to string name
matching_allowed_attrs = allowed_attrs
break # For now, assume the first/only configured type
# Map the integer item_type to its string name used in configuration
try:
item_type_name = get_trace_item_type_name(attribute_conditions.item_type)
except ValueError as e:
raise InvalidQueryException(str(e))

if matching_allowed_attrs is None:
# Check if this specific item_type has any allowed attributes configured
if item_type_name not in allowed_attrs_config:
raise InvalidQueryException(
f"No attribute-based deletions configured for item_type {attribute_conditions.item_type}"
f"No attribute-based deletions configured for item_type {item_type_name} "
f"(value: {attribute_conditions.item_type}). Configured item types: "
f"{sorted(allowed_attrs_config.keys())}"
)

# Get the allowed attributes for this specific item_type
allowed_attrs = allowed_attrs_config[item_type_name]

# Validate that all requested attributes are allowed
requested_attrs = set(attribute_conditions.attributes.keys())
allowed_attrs_set = set(matching_allowed_attrs)
allowed_attrs_set = set(allowed_attrs)
invalid_attrs = requested_attrs - allowed_attrs_set

if invalid_attrs:
raise InvalidQueryException(
f"Invalid attributes for deletion: {invalid_attrs}. "
f"Invalid attributes for deletion on item_type '{item_type_name}': {invalid_attrs}. "
f"Allowed attributes: {allowed_attrs_set}"
)

Expand Down Expand Up @@ -234,15 +234,18 @@ def delete_from_storage(
# validate attribute conditions if provided
if attribute_conditions:
_validate_attribute_conditions(attribute_conditions, delete_settings)
logger.error(
"valid attribute_conditions passed to delete_from_storage, but delete will be ignored "
"as functionality is not yet implemented"
)
# deleting by just conditions and ignoring attribute_conditions would be dangerous
return {}

if not get_int_config("permit_delete_by_attribute", default=0):
logger.error(
"valid attribute_conditions passed to delete_from_storage, but delete will be ignored "
"as functionality is not yet launched (permit_delete_by_attribute=0)"
)
return {}

attr_info = _get_attribution_info(attribution_info)
return delete_from_tables(storage, delete_settings.tables, conditions, attr_info)
return delete_from_tables(
storage, delete_settings.tables, conditions, attr_info, attribute_conditions
)


def construct_query(storage: WritableTableStorage, table: str, condition: Expression) -> Query:
Expand All @@ -266,8 +269,8 @@ def delete_from_tables(
tables: Sequence[str],
conditions: Dict[str, Any],
attribution_info: AttributionInfo,
attribute_conditions: Optional[AttributeConditions] = None,
) -> dict[str, Result]:

highest_rows_to_delete = 0
result: dict[str, Result] = {}
for table in tables:
Expand All @@ -293,6 +296,12 @@ def delete_from_tables(
"conditions": conditions,
"tenant_ids": attribution_info.tenant_ids,
}

# Add attribute_conditions to the message if present
if attribute_conditions:
delete_query["attribute_conditions"] = attribute_conditions.attributes
delete_query["attribute_conditions_item_type"] = attribute_conditions.item_type

produce_delete_query(delete_query)
return result

Expand Down
40 changes: 37 additions & 3 deletions snuba/web/delete_query.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
import typing
import uuid
from typing import Any, Mapping, MutableMapping, Optional, Sequence
Expand Down Expand Up @@ -27,7 +28,13 @@
NoRowsToDeleteException,
TooManyDeleteRowsException,
)
from snuba.query.expressions import Expression, FunctionCall
from snuba.query.expressions import (
Column,
Expression,
FunctionCall,
Literal,
SubscriptableReference,
)
from snuba.query.query_settings import HTTPQuerySettings
from snuba.reader import Result
from snuba.state import get_config, get_int_config
Expand Down Expand Up @@ -354,12 +361,39 @@ def _execute_query(
def _construct_condition(columns: ConditionsType) -> Expression:
    """
    Build a single AND expression out of column -> values conditions.

    Each key may be a plain column name or a map-access pattern such as
    "attributes_string_0['group_id']" (resolved by _parse_column_expression).
    A single-value list becomes an equality check; multiple values become an
    IN condition.
    """
    and_conditions = []
    for col_name, values in columns.items():
        target = _parse_column_expression(col_name)

        if len(values) == 1:
            condition = equals(target, literal(values[0]))
        else:
            value_literals = [literal(v) for v in values]
            condition = in_cond(target, literals_tuple(alias=None, literals=value_literals))

        and_conditions.append(condition)

    return combine_and_conditions(and_conditions)


def _parse_column_expression(col_name: str) -> Expression:
    """
    Turn a column name into an expression, handling map-access notation.

    Examples:
        "project_id" -> Column("project_id")
        "attributes_string_0['group_id']" -> SubscriptableReference(Column("attributes_string_0"), Literal("group_id"))
    """
    # Matches "base['key']" or 'base["key"]'; the backreference (?P=q) forces
    # the closing quote to match the opening one.
    map_access = re.fullmatch(
        r"(?P<base>[a-zA-Z_][a-zA-Z0-9_]*)\[(?P<q>['\"])(?P<key>.+?)(?P=q)\]",
        col_name,
    )

    if map_access is None:
        # Plain column name.
        return column(col_name)

    return SubscriptableReference(
        alias=None,
        column=Column(alias=None, table_name=None, column_name=map_access.group("base")),
        key=Literal(alias=None, value=map_access.group("key")),
    )
36 changes: 36 additions & 0 deletions tests/datasets/test_deletion_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

import pytest
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

from snuba.datasets.deletion_settings import get_trace_item_type_name


def test_get_trace_item_type_name_valid() -> None:
    # Each enum member maps to its lowercase, prefix-stripped name.
    expected = {
        TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED: "unspecified",
        TraceItemType.TRACE_ITEM_TYPE_SPAN: "span",
        TraceItemType.TRACE_ITEM_TYPE_ERROR: "error",
        TraceItemType.TRACE_ITEM_TYPE_LOG: "log",
        TraceItemType.TRACE_ITEM_TYPE_UPTIME_CHECK: "uptime_check",
        TraceItemType.TRACE_ITEM_TYPE_UPTIME_RESULT: "uptime_result",
        TraceItemType.TRACE_ITEM_TYPE_REPLAY: "replay",
        TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE: "occurrence",
        TraceItemType.TRACE_ITEM_TYPE_METRIC: "metric",
        TraceItemType.TRACE_ITEM_TYPE_PROFILE_FUNCTION: "profile_function",
    }
    for item_type, name in expected.items():
        assert get_trace_item_type_name(item_type) == name


def test_get_trace_item_type_name_by_integer() -> None:
    # Raw integer values are accepted, not just enum members.
    for value, name in ((0, "unspecified"), (1, "span"), (7, "occurrence")):
        assert get_trace_item_type_name(value) == name


def test_get_trace_item_type_name_invalid() -> None:
with pytest.raises(ValueError, match="Unknown TraceItemType value: 999"):
get_trace_item_type_name(999)

with pytest.raises(ValueError, match="Unknown TraceItemType value: -1"):
get_trace_item_type_name(-1)
Loading
Loading