From 07a62e847dae2f6e810761c5659fa02036c00fcb Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Wed, 12 Nov 2025 15:21:21 -0800 Subject: [PATCH 01/20] feat(deletes): add support for delete by attribute in config --- .../events_analytics_platform/storages/eap_items.yaml | 3 +++ snuba/datasets/configuration/json_schema.py | 8 ++++++++ snuba/datasets/deletion_settings.py | 3 ++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml index 3b06258dc26..875c1078c12 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml @@ -147,6 +147,9 @@ deletion_settings: - organization_id - trace_id - item_type + allowed_attributes_by_item_type: + occurrence: + - group_id mandatory_condition_checkers: - condition: OrgIdEnforcer diff --git a/snuba/datasets/configuration/json_schema.py b/snuba/datasets/configuration/json_schema.py index 28c52c47324..d7c85d70b72 100644 --- a/snuba/datasets/configuration/json_schema.py +++ b/snuba/datasets/configuration/json_schema.py @@ -615,6 +615,14 @@ def registered_class_array_schema( "type": "integer", }, "bulk_delete_only": {"type": "boolean"}, + "allowed_attributes_by_item_type": { + "type": "object", + "description": "Mapping of item_type to list of allowed attributes for deletion.", + "additionalProperties": { + "type": "array", + "items": {"type": "string"}, + }, + }, }, "required": ["is_enabled", "tables"], "additionalProperties": False, diff --git a/snuba/datasets/deletion_settings.py b/snuba/datasets/deletion_settings.py index 13fb878d6b6..144ba59819e 100644 --- a/snuba/datasets/deletion_settings.py +++ b/snuba/datasets/deletion_settings.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Sequence +from typing import Dict, List, Sequence MAX_ROWS_TO_DELETE_DEFAULT = 100000 @@ -13,3 +13,4 @@ class DeletionSettings: bulk_delete_only: bool = False allowed_columns: Sequence[str] = field(default_factory=list) max_rows_to_delete: int = MAX_ROWS_TO_DELETE_DEFAULT + allowed_attributes_by_item_type: Dict[str, List[str]] = field(default_factory=dict) From f4530fecaccc73c5b08e1e3f36087bb79e37d885 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 08:23:09 -0800 Subject: [PATCH 02/20] [wip] add support for attribute_conditions in bulk_delete_query --- snuba/web/bulk_delete_query.py | 80 +++++++++++++++++++++++++++++ tests/web/test_bulk_delete_query.py | 70 +++++++++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index f501d44738b..064363b7466 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -12,6 +12,7 @@ from snuba.attribution.attribution_info import AttributionInfo from snuba.clickhouse.columns import ColumnSet from snuba.clickhouse.query import Query +from snuba.datasets.deletion_settings import DeletionSettings from snuba.datasets.storage import WritableTableStorage from snuba.datasets.storages.storage_key import StorageKey from snuba.query.conditions import combine_or_conditions @@ -123,11 +124,85 @@ def produce_delete_query(delete_query: DeleteQueryMessage) -> None: logger.exception("Could not produce delete query due to error: %r", ex) +def _validate_attribute_conditions( + attribute_conditions: Dict[str, list[Any]], + conditions: Dict[str, list[Any]], + delete_settings: DeletionSettings, +) -> None: + """ + Validates that the attribute_conditions are allowed for the item_type specified in conditions. + + Args: + attribute_conditions: Dict mapping attribute names to their values + conditions: Dict mapping column names to their values (must include 'item_type') + delete_settings: The deletion settings for the storage + + Raises: + InvalidQueryException: If item_type is not in conditions, if no attributes are configured + for the item_type, or if any requested attributes are not allowed + """ + if not attribute_conditions: + return + + # Ensure item_type is specified in conditions + if "item_type" not in conditions: + raise InvalidQueryException( + "item_type must be specified in conditions when using attribute_conditions" + ) + + # Get the item_type value(s) + item_type_values = conditions["item_type"] + if not item_type_values: + raise InvalidQueryException("item_type cannot be empty when using attribute_conditions") + + # For now, we only support a single item_type value when using attribute_conditions + if len(item_type_values) > 1: + raise InvalidQueryException("attribute_conditions only supports a single item_type value") + + item_type = item_type_values[0] + + # Get the string name for the item_type from the configuration + # The configuration uses string names (e.g., "occurrence") as keys + allowed_attrs_config = delete_settings.allowed_attributes_by_item_type + + if not allowed_attrs_config: + raise InvalidQueryException("No attribute-based deletions configured for this storage") + + # Check if the item_type has any allowed attributes configured + # Since the config uses string names and we're given an integer, we need to find the matching config + # For now, we'll check all configured item types and validate against any that match + + # Try to find a matching configuration by item_type name + matching_allowed_attrs = None + for configured_item_type, allowed_attrs in allowed_attrs_config.items(): + # For this initial implementation, we'll use the string key directly + # In the future, we might need a mapping from item_type int to string name + matching_allowed_attrs = allowed_attrs + break # For now, assume the first/only configured type + + if matching_allowed_attrs is None: + raise InvalidQueryException( + f"No attribute-based deletions configured for item_type {item_type}" + ) + + # Validate that all requested attributes are allowed + requested_attrs = set(attribute_conditions.keys()) + allowed_attrs_set = set(matching_allowed_attrs) + invalid_attrs = requested_attrs - allowed_attrs_set + + if invalid_attrs: + raise InvalidQueryException( + f"Invalid attributes for deletion: {invalid_attrs}. " + f"Allowed attributes: {allowed_attrs_set}" + ) + + @with_span() def delete_from_storage( storage: WritableTableStorage, conditions: Dict[str, list[Any]], attribution_info: Mapping[str, Any], + attribute_conditions: Optional[Dict[str, list[Any]]] = None, ) -> dict[str, Result]: """ This method does a series of validation checks (outline below), @@ -138,6 +213,7 @@ def delete_from_storage( * storage validation that deletes are enabled * column names are valid (allowed_columns storage setting) * column types are valid + * attribute names are valid (allowed_attributes_by_item_type storage setting) """ if not deletes_are_enabled(): raise DeletesNotEnabledError("Deletes not enabled in this region") @@ -161,6 +237,10 @@ def delete_from_storage( except InvalidColumnType as e: raise InvalidQueryException(e.message) + # validate attribute conditions if provided + if attribute_conditions: + _validate_attribute_conditions(attribute_conditions, conditions, delete_settings) + attr_info = _get_attribution_info(attribution_info) return delete_from_tables(storage, delete_settings.tables, conditions, attr_info) diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py index efe7423feaf..defa87bffc1 100644 --- a/tests/web/test_bulk_delete_query.py +++ b/tests/web/test_bulk_delete_query.py @@ -134,3 +134,73 @@ def test_delete_invalid_column_name() -> None: with pytest.raises(InvalidQueryException): delete_from_storage(storage, conditions, attr_info) + + +@pytest.mark.redis_db +def test_attribute_conditions_valid() -> None: + """Test that valid attribute_conditions are accepted for eap_items storage""" + storage = get_writable_storage(StorageKey("eap_items")) + conditions = {"project_id": [1], "item_type": [1]} + attribute_conditions = {"group_id": [12345]} + attr_info = get_attribution_info() + + # Mock out _enforce_max_rows to avoid needing actual data + with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): + with patch("snuba.web.bulk_delete_query.produce_delete_query"): + # Should not raise an exception + delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + +@pytest.mark.redis_db +def test_attribute_conditions_invalid_attribute() -> None: + """Test that invalid attribute names in attribute_conditions are rejected""" + storage = get_writable_storage(StorageKey("eap_items")) + conditions = {"project_id": [1], "item_type": [1]} + attribute_conditions = {"invalid_attr": [12345]} + attr_info = get_attribution_info() + + with pytest.raises(InvalidQueryException, match="Invalid attributes for deletion"): + delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + +@pytest.mark.redis_db +def test_attribute_conditions_missing_item_type() -> None: + """Test that attribute_conditions requires item_type in conditions""" + storage = get_writable_storage(StorageKey("eap_items")) + conditions = {"project_id": [1]} + attribute_conditions = {"group_id": [12345]} + attr_info = get_attribution_info() + + with pytest.raises( + InvalidQueryException, + match="item_type must be specified in conditions when using attribute_conditions", + ): + delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + +@pytest.mark.redis_db +def test_attribute_conditions_multiple_item_types() -> None: + """Test that attribute_conditions doesn't support multiple item_type values""" + storage = get_writable_storage(StorageKey("eap_items")) + conditions = {"project_id": [1], "item_type": [1, 2]} + attribute_conditions = {"group_id": [12345]} + attr_info = get_attribution_info() + + with pytest.raises( + InvalidQueryException, match="attribute_conditions only supports a single item_type value" + ): + delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + +@pytest.mark.redis_db +def test_attribute_conditions_storage_not_configured() -> None: + """Test that storages without attribute deletion config reject attribute_conditions""" + storage = get_writable_storage(StorageKey("search_issues")) + conditions = {"project_id": [1], "item_type": [1]} + attribute_conditions = {"some_attr": [12345]} + attr_info = get_attribution_info() + + with pytest.raises( + InvalidQueryException, match="No attribute-based deletions configured for this storage" + ): + delete_from_storage(storage, conditions, attr_info, attribute_conditions) From 7cf9ac1ba02e8dec020c222a5c3836cd9e12aa73 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 08:41:41 -0800 Subject: [PATCH 03/20] wire in TraceItemFilter[s] to attribute_conditions in delete_from_storage --- .../web/rpc/v1/endpoint_delete_trace_items.py | 118 ++++++++++- .../v1/test_endpoint_delete_trace_items.py | 199 +++++++++++++++++- 2 files changed, 307 insertions(+), 10 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_delete_trace_items.py b/snuba/web/rpc/v1/endpoint_delete_trace_items.py index 82601048fcd..9a51f0ffb09 100644 --- a/snuba/web/rpc/v1/endpoint_delete_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_delete_trace_items.py @@ -1,9 +1,13 @@ -from typing import Type +from typing import Any, Dict, List, Optional, Type, cast from sentry_protos.snuba.v1.endpoint_delete_trace_items_pb2 import ( DeleteTraceItemsRequest, DeleteTraceItemsResponse, ) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( + ComparisonFilter, + TraceItemFilter, +) from snuba.attribution.appid import AppID from snuba.datasets.storages.factory import get_writable_storage @@ -13,6 +17,81 @@ from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +def _extract_attribute_value(comparison_filter: ComparisonFilter) -> Any: + """Extract the value from a ComparisonFilter's AttributeValue.""" + value_field = comparison_filter.value.WhichOneof("value") + if value_field == "val_str": + return comparison_filter.value.val_str + elif value_field == "val_int": + return comparison_filter.value.val_int + elif value_field == "val_double": + return comparison_filter.value.val_double + elif value_field == "val_bool": + return comparison_filter.value.val_bool + elif value_field == "val_str_array": + return list(comparison_filter.value.val_str_array.values) + elif value_field == "val_int_array": + return list(comparison_filter.value.val_int_array.values) + elif value_field == "val_double_array": + return list(comparison_filter.value.val_double_array.values) + else: + raise BadSnubaRPCRequestException(f"Unsupported attribute value type: {value_field}") + + +def _trace_item_filters_to_attribute_conditions( + filters: list[TraceItemFilter], +) -> Dict[str, list[Any]]: + """ + Convert TraceItemFilters to attribute_conditions for deletion. + + Only supports ComparisonFilter with OP_EQUALS or OP_IN operations. + All filters are combined with AND logic. + + Args: + filters: List of TraceItemFilter from the request + + Returns: + Dict mapping attribute names to lists of values + + Raises: + BadSnubaRPCRequestException: If unsupported filter types or operations are encountered + """ + attribute_conditions: Dict[str, list[Any]] = {} + + for trace_filter in filters: + # Only support comparison filters for deletion + if not trace_filter.HasField("comparison_filter"): + raise BadSnubaRPCRequestException( + "Only comparison filters are supported for deletion. " + "AND, OR, and NOT filters are not supported." + ) + + comparison_filter = trace_filter.comparison_filter + op = comparison_filter.op + + # Only support equality operations for deletion + if op not in (ComparisonFilter.OP_EQUALS, ComparisonFilter.OP_IN): + op_name = ComparisonFilter.Op.Name(op) + raise BadSnubaRPCRequestException( + f"Only OP_EQUALS and OP_IN operations are supported for deletion. Got: {op_name}" + ) + + attribute_name = comparison_filter.key.name + value = _extract_attribute_value(comparison_filter) + + # Convert single values to lists for consistency + if not isinstance(value, list): + value = [value] + + # If the attribute already exists, extend the list (OR logic within same attribute) + if attribute_name in attribute_conditions: + attribute_conditions[attribute_name].extend(value) + else: + attribute_conditions[attribute_name] = value + + return attribute_conditions + + class EndpointDeleteTraceItems(RPCEndpoint[DeleteTraceItemsRequest, DeleteTraceItemsResponse]): @classmethod def version(cls) -> str: @@ -36,9 +115,6 @@ def _execute(self, request: DeleteTraceItemsRequest) -> DeleteTraceItemsResponse if has_trace_ids and has_filters: raise BadSnubaRPCRequestException("Provide only one of trace_ids or filters, not both.") - if has_filters: - raise NotImplementedError("Currently, only delete by trace_ids is supported") - attribution_info = { "app_id": AppID("eap"), "referrer": request.meta.referrer, @@ -51,14 +127,38 @@ def _execute(self, request: DeleteTraceItemsRequest) -> DeleteTraceItemsResponse "parent_api": "eap_delete_trace_items", } + # Build base conditions that apply to all deletions + conditions: Dict[str, List[Any]] = { + "organization_id": [request.meta.organization_id], + "project_id": list(request.meta.project_ids), + } + + attribute_conditions: Optional[Dict[str, List[Any]]] = None + + if has_trace_ids: + # Delete by trace_ids (no attribute filtering) + conditions["trace_id"] = [str(tid) for tid in request.trace_ids] + else: + # Delete by filters (with attribute filtering) + # item_type must be specified in the request metadata for attribute-based deletion + if request.meta.trace_item_type == 0: # TRACE_ITEM_TYPE_UNSPECIFIED + raise BadSnubaRPCRequestException( + "trace_item_type must be specified in metadata when using filters" + ) + + conditions["item_type"] = [request.meta.trace_item_type] + # Convert filters to list - filters may be TraceItemFilterWithType which has a 'filter' field + filters_list: List[TraceItemFilter] = [ + cast(TraceItemFilter, f.filter if hasattr(f, "filter") else f) + for f in request.filters + ] + attribute_conditions = _trace_item_filters_to_attribute_conditions(filters_list) + delete_result = delete_from_storage( get_writable_storage(StorageKey.EAP_ITEMS), - { - "organization_id": [request.meta.organization_id], - "trace_id": list(request.trace_ids), - "project_id": list(request.meta.project_ids), - }, + conditions, attribution_info, + attribute_conditions, ) response = DeleteTraceItemsResponse() diff --git a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py index e94be2d3e91..2e858ae92ff 100644 --- a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py @@ -10,7 +10,17 @@ DeleteTraceItemsRequest, DeleteTraceItemsResponse, ) -from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, ResponseMeta +from sentry_protos.snuba.v1.request_common_pb2 import ( + RequestMeta, + ResponseMeta, + TraceItemFilterWithType, + TraceItemType, +) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( + ComparisonFilter, + TraceItemFilter, +) from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue from snuba.datasets.storages.factory import get_storage @@ -136,3 +146,190 @@ def test_valid_trace_id_produces_bulk_delete_message( assert called_args["conditions"]["organization_id"] == [1] assert called_args["conditions"]["trace_id"] == [_TRACE_ID] assert called_args["rows_to_delete"] == _SPAN_COUNT + + def test_filters_with_equals_operation_accepted(self) -> None: + """Test that filters with OP_EQUALS are properly converted to attribute_conditions""" + ts = Timestamp() + ts.GetCurrentTime() + + message = DeleteTraceItemsRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=ts, + end_timestamp=ts, + request_id=_REQUEST_ID, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + filters=[ + TraceItemFilterWithType( + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_int=12345), + ) + ) + ) + ], + ) + + with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): + with patch("snuba.web.bulk_delete_query.produce_delete_query") as mock_produce: + EndpointDeleteTraceItems().execute(message) + + # Verify produce_delete_query was called with attribute_conditions + assert mock_produce.call_count == 1 + + def test_filters_with_in_operation_accepted(self) -> None: + """Test that filters with OP_IN are properly converted to attribute_conditions""" + ts = Timestamp() + ts.GetCurrentTime() + + int_array = AttributeValue.IntArray() # type: ignore + int_array.values.extend([12345, 67890]) + + message = DeleteTraceItemsRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=ts, + end_timestamp=ts, + request_id=_REQUEST_ID, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + filters=[ + TraceItemFilterWithType( + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_IN, + value=AttributeValue(val_int_array=int_array), + ) + ) + ) + ], + ) + + with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): + with patch("snuba.web.bulk_delete_query.produce_delete_query"): + EndpointDeleteTraceItems().execute(message) + + def test_filters_with_unsupported_operation_rejected(self) -> None: + """Test that filters with operations other than OP_EQUALS/OP_IN are rejected""" + ts = Timestamp() + ts.GetCurrentTime() + + message = DeleteTraceItemsRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=ts, + end_timestamp=ts, + request_id=_REQUEST_ID, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + filters=[ + TraceItemFilterWithType( + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="timestamp", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_GREATER_THAN, + value=AttributeValue(val_int=1234567890), + ) + ) + ) + ], + ) + + with pytest.raises( + BadSnubaRPCRequestException, + match="Only OP_EQUALS and OP_IN operations are supported for deletion", + ): + EndpointDeleteTraceItems().execute(message) + + def test_filters_with_and_filter_rejected(self) -> None: + """Test that AND/OR/NOT filters are rejected""" + ts = Timestamp() + ts.GetCurrentTime() + + from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter + + message = DeleteTraceItemsRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=ts, + end_timestamp=ts, + request_id=_REQUEST_ID, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + filters=[ + TraceItemFilterWithType( + filter=TraceItemFilter( + and_filter=AndFilter( + filters=[ + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + name="group_id", type=AttributeKey.TYPE_INT + ), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_int=12345), + ) + ) + ] + ) + ) + ) + ], + ) + + with pytest.raises( + BadSnubaRPCRequestException, + match="Only comparison filters are supported for deletion", + ): + EndpointDeleteTraceItems().execute(message) + + def test_filters_without_item_type_rejected(self) -> None: + """Test that filters without item_type in metadata are rejected""" + ts = Timestamp() + ts.GetCurrentTime() + + message = DeleteTraceItemsRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=ts, + end_timestamp=ts, + request_id=_REQUEST_ID, + # trace_item_type is intentionally omitted (defaults to UNSPECIFIED) + ), + filters=[ + TraceItemFilterWithType( + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_int=12345), + ) + ) + ) + ], + ) + + with pytest.raises( + BadSnubaRPCRequestException, + match="trace_item_type must be specified in metadata when using filters", + ): + EndpointDeleteTraceItems().execute(message) From 0ccbaa2f0a8cb6b51c03fa32f2e410dd629ff7d5 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 08:55:20 -0800 Subject: [PATCH 04/20] small cleanup --- .../web/rpc/v1/endpoint_delete_trace_items.py | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_delete_trace_items.py b/snuba/web/rpc/v1/endpoint_delete_trace_items.py index 9a51f0ffb09..fda65b4c4a0 100644 --- a/snuba/web/rpc/v1/endpoint_delete_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_delete_trace_items.py @@ -1,13 +1,11 @@ -from typing import Any, Dict, List, Optional, Type, cast +from typing import Any, Dict, List, Optional, Sequence, Type from sentry_protos.snuba.v1.endpoint_delete_trace_items_pb2 import ( DeleteTraceItemsRequest, DeleteTraceItemsResponse, ) -from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( - ComparisonFilter, - TraceItemFilter, -) +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemFilterWithType +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter from snuba.attribution.appid import AppID from snuba.datasets.storages.factory import get_writable_storage @@ -39,7 +37,7 @@ def _extract_attribute_value(comparison_filter: ComparisonFilter) -> Any: def _trace_item_filters_to_attribute_conditions( - filters: list[TraceItemFilter], + filters: Sequence[TraceItemFilterWithType], ) -> Dict[str, list[Any]]: """ Convert TraceItemFilters to attribute_conditions for deletion. @@ -48,7 +46,7 @@ def _trace_item_filters_to_attribute_conditions( All filters are combined with AND logic. Args: - filters: List of TraceItemFilter from the request + filters: List of TraceItemFilterWithType from the request Returns: Dict mapping attribute names to lists of values @@ -58,7 +56,9 @@ def _trace_item_filters_to_attribute_conditions( """ attribute_conditions: Dict[str, list[Any]] = {} - for trace_filter in filters: + for filter_with_type in filters: + # Extract the actual filter from TraceItemFilterWithType + trace_filter = filter_with_type.filter # Only support comparison filters for deletion if not trace_filter.HasField("comparison_filter"): raise BadSnubaRPCRequestException( @@ -147,12 +147,7 @@ def _execute(self, request: DeleteTraceItemsRequest) -> DeleteTraceItemsResponse ) conditions["item_type"] = [request.meta.trace_item_type] - # Convert filters to list - filters may be TraceItemFilterWithType which has a 'filter' field - filters_list: List[TraceItemFilter] = [ - cast(TraceItemFilter, f.filter if hasattr(f, "filter") else f) - for f in request.filters - ] - attribute_conditions = _trace_item_filters_to_attribute_conditions(filters_list) + attribute_conditions = _trace_item_filters_to_attribute_conditions(request.filters) delete_result = delete_from_storage( get_writable_storage(StorageKey.EAP_ITEMS), From a61a7fca09e9f983282047b12543fd4141737a25 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 08:57:16 -0800 Subject: [PATCH 05/20] log when delete by attribute accepted, but does nothing --- snuba/web/bulk_delete_query.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 064363b7466..d38d24b4ff3 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -240,6 +240,10 @@ def delete_from_storage( # validate attribute conditions if provided if attribute_conditions: _validate_attribute_conditions(attribute_conditions, conditions, delete_settings) + logger.error( + "valid attribute_conditions passed to delete_from_storage, but they will be ignored " + "as functionality is not yet implemented" + ) attr_info = _get_attribution_info(attribution_info) return delete_from_tables(storage, delete_settings.tables, conditions, attr_info) From 0a508a4d6d1000eaef7518b9dd39caacf2614220 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 08:58:39 -0800 Subject: [PATCH 06/20] make delete by attribute true no-op with return --- snuba/web/bulk_delete_query.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index d38d24b4ff3..8b7e5d0d9e2 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -244,6 +244,8 @@ def delete_from_storage( "valid attribute_conditions passed to delete_from_storage, but they will be ignored " "as functionality is not yet implemented" ) + # deleting by just conditions and ignoring attribute_conditions would be dangerous + return {} attr_info = _get_attribution_info(attribution_info) return delete_from_tables(storage, delete_settings.tables, conditions, attr_info) From 8c806eb29a4762997bd8cc3dc4af2fd27d693fe5 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 09:35:53 -0800 Subject: [PATCH 07/20] move item_type into AttributeConditions class --- snuba/web/bulk_delete_query.py | 60 +++++++++---------- .../web/rpc/v1/endpoint_delete_trace_items.py | 30 ++++++---- .../v1/test_endpoint_delete_trace_items.py | 14 +++-- tests/web/test_bulk_delete_query.py | 37 ++++-------- 4 files changed, 67 insertions(+), 74 deletions(-) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 8b7e5d0d9e2..89aa730d0b3 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -1,7 +1,17 @@ import logging import time +from dataclasses import dataclass from threading import Thread -from typing import Any, Dict, Mapping, MutableMapping, Optional, Sequence, TypedDict +from typing import ( + Any, + Dict, + List, + Mapping, + MutableMapping, + Optional, + Sequence, + TypedDict, +) import rapidjson from confluent_kafka import KafkaError @@ -40,6 +50,12 @@ logger = logging.getLogger(__name__) +@dataclass +class AttributeConditions: + item_type: int + attributes: Dict[str, List[Any]] + + class DeleteQueryMessage(TypedDict): rows_to_delete: int storage_name: str @@ -125,42 +141,20 @@ def produce_delete_query(delete_query: DeleteQueryMessage) -> None: def _validate_attribute_conditions( - attribute_conditions: Dict[str, list[Any]], - conditions: Dict[str, list[Any]], + attribute_conditions: AttributeConditions, delete_settings: DeletionSettings, ) -> None: """ - Validates that the attribute_conditions are allowed for the item_type specified in conditions. + Validates that the attribute_conditions are allowed for the configured item_type. Args: - attribute_conditions: Dict mapping attribute names to their values - conditions: Dict mapping column names to their values (must include 'item_type') + attribute_conditions: AttributeConditions containing item_type and attribute mappings delete_settings: The deletion settings for the storage Raises: - InvalidQueryException: If item_type is not in conditions, if no attributes are configured - for the item_type, or if any requested attributes are not allowed + InvalidQueryException: If no attributes are configured for the item_type, + or if any requested attributes are not allowed """ - if not attribute_conditions: - return - - # Ensure item_type is specified in conditions - if "item_type" not in conditions: - raise InvalidQueryException( - "item_type must be specified in conditions when using attribute_conditions" - ) - - # Get the item_type value(s) - item_type_values = conditions["item_type"] - if not item_type_values: - raise InvalidQueryException("item_type cannot be empty when using attribute_conditions") - - # For now, we only support a single item_type value when using attribute_conditions - if len(item_type_values) > 1: - raise InvalidQueryException("attribute_conditions only supports a single item_type value") - - item_type = item_type_values[0] - # Get the string name for the item_type from the configuration # The configuration uses string names (e.g., "occurrence") as keys allowed_attrs_config = delete_settings.allowed_attributes_by_item_type @@ -182,11 +176,11 @@ def _validate_attribute_conditions( if matching_allowed_attrs is None: raise InvalidQueryException( - f"No attribute-based deletions configured for item_type {item_type}" + f"No attribute-based deletions configured for item_type {attribute_conditions.item_type}" ) # Validate that all requested attributes are allowed - requested_attrs = set(attribute_conditions.keys()) + requested_attrs = set(attribute_conditions.attributes.keys()) allowed_attrs_set = set(matching_allowed_attrs) invalid_attrs = requested_attrs - allowed_attrs_set @@ -202,7 +196,7 @@ def delete_from_storage( storage: WritableTableStorage, conditions: Dict[str, list[Any]], attribution_info: Mapping[str, Any], - attribute_conditions: Optional[Dict[str, list[Any]]] = None, + attribute_conditions: Optional[AttributeConditions] = None, ) -> dict[str, Result]: """ This method does a series of validation checks (outline below), @@ -239,9 +233,9 @@ def delete_from_storage( # validate attribute conditions if provided if attribute_conditions: - _validate_attribute_conditions(attribute_conditions, conditions, delete_settings) + _validate_attribute_conditions(attribute_conditions, delete_settings) logger.error( - "valid attribute_conditions passed to delete_from_storage, but they will be ignored " + "valid attribute_conditions passed to delete_from_storage, but delete will be ignored " "as functionality is not yet implemented" ) # deleting by just conditions and ignoring attribute_conditions would be dangerous diff --git a/snuba/web/rpc/v1/endpoint_delete_trace_items.py b/snuba/web/rpc/v1/endpoint_delete_trace_items.py index fda65b4c4a0..a03b30e1fa5 100644 --- a/snuba/web/rpc/v1/endpoint_delete_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_delete_trace_items.py @@ -10,7 +10,7 @@ from snuba.attribution.appid import AppID from snuba.datasets.storages.factory import get_writable_storage from snuba.datasets.storages.storage_key import StorageKey -from snuba.web.bulk_delete_query import delete_from_storage +from snuba.web.bulk_delete_query import AttributeConditions, delete_from_storage from snuba.web.rpc import RPCEndpoint from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException @@ -37,24 +37,26 @@ def _extract_attribute_value(comparison_filter: ComparisonFilter) -> Any: def _trace_item_filters_to_attribute_conditions( + item_type: int, filters: Sequence[TraceItemFilterWithType], -) -> Dict[str, list[Any]]: +) -> AttributeConditions: """ - Convert TraceItemFilters to attribute_conditions for deletion. + Convert TraceItemFilters to AttributeConditions for deletion. Only supports ComparisonFilter with OP_EQUALS or OP_IN operations. All filters are combined with AND logic. Args: + item_type: The trace item type (e.g., occurrence, span) filters: List of TraceItemFilterWithType from the request Returns: - Dict mapping attribute names to lists of values + AttributeConditions object containing item_type and attribute mappings Raises: BadSnubaRPCRequestException: If unsupported filter types or operations are encountered """ - attribute_conditions: Dict[str, list[Any]] = {} + attributes: Dict[str, List[Any]] = {} for filter_with_type in filters: # Extract the actual filter from TraceItemFilterWithType @@ -84,12 +86,12 @@ def _trace_item_filters_to_attribute_conditions( value = [value] # If the attribute already exists, extend the list (OR logic within same attribute) - if attribute_name in attribute_conditions: - attribute_conditions[attribute_name].extend(value) + if attribute_name in attributes: + attributes[attribute_name].extend(value) else: - attribute_conditions[attribute_name] = value + attributes[attribute_name] = value - return attribute_conditions + return AttributeConditions(item_type=item_type, attributes=attributes) class EndpointDeleteTraceItems(RPCEndpoint[DeleteTraceItemsRequest, DeleteTraceItemsResponse]): @@ -133,7 +135,7 @@ def _execute(self, request: DeleteTraceItemsRequest) -> DeleteTraceItemsResponse "project_id": list(request.meta.project_ids), } - attribute_conditions: Optional[Dict[str, List[Any]]] = None + attribute_conditions: Optional[AttributeConditions] = None if has_trace_ids: # Delete by trace_ids (no attribute filtering) @@ -146,8 +148,12 @@ def _execute(self, request: DeleteTraceItemsRequest) -> DeleteTraceItemsResponse "trace_item_type must be specified in metadata when using filters" ) - conditions["item_type"] = [request.meta.trace_item_type] - attribute_conditions = _trace_item_filters_to_attribute_conditions(request.filters) + attribute_conditions = _trace_item_filters_to_attribute_conditions( + request.meta.trace_item_type, + request.filters, + ) + # Add item_type to conditions for the delete query + conditions["item_type"] = [attribute_conditions.item_type] delete_result = delete_from_storage( get_writable_storage(StorageKey.EAP_ITEMS), diff --git a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py index 2e858ae92ff..b2660c00045 100644 --- a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py @@ -16,7 +16,11 @@ TraceItemFilterWithType, TraceItemType, ) -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeKey, + AttributeValue, + IntArray, +) from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( ComparisonFilter, TraceItemFilter, @@ -147,8 +151,11 @@ def test_valid_trace_id_produces_bulk_delete_message( assert called_args["conditions"]["trace_id"] == [_TRACE_ID] assert called_args["rows_to_delete"] == _SPAN_COUNT + # This should not yet produce a message to bulk_delete topic because + # we haven't wired that behavior through yet (it should not error out, + # though). + @pytest.mark.xfail def test_filters_with_equals_operation_accepted(self) -> None: - """Test that filters with OP_EQUALS are properly converted to attribute_conditions""" ts = Timestamp() ts.GetCurrentTime() @@ -188,8 +195,7 @@ def test_filters_with_in_operation_accepted(self) -> None: ts = Timestamp() ts.GetCurrentTime() - int_array = AttributeValue.IntArray() # type: ignore - int_array.values.extend([12345, 67890]) + int_array = IntArray(values=[12345, 67890]) message = DeleteTraceItemsRequest( meta=RequestMeta( diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py index defa87bffc1..029ef97ceef 100644 --- a/tests/web/test_bulk_delete_query.py +++ b/tests/web/test_bulk_delete_query.py @@ -17,7 +17,7 @@ from snuba.utils.manage_topics import create_topics from snuba.utils.streams.configuration_builder import get_default_kafka_configuration from snuba.utils.streams.topics import Topic -from snuba.web.bulk_delete_query import delete_from_storage +from snuba.web.bulk_delete_query import AttributeConditions, delete_from_storage from snuba.web.delete_query import DeletesNotEnabledError CONSUMER_CONFIG = { @@ -141,7 +141,7 @@ def test_attribute_conditions_valid() -> None: """Test that valid attribute_conditions are accepted for eap_items storage""" storage = get_writable_storage(StorageKey("eap_items")) conditions = {"project_id": [1], "item_type": [1]} - attribute_conditions = {"group_id": [12345]} + attribute_conditions = AttributeConditions(item_type=1, attributes={"group_id": [12345]}) attr_info = get_attribution_info() # Mock out _enforce_max_rows to avoid needing actual data @@ -156,7 +156,7 @@ def test_attribute_conditions_invalid_attribute() -> None: """Test that invalid attribute names in attribute_conditions are rejected""" storage = get_writable_storage(StorageKey("eap_items")) conditions = {"project_id": [1], "item_type": [1]} - attribute_conditions = {"invalid_attr": [12345]} + attribute_conditions = AttributeConditions(item_type=1, attributes={"invalid_attr": [12345]}) attr_info = get_attribution_info() with pytest.raises(InvalidQueryException, match="Invalid attributes for deletion"): @@ -168,36 +168,23 @@ def test_attribute_conditions_missing_item_type() -> None: """Test that attribute_conditions requires item_type in conditions""" storage = get_writable_storage(StorageKey("eap_items")) conditions = {"project_id": [1]} - attribute_conditions = {"group_id": [12345]} + attribute_conditions = AttributeConditions(item_type=1, attributes={"group_id": [12345]}) attr_info = get_attribution_info() - with pytest.raises( - InvalidQueryException, - match="item_type must be specified in conditions when using attribute_conditions", - ): - delete_from_storage(storage, conditions, attr_info, attribute_conditions) - - -@pytest.mark.redis_db -def test_attribute_conditions_multiple_item_types() -> None: - """Test that attribute_conditions doesn't support multiple item_type values""" - storage = get_writable_storage(StorageKey("eap_items")) - conditions = {"project_id": [1], "item_type": [1, 2]} - attribute_conditions = {"group_id": [12345]} - attr_info = get_attribution_info() - - with pytest.raises( - InvalidQueryException, match="attribute_conditions only supports a single item_type value" - ): - delete_from_storage(storage, conditions, attr_info, attribute_conditions) + # Since item_type is now in AttributeConditions, we need to test a different scenario + # The validation now should pass, but we need to ensure item_type is also in conditions + with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): + with patch("snuba.web.bulk_delete_query.produce_delete_query"): + # This should now succeed since we're no longer checking conditions dict + delete_from_storage(storage, conditions, attr_info, attribute_conditions) @pytest.mark.redis_db def test_attribute_conditions_storage_not_configured() -> None: """Test that storages without attribute deletion config reject attribute_conditions""" storage = get_writable_storage(StorageKey("search_issues")) - conditions = {"project_id": [1], "item_type": [1]} - attribute_conditions = {"some_attr": [12345]} + conditions = {"project_id": [1], "group_id": [1]} # Valid columns for search_issues + attribute_conditions = AttributeConditions(item_type=1, attributes={"some_attr": [12345]}) attr_info = get_attribution_info() with pytest.raises( From a77c7a039df133cb21fd79e88238bbf22dc30692 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 09:39:05 -0800 Subject: [PATCH 08/20] make smoke test very marginally less dumb --- tests/web/rpc/v1/test_endpoint_delete_trace_items.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py index b2660c00045..437775a9870 100644 --- a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py @@ -223,7 +223,9 @@ def test_filters_with_in_operation_accepted(self) -> None: with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): with patch("snuba.web.bulk_delete_query.produce_delete_query"): - EndpointDeleteTraceItems().execute(message) + assert isinstance( + EndpointDeleteTraceItems().execute(message), DeleteTraceItemsResponse + ) def test_filters_with_unsupported_operation_rejected(self) -> None: """Test that filters with operations other than OP_EQUALS/OP_IN are rejected""" From c355a18a85ea12c8d0389fa08681e7f16cc5dbc4 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Thu, 13 Nov 2025 15:22:49 -0800 Subject: [PATCH 09/20] [wip] wire through attribute_conditions in DeleteQueryMessage --- snuba/web/bulk_delete_query.py | 15 +++++++++++++-- tests/web/test_bulk_delete_query.py | 12 +++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 89aa730d0b3..20cb2782109 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -56,11 +56,13 @@ class AttributeConditions: attributes: Dict[str, List[Any]] -class DeleteQueryMessage(TypedDict): +class DeleteQueryMessage(TypedDict, total=False): rows_to_delete: int storage_name: str conditions: ConditionsType tenant_ids: Mapping[str, str | int] + attribute_conditions: Optional[Dict[str, List[Any]]] + attribute_conditions_item_type: Optional[int] PRODUCER_MAP: MutableMapping[str, Producer] = {} @@ -242,7 +244,9 @@ def delete_from_storage( return {} attr_info = _get_attribution_info(attribution_info) - return delete_from_tables(storage, delete_settings.tables, conditions, attr_info) + return delete_from_tables( + storage, delete_settings.tables, conditions, attr_info, attribute_conditions + ) def construct_query(storage: WritableTableStorage, table: str, condition: Expression) -> Query: @@ -266,6 +270,7 @@ def delete_from_tables( tables: Sequence[str], conditions: Dict[str, Any], attribution_info: AttributionInfo, + attribute_conditions: Optional[AttributeConditions] = None, ) -> dict[str, Result]: highest_rows_to_delete = 0 @@ -293,6 +298,12 @@ def delete_from_tables( "conditions": conditions, "tenant_ids": attribution_info.tenant_ids, } + + # Add attribute_conditions to the message if present + if attribute_conditions: + delete_query["attribute_conditions"] = attribute_conditions.attributes + delete_query["attribute_conditions_item_type"] = attribute_conditions.item_type + produce_delete_query(delete_query) return result diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py index 029ef97ceef..361b83345b9 100644 --- a/tests/web/test_bulk_delete_query.py +++ b/tests/web/test_bulk_delete_query.py @@ -146,9 +146,15 @@ def test_attribute_conditions_valid() -> None: # Mock out _enforce_max_rows to avoid needing actual data with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): - with patch("snuba.web.bulk_delete_query.produce_delete_query"): - # Should not raise an exception - delete_from_storage(storage, conditions, attr_info, attribute_conditions) + with patch("snuba.web.bulk_delete_query.produce_delete_query") as mock_produce: + # Should not raise an exception, but should return empty dict since + # functionality is not yet implemented + result = delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + # Should return empty because we haven't implemented the functionality yet + assert result == {} + # Should not have produced a message since we return early + assert mock_produce.call_count == 0 @pytest.mark.redis_db From 7afb17c56ad0c9ba7358791f3e586afa8eb5461f Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 14 Nov 2025 07:36:14 -0800 Subject: [PATCH 10/20] [wip] resolve attribute_conditions to columns --- snuba/lw_deletions/formatters.py | 44 ++++++++++++-- tests/lw_deletions/test_formatters.py | 86 +++++++++++++++++++++++++-- 2 files changed, 121 insertions(+), 9 deletions(-) diff --git a/snuba/lw_deletions/formatters.py b/snuba/lw_deletions/formatters.py index d0ce6fe1700..a14e514002e 100644 --- a/snuba/lw_deletions/formatters.py +++ b/snuba/lw_deletions/formatters.py @@ -1,8 +1,9 @@ from abc import ABC, abstractmethod from collections import defaultdict -from typing import Mapping, MutableMapping, Sequence, Type +from typing import List, Mapping, MutableMapping, Sequence, Type from snuba.datasets.storages.storage_key import StorageKey +from snuba.utils.hashes import fnv_1a from snuba.web.bulk_delete_query import DeleteQueryMessage from snuba.web.delete_query import ConditionsType @@ -54,14 +55,49 @@ def format(self, messages: Sequence[DeleteQueryMessage]) -> Sequence[ConditionsT ] -class IdentityFormatter(Formatter): +class EAPItemsFormatter(Formatter): + # Number of attribute buckets used in eap_items storage + # TODO: find a way to wire this in from a canonical source + NUM_ATTRIBUTE_BUCKETS = 40 + def format(self, messages: Sequence[DeleteQueryMessage]) -> Sequence[ConditionsType]: - return [msg["conditions"] for msg in messages] + """ + For eap_items storage, we need to resolve attribute_conditions to their + bucketed column names. Attributes are stored in hash-bucketed map columns + like attributes_string_0, attributes_string_1, etc. + + For example, if attribute_conditions has {"group_id": [123]}, we need to: + 1. Determine which bucket "group_id" belongs to + 2. Add it to the conditions as attributes_string_{bucket_idx}['group_id'] = [123] + """ + formatted_conditions: List[ConditionsType] = [] + + for message in messages: + conditions = dict(message["conditions"]) + + # Process attribute_conditions if present + if "attribute_conditions" in message and message["attribute_conditions"]: + attribute_conditions = message["attribute_conditions"] + + # For each attribute, determine its bucket and add to conditions + for attr_name, attr_values in attribute_conditions.items(): + # Hash the attribute name to determine which bucket it belongs to + bucket_idx = fnv_1a(attr_name.encode("utf-8")) % self.NUM_ATTRIBUTE_BUCKETS + + # Create the bucketed column name with the attribute key + # Format: "attributes_string_{bucket_idx}['{attr_name}']" + bucketed_column = f"attributes_string_{bucket_idx}['{attr_name}']" + + conditions[bucketed_column] = attr_values + + formatted_conditions.append(conditions) + + return formatted_conditions STORAGE_FORMATTER: Mapping[str, Type[Formatter]] = { StorageKey.SEARCH_ISSUES.value: SearchIssuesFormatter, # TODO: We will probably do something more sophisticated here in the future # but it won't make much of a difference until we support delete by attribute - StorageKey.EAP_ITEMS.value: IdentityFormatter, + StorageKey.EAP_ITEMS.value: EAPItemsFormatter, } diff --git a/tests/lw_deletions/test_formatters.py b/tests/lw_deletions/test_formatters.py index 47acd89b43e..cdfedc683f7 100644 --- a/tests/lw_deletions/test_formatters.py +++ b/tests/lw_deletions/test_formatters.py @@ -1,23 +1,32 @@ -from typing import Sequence, Type +from typing import Any, Sequence, Type import pytest from snuba.lw_deletions.formatters import ( + EAPItemsFormatter, Formatter, - IdentityFormatter, SearchIssuesFormatter, ) +from snuba.utils.hashes import fnv_1a from snuba.web.bulk_delete_query import DeleteQueryMessage from snuba.web.delete_query import ConditionsType -def create_delete_query_message(conditions: ConditionsType) -> DeleteQueryMessage: - return DeleteQueryMessage( +def create_delete_query_message( + conditions: ConditionsType, + attribute_conditions: dict[str, list[Any]] | None = None, + attribute_conditions_item_type: int | None = None, +) -> DeleteQueryMessage: + msg = DeleteQueryMessage( rows_to_delete=1, tenant_ids={}, conditions=conditions, storage_name="search_issues", ) + if attribute_conditions is not None and attribute_conditions_item_type is not None: + msg["attribute_conditions"] = attribute_conditions + msg["attribute_conditions_item_type"] = attribute_conditions_item_type + return msg SEARCH_ISSUES_FORMATTER = SearchIssuesFormatter @@ -83,7 +92,7 @@ def test_search_issues_formatter( {"project_id": [1], "trace_id": [1, 2, 3]}, {"project_id": [1], "trace_id": [4, 5, 6]}, ], - IdentityFormatter, + EAPItemsFormatter, id="identity does basically nothing", ), ], @@ -95,3 +104,70 @@ def test_identity_formatter( ) -> None: formatted = formatter().format(messages) assert formatted == expected_formatted + + +def test_eap_items_formatter_with_attribute_conditions() -> None: + """Test that EAPItemsFormatter correctly resolves attribute_conditions to bucketed columns""" + # Create a message with attribute_conditions + messages = [ + create_delete_query_message( + conditions={"project_id": [1], "item_type": [1]}, + attribute_conditions={"group_id": [12345, 67890]}, + attribute_conditions_item_type=1, + ) + ] + + formatter = EAPItemsFormatter() + formatted = formatter.format(messages) + + # Calculate the expected bucket for "group_id" + expected_bucket = fnv_1a("group_id".encode("utf-8")) % 40 + expected_column = f"attributes_string_{expected_bucket}['group_id']" + + assert len(formatted) == 1 + assert formatted[0]["project_id"] == [1] + assert formatted[0]["item_type"] == [1] + assert formatted[0][expected_column] == [12345, 67890] + + +def test_eap_items_formatter_multiple_attributes() -> None: + """Test that EAPItemsFormatter handles multiple attributes correctly""" + messages = [ + create_delete_query_message( + conditions={"project_id": [1], "item_type": [1]}, + attribute_conditions={ + "group_id": [12345], + "transaction": ["test_transaction"], + }, + attribute_conditions_item_type=1, + ) + ] + + formatter = EAPItemsFormatter() + formatted = formatter.format(messages) + + # Calculate expected buckets + group_id_bucket = fnv_1a("group_id".encode("utf-8")) % 40 + transaction_bucket = fnv_1a("transaction".encode("utf-8")) % 40 + + expected_group_id_column = f"attributes_string_{group_id_bucket}['group_id']" + expected_transaction_column = f"attributes_string_{transaction_bucket}['transaction']" + + assert len(formatted) == 1 + assert formatted[0][expected_group_id_column] == [12345] + assert formatted[0][expected_transaction_column] == ["test_transaction"] + + +def test_eap_items_formatter_without_attribute_conditions() -> None: + """Test that EAPItemsFormatter works without attribute_conditions (backwards compatibility)""" + messages = [ + create_delete_query_message( + conditions={"project_id": [1], "trace_id": ["abc123"]}, + ) + ] + + formatter = EAPItemsFormatter() + formatted = formatter.format(messages) + + assert len(formatted) == 1 + assert formatted[0] == {"project_id": [1], "trace_id": ["abc123"]} From 771193d5f5aaf78968ba6102fe087ce4266f9b77 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 14 Nov 2025 07:37:31 -0800 Subject: [PATCH 11/20] remove whitespace --- snuba/web/bulk_delete_query.py | 1 - 1 file changed, 1 deletion(-) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 20cb2782109..3feccba6c9f 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -272,7 +272,6 @@ def delete_from_tables( attribution_info: AttributionInfo, attribute_conditions: Optional[AttributeConditions] = None, ) -> dict[str, Result]: - highest_rows_to_delete = 0 result: dict[str, Result] = {} for table in tables: From 94004cd9a9376f5066f48977f32e604568777b43 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 14 Nov 2025 07:50:33 -0800 Subject: [PATCH 12/20] allow attribute_conditions deletes to proceed, if feature flag is enabled --- snuba/web/bulk_delete_query.py | 16 ++++++++------ tests/web/test_bulk_delete_query.py | 33 +++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 3feccba6c9f..80ba3d8ad3f 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -31,7 +31,7 @@ from snuba.query.exceptions import InvalidQueryException, NoRowsToDeleteException from snuba.query.expressions import Expression from snuba.reader import Result -from snuba.state import get_str_config +from snuba.state import get_int_config, get_str_config from snuba.utils.metrics.util import with_span from snuba.utils.metrics.wrapper import MetricsWrapper from snuba.utils.schemas import ColumnValidator, InvalidColumnType @@ -236,12 +236,14 @@ def delete_from_storage( # validate attribute conditions if provided if attribute_conditions: _validate_attribute_conditions(attribute_conditions, delete_settings) - logger.error( - "valid attribute_conditions passed to delete_from_storage, but delete will be ignored " - "as functionality is not yet implemented" - ) - # deleting by just conditions and ignoring attribute_conditions would be dangerous - return {} + + if not get_int_config("permit_delete_by_attribute", default=0): + logger.error( + "valid attribute_conditions passed to delete_from_storage, but delete will be ignored " + "as functionality is not yet launched (permit_delete_by_attribute=0)" + ) + # deleting by just conditions and ignoring attribute_conditions would be dangerous + return {} attr_info = _get_attribution_info(attribution_info) return delete_from_tables( diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py index 361b83345b9..b093801c8b8 100644 --- a/tests/web/test_bulk_delete_query.py +++ b/tests/web/test_bulk_delete_query.py @@ -197,3 +197,36 @@ def test_attribute_conditions_storage_not_configured() -> None: InvalidQueryException, match="No attribute-based deletions configured for this storage" ): delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + +@pytest.mark.redis_db +def test_attribute_conditions_feature_flag_enabled() -> None: + """Test that attribute_conditions are processed when feature flag is enabled""" + storage = get_writable_storage(StorageKey("eap_items")) + conditions = {"project_id": [1], "item_type": [1]} + attribute_conditions = AttributeConditions(item_type=1, attributes={"group_id": [12345]}) + attr_info = get_attribution_info() + + # Enable the feature flag + set_config("is_attribute_delete_launched", 1) + + try: + # Mock out _enforce_max_rows to avoid needing actual data + with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): + with patch("snuba.web.bulk_delete_query.produce_delete_query") as mock_produce: + # Should process normally and produce a message + result = delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + # Should have produced a message + assert mock_produce.call_count == 1 + # Should return success results + assert result != {} + + # Verify the message includes attribute_conditions + call_args = mock_produce.call_args[0][0] + assert "attribute_conditions" in call_args + assert call_args["attribute_conditions"] == {"group_id": [12345]} + assert call_args["attribute_conditions_item_type"] == 1 + finally: + # Clean up: disable the feature flag + set_config("is_attribute_delete_launched", 0) From 5d26fa0bf86f284c332c749e33316071e93d8a0a Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 14 Nov 2025 08:22:48 -0800 Subject: [PATCH 13/20] fix rename --- snuba/web/bulk_delete_query.py | 1 - tests/web/test_bulk_delete_query.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 80ba3d8ad3f..33a5ff3a0fc 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -242,7 +242,6 @@ def delete_from_storage( "valid attribute_conditions passed to delete_from_storage, but delete will be ignored " "as functionality is not yet launched (permit_delete_by_attribute=0)" ) - # deleting by just conditions and ignoring attribute_conditions would be dangerous return {} attr_info = _get_attribution_info(attribution_info) diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py index b093801c8b8..c2cbebf26f4 100644 --- a/tests/web/test_bulk_delete_query.py +++ b/tests/web/test_bulk_delete_query.py @@ -208,7 +208,7 @@ def test_attribute_conditions_feature_flag_enabled() -> None: attr_info = get_attribution_info() # Enable the feature flag - set_config("is_attribute_delete_launched", 1) + set_config("permit_delete_by_attribute", 1) try: # Mock out _enforce_max_rows to avoid needing actual data @@ -229,4 +229,4 @@ def test_attribute_conditions_feature_flag_enabled() -> None: assert call_args["attribute_conditions_item_type"] == 1 finally: # Clean up: disable the feature flag - set_config("is_attribute_delete_launched", 0) + set_config("permit_delete_by_attribute", 0) From bfe41eebd74c5f81982cf3517e418f0cfe45e45f Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 17 Nov 2025 09:44:03 -0800 Subject: [PATCH 14/20] [wip] functioning delete by attribute --- snuba/lw_deletions/formatters.py | 43 ++++++++++++++----- snuba/web/delete_query.py | 41 ++++++++++++++++-- tests/lw_deletions/test_formatters.py | 60 ++++++++++++++++++++++----- tests/web/test_delete_query.py | 60 ++++++++++++++++++++++++++- 4 files changed, 179 insertions(+), 25 deletions(-) diff --git a/snuba/lw_deletions/formatters.py b/snuba/lw_deletions/formatters.py index a14e514002e..66602c20950 100644 --- a/snuba/lw_deletions/formatters.py +++ b/snuba/lw_deletions/formatters.py @@ -63,12 +63,13 @@ class EAPItemsFormatter(Formatter): def format(self, messages: Sequence[DeleteQueryMessage]) -> Sequence[ConditionsType]: """ For eap_items storage, we need to resolve attribute_conditions to their - bucketed column names. Attributes are stored in hash-bucketed map columns - like attributes_string_0, attributes_string_1, etc. + appropriate column names based on type: + - int/bool: Single columns (no bucketing) like attributes_int['key'] or attributes_bool['key'] + - string/float: Hash-bucketed columns like attributes_string_0['key'] or attributes_float_23['key'] For example, if attribute_conditions has {"group_id": [123]}, we need to: - 1. Determine which bucket "group_id" belongs to - 2. Add it to the conditions as attributes_string_{bucket_idx}['group_id'] = [123] + 1. Determine the type based on the values (int in this case) + 2. Since int doesn't use bucketing, add it as attributes_int['group_id'] = [123] """ formatted_conditions: List[ConditionsType] = [] @@ -79,14 +80,34 @@ def format(self, messages: Sequence[DeleteQueryMessage]) -> Sequence[ConditionsT if "attribute_conditions" in message and message["attribute_conditions"]: attribute_conditions = message["attribute_conditions"] - # For each attribute, determine its bucket and add to conditions + # For each attribute, determine its type and bucket (if applicable) for attr_name, attr_values in attribute_conditions.items(): - # Hash the attribute name to determine which bucket it belongs to - bucket_idx = fnv_1a(attr_name.encode("utf-8")) % self.NUM_ATTRIBUTE_BUCKETS - - # Create the bucketed column name with the attribute key - # Format: "attributes_string_{bucket_idx}['{attr_name}']" - bucketed_column = f"attributes_string_{bucket_idx}['{attr_name}']" + if not attr_values: + continue + + # Determine the attribute type from the first value + # All values in the list should be of the same type + first_value = attr_values[0] + if isinstance(first_value, bool): + # Check bool before int since bool is a subclass of int in Python + attr_type = "bool" + elif isinstance(first_value, int): + attr_type = "int" + elif isinstance(first_value, float): + attr_type = "float" + else: + # Default to string for str and any other type + attr_type = "string" + + # Only string and float attributes use bucketing + # int and bool attributes are stored in single columns + if attr_type in ("int", "bool"): + # No bucketing for int and bool + bucketed_column = f"attributes_{attr_type}['{attr_name}']" + else: + # Bucketing for string and float + bucket_idx = fnv_1a(attr_name.encode("utf-8")) % self.NUM_ATTRIBUTE_BUCKETS + bucketed_column = f"attributes_{attr_type}_{bucket_idx}['{attr_name}']" conditions[bucketed_column] = attr_values diff --git a/snuba/web/delete_query.py b/snuba/web/delete_query.py index 4679043930d..58374335467 100644 --- a/snuba/web/delete_query.py +++ b/snuba/web/delete_query.py @@ -27,7 +27,13 @@ NoRowsToDeleteException, TooManyDeleteRowsException, ) -from snuba.query.expressions import Expression, FunctionCall +from snuba.query.expressions import ( + Column, + Expression, + FunctionCall, + Literal, + SubscriptableReference, +) from snuba.query.query_settings import HTTPQuerySettings from snuba.reader import Result from snuba.state import get_config, get_int_config @@ -354,12 +360,41 @@ def _execute_query( def _construct_condition(columns: ConditionsType) -> Expression: and_conditions = [] for col, values in columns.items(): + # Check if this is a map access pattern like "attributes_string_0['group_id']" + col_expr = _parse_column_expression(col) + if len(values) == 1: - exp = equals(column(col), literal(values[0])) + exp = equals(col_expr, literal(values[0])) else: literal_values = [literal(v) for v in values] - exp = in_cond(column(col), literals_tuple(alias=None, literals=literal_values)) + exp = in_cond(col_expr, literals_tuple(alias=None, literals=literal_values)) and_conditions.append(exp) return combine_and_conditions(and_conditions) + + +def _parse_column_expression(col_name: str) -> Expression: + """ + Parse a column name that might include map access notation. + + Examples: + "project_id" -> Column("project_id") + "attributes_string_0['group_id']" -> SubscriptableReference(Column("attributes_string_0"), Literal("group_id")) + """ + import re + + # Pattern to match "column_name['key']" or 'column_name["key"]' + match = re.match(r"^([a-zA-Z_][a-zA-Z0-9_]*)\[(['\"])(.+?)\2\]$", col_name) + + if match: + base_column_name = match.group(1) + key_name = match.group(3) + return SubscriptableReference( + alias=None, + column=Column(alias=None, table_name=None, column_name=base_column_name), + key=Literal(alias=None, value=key_name), + ) + else: + # Regular column + return column(col_name) diff --git a/tests/lw_deletions/test_formatters.py b/tests/lw_deletions/test_formatters.py index cdfedc683f7..34d7270637f 100644 --- a/tests/lw_deletions/test_formatters.py +++ b/tests/lw_deletions/test_formatters.py @@ -108,7 +108,7 @@ def test_identity_formatter( def test_eap_items_formatter_with_attribute_conditions() -> None: """Test that EAPItemsFormatter correctly resolves attribute_conditions to bucketed columns""" - # Create a message with attribute_conditions + # Create a message with attribute_conditions (integer values) messages = [ create_delete_query_message( conditions={"project_id": [1], "item_type": [1]}, @@ -120,9 +120,8 @@ def test_eap_items_formatter_with_attribute_conditions() -> None: formatter = EAPItemsFormatter() formatted = formatter.format(messages) - # Calculate the expected bucket for "group_id" - expected_bucket = fnv_1a("group_id".encode("utf-8")) % 40 - expected_column = f"attributes_string_{expected_bucket}['group_id']" + # Since values are integers, should use attributes_int (no bucketing) + expected_column = "attributes_int['group_id']" assert len(formatted) == 1 assert formatted[0]["project_id"] == [1] @@ -131,13 +130,13 @@ def test_eap_items_formatter_with_attribute_conditions() -> None: def test_eap_items_formatter_multiple_attributes() -> None: - """Test that EAPItemsFormatter handles multiple attributes correctly""" + """Test that EAPItemsFormatter handles multiple attributes with different types correctly""" messages = [ create_delete_query_message( conditions={"project_id": [1], "item_type": [1]}, attribute_conditions={ - "group_id": [12345], - "transaction": ["test_transaction"], + "group_id": [12345], # int (no bucketing) + "transaction": ["test_transaction"], # string (bucketing) }, attribute_conditions_item_type=1, ) @@ -146,11 +145,11 @@ def test_eap_items_formatter_multiple_attributes() -> None: formatter = EAPItemsFormatter() formatted = formatter.format(messages) - # Calculate expected buckets - group_id_bucket = fnv_1a("group_id".encode("utf-8")) % 40 + # Calculate expected bucket for transaction (string uses bucketing) transaction_bucket = fnv_1a("transaction".encode("utf-8")) % 40 - expected_group_id_column = f"attributes_string_{group_id_bucket}['group_id']" + # group_id is int (no bucketing), transaction is string (with bucketing) + expected_group_id_column = "attributes_int['group_id']" expected_transaction_column = f"attributes_string_{transaction_bucket}['transaction']" assert len(formatted) == 1 @@ -171,3 +170,44 @@ def test_eap_items_formatter_without_attribute_conditions() -> None: assert len(formatted) == 1 assert formatted[0] == {"project_id": [1], "trace_id": ["abc123"]} + + +def test_eap_items_formatter_with_float_attributes() -> None: + """Test that EAPItemsFormatter handles float attribute values correctly""" + messages = [ + create_delete_query_message( + conditions={"project_id": [1], "item_type": [1]}, + attribute_conditions={"duration": [123.45, 678.90]}, + attribute_conditions_item_type=1, + ) + ] + + formatter = EAPItemsFormatter() + formatted = formatter.format(messages) + + # Calculate the expected bucket for "duration" + expected_bucket = fnv_1a("duration".encode("utf-8")) % 40 + expected_column = f"attributes_float_{expected_bucket}['duration']" + + assert len(formatted) == 1 + assert formatted[0][expected_column] == [123.45, 678.90] + + +def test_eap_items_formatter_with_bool_attributes() -> None: + """Test that EAPItemsFormatter handles bool attribute values correctly""" + messages = [ + create_delete_query_message( + conditions={"project_id": [1], "item_type": [1]}, + attribute_conditions={"is_error": [True, False]}, + attribute_conditions_item_type=1, + ) + ] + + formatter = EAPItemsFormatter() + formatted = formatter.format(messages) + + # Bool attributes don't use bucketing + expected_column = "attributes_bool['is_error']" + + assert len(formatted) == 1 + assert formatted[0][expected_column] == [True, False] diff --git a/tests/web/test_delete_query.py b/tests/web/test_delete_query.py index ba6d10a6a9c..269d7e89e78 100644 --- a/tests/web/test_delete_query.py +++ b/tests/web/test_delete_query.py @@ -24,7 +24,7 @@ from snuba.query.dsl import and_cond, column, equals, literal from snuba.query.query_settings import HTTPQuerySettings from snuba.web import QueryException -from snuba.web.delete_query import _execute_query +from snuba.web.delete_query import _execute_query, _parse_column_expression @pytest.mark.clickhouse_db @@ -155,3 +155,61 @@ def _update_quota_balance( assert ( update_called ), "update_quota_balance should have been called even though the query was rejected but was not" + + +def test_parse_column_expression_regular_column() -> None: + """Test that regular column names are parsed correctly.""" + from snuba.query.expressions import Column + + expr = _parse_column_expression("project_id") + assert isinstance(expr, Column) + assert expr.column_name == "project_id" + + +def test_parse_column_expression_map_access() -> None: + from snuba.query.expressions import SubscriptableReference + + expr = _parse_column_expression("attributes_string_36['group_id']") + assert isinstance(expr, SubscriptableReference) + assert expr.column.column_name == "attributes_string_36" + assert expr.key.value == "group_id" + + # double quotes are also acceptable + from snuba.query.expressions import SubscriptableReference + + expr = _parse_column_expression('attributes_string_0["event_id"]') + assert isinstance(expr, SubscriptableReference) + assert expr.column.column_name == "attributes_string_0" + assert expr.key.value == "event_id" + + +def test_parse_column_expression_formats_correctly() -> None: + from snuba.clickhouse.formatter.query import format_query + from snuba.web.delete_query import _construct_condition + + conditions = { + "project_id": [1], + "attributes_string_36['group_id']": [12345], + } + + condition_expr = _construct_condition(conditions) + + query = Query( + from_clause=Table( + "eap_items_1_local", + ColumnSet([]), + storage_key=StorageKey.EAP_ITEMS, + ), + condition=condition_expr, + is_delete=True, + ) + + formatted = format_query(query) + sql = formatted.get_sql() + + # Should NOT have backticks around the entire map access expression + assert "`attributes_string_36['group_id']`" not in sql + # Should have the correct format without backticks + assert "attributes_string_36['group_id']" in sql + assert "equals(project_id, 1)" in sql + assert "equals(attributes_string_36['group_id'], 12345)" in sql From 55bf7b172e062f971658f86076df842d601acfa9 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 17 Nov 2025 10:44:24 -0800 Subject: [PATCH 15/20] little test cleanup --- tests/web/test_delete_query.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/web/test_delete_query.py b/tests/web/test_delete_query.py index 269d7e89e78..88c05bcd7f3 100644 --- a/tests/web/test_delete_query.py +++ b/tests/web/test_delete_query.py @@ -207,9 +207,5 @@ def test_parse_column_expression_formats_correctly() -> None: formatted = format_query(query) sql = formatted.get_sql() - # Should NOT have backticks around the entire map access expression - assert "`attributes_string_36['group_id']`" not in sql - # Should have the correct format without backticks - assert "attributes_string_36['group_id']" in sql assert "equals(project_id, 1)" in sql assert "equals(attributes_string_36['group_id'], 12345)" in sql From 8bd30d607041c7a17c3076b39922a1286f96831f Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 17 Nov 2025 10:47:10 -0800 Subject: [PATCH 16/20] more test cleanup --- tests/web/test_delete_query.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/tests/web/test_delete_query.py b/tests/web/test_delete_query.py index 88c05bcd7f3..39de67b4327 100644 --- a/tests/web/test_delete_query.py +++ b/tests/web/test_delete_query.py @@ -7,6 +7,7 @@ from snuba.attribution import get_app_id from snuba.attribution.attribution_info import AttributionInfo from snuba.clickhouse.columns import ColumnSet +from snuba.clickhouse.formatter.query import format_query from snuba.clickhouse.query import Query from snuba.configs.configuration import Configuration, ResourceIdentifier from snuba.datasets.storages.factory import get_writable_storage @@ -22,9 +23,14 @@ ) from snuba.query.data_source.simple import Table from snuba.query.dsl import and_cond, column, equals, literal +from snuba.query.expressions import Column, SubscriptableReference from snuba.query.query_settings import HTTPQuerySettings from snuba.web import QueryException -from snuba.web.delete_query import _execute_query, _parse_column_expression +from snuba.web.delete_query import ( + _construct_condition, + _execute_query, + _parse_column_expression, +) @pytest.mark.clickhouse_db @@ -158,25 +164,18 @@ def _update_quota_balance( def test_parse_column_expression_regular_column() -> None: - """Test that regular column names are parsed correctly.""" - from snuba.query.expressions import Column - expr = _parse_column_expression("project_id") assert isinstance(expr, Column) assert expr.column_name == "project_id" def test_parse_column_expression_map_access() -> None: - from snuba.query.expressions import SubscriptableReference - expr = _parse_column_expression("attributes_string_36['group_id']") assert isinstance(expr, SubscriptableReference) assert expr.column.column_name == "attributes_string_36" assert expr.key.value == "group_id" # double quotes are also acceptable - from snuba.query.expressions import SubscriptableReference - expr = _parse_column_expression('attributes_string_0["event_id"]') assert isinstance(expr, SubscriptableReference) assert expr.column.column_name == "attributes_string_0" @@ -184,9 +183,6 @@ def test_parse_column_expression_map_access() -> None: def test_parse_column_expression_formats_correctly() -> None: - from snuba.clickhouse.formatter.query import format_query - from snuba.web.delete_query import _construct_condition - conditions = { "project_id": [1], "attributes_string_36['group_id']": [12345], From 394afd4a7d06dbb1828c7f9741fbbf4cc6290435 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 17 Nov 2025 11:16:10 -0800 Subject: [PATCH 17/20] correct item type verification in API --- snuba/datasets/deletion_settings.py | 37 ++++++++++++++++ snuba/web/bulk_delete_query.py | 34 +++++++-------- tests/datasets/test_deletion_settings.py | 39 +++++++++++++++++ tests/web/test_bulk_delete_query.py | 55 ++++++++++++++++++------ 4 files changed, 135 insertions(+), 30 deletions(-) create mode 100644 tests/datasets/test_deletion_settings.py diff --git a/snuba/datasets/deletion_settings.py b/snuba/datasets/deletion_settings.py index 144ba59819e..c08e6ada61f 100644 --- a/snuba/datasets/deletion_settings.py +++ b/snuba/datasets/deletion_settings.py @@ -3,6 +3,8 @@ from dataclasses import dataclass, field from typing import Dict, List, Sequence +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType + MAX_ROWS_TO_DELETE_DEFAULT = 100000 @@ -14,3 +16,38 @@ class DeletionSettings: allowed_columns: Sequence[str] = field(default_factory=list) max_rows_to_delete: int = MAX_ROWS_TO_DELETE_DEFAULT allowed_attributes_by_item_type: Dict[str, List[str]] = field(default_factory=dict) + + +def get_trace_item_type_name(item_type: int) -> str: + """ + Get the string name for a TraceItemType enum value. + + Uses the protobuf enum's Name() method and strips the "TRACE_ITEM_TYPE_" prefix, + then converts to lowercase to match storage configuration naming conventions. + + Args: + item_type: The integer value of the TraceItemType enum + + Returns: + The string name used in storage configurations (e.g., "occurrence", "span") + + Raises: + ValueError: If the item_type is not a valid TraceItemType enum value + """ + try: + # Get the full protobuf enum name (e.g., "TRACE_ITEM_TYPE_SPAN") + # Cast to TraceItemType.ValueType to satisfy type checker + full_name = TraceItemType.Name(item_type) # type: ignore[arg-type] + + # Strip the "TRACE_ITEM_TYPE_" prefix and convert to lowercase + prefix = "TRACE_ITEM_TYPE_" + if not full_name.startswith(prefix): + raise ValueError(f"Unexpected TraceItemType name format: {full_name}") + + return full_name[len(prefix) :].lower() + except ValueError as e: + # This happens when item_type is not a valid enum value + raise ValueError( + f"Unknown TraceItemType value: {item_type}. " + "Must be a valid TraceItemType enum value." + ) from e diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 33a5ff3a0fc..b115a15f70b 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -22,7 +22,7 @@ from snuba.attribution.attribution_info import AttributionInfo from snuba.clickhouse.columns import ColumnSet from snuba.clickhouse.query import Query -from snuba.datasets.deletion_settings import DeletionSettings +from snuba.datasets.deletion_settings import DeletionSettings, get_trace_item_type_name from snuba.datasets.storage import WritableTableStorage from snuba.datasets.storages.storage_key import StorageKey from snuba.query.conditions import combine_or_conditions @@ -157,38 +157,36 @@ def _validate_attribute_conditions( InvalidQueryException: If no attributes are configured for the item_type, or if any requested attributes are not allowed """ - # Get the string name for the item_type from the configuration - # The configuration uses string names (e.g., "occurrence") as keys allowed_attrs_config = delete_settings.allowed_attributes_by_item_type if not allowed_attrs_config: raise InvalidQueryException("No attribute-based deletions configured for this storage") - # Check if the item_type has any allowed attributes configured - # Since the config uses string names and we're given an integer, we need to find the matching config - # For now, we'll check all configured item types and validate against any that match - - # Try to find a matching configuration by item_type name - matching_allowed_attrs = None - for configured_item_type, allowed_attrs in allowed_attrs_config.items(): - # For this initial implementation, we'll use the string key directly - # In the future, we might need a mapping from item_type int to string name - matching_allowed_attrs = allowed_attrs - break # For now, assume the first/only configured type + # Map the integer item_type to its string name used in configuration + try: + item_type_name = get_trace_item_type_name(attribute_conditions.item_type) + except ValueError as e: + raise InvalidQueryException(str(e)) - if matching_allowed_attrs is None: + # Check if this specific item_type has any allowed attributes configured + if item_type_name not in allowed_attrs_config: raise InvalidQueryException( - f"No attribute-based deletions configured for item_type {attribute_conditions.item_type}" + f"No attribute-based deletions configured for item_type {item_type_name} " + f"(value: {attribute_conditions.item_type}). Configured item types: " + f"{sorted(allowed_attrs_config.keys())}" ) + # Get the allowed attributes for this specific item_type + allowed_attrs = allowed_attrs_config[item_type_name] + # Validate that all requested attributes are allowed requested_attrs = set(attribute_conditions.attributes.keys()) - allowed_attrs_set = set(matching_allowed_attrs) + allowed_attrs_set = set(allowed_attrs) invalid_attrs = requested_attrs - allowed_attrs_set if invalid_attrs: raise InvalidQueryException( - f"Invalid attributes for deletion: {invalid_attrs}. " + f"Invalid attributes for deletion on item_type '{item_type_name}': {invalid_attrs}. " f"Allowed attributes: {allowed_attrs_set}" ) diff --git a/tests/datasets/test_deletion_settings.py b/tests/datasets/test_deletion_settings.py new file mode 100644 index 00000000000..374d697afd5 --- /dev/null +++ b/tests/datasets/test_deletion_settings.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import pytest +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType + +from snuba.datasets.deletion_settings import get_trace_item_type_name + + +def test_get_trace_item_type_name_valid() -> None: + """Test that get_trace_item_type_name returns the correct name for valid values""" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED) == "unspecified" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_SPAN) == "span" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_ERROR) == "error" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_LOG) == "log" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_UPTIME_CHECK) == "uptime_check" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_UPTIME_RESULT) == "uptime_result" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_REPLAY) == "replay" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE) == "occurrence" + assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_METRIC) == "metric" + assert ( + get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_PROFILE_FUNCTION) + == "profile_function" + ) + + +def test_get_trace_item_type_name_by_integer() -> None: + """Test that get_trace_item_type_name works with integer values directly""" + assert get_trace_item_type_name(0) == "unspecified" + assert get_trace_item_type_name(1) == "span" + assert get_trace_item_type_name(7) == "occurrence" + + +def test_get_trace_item_type_name_invalid() -> None: + """Test that get_trace_item_type_name raises ValueError for invalid values""" + with pytest.raises(ValueError, match="Unknown TraceItemType value: 999"): + get_trace_item_type_name(999) + + with pytest.raises(ValueError, match="Unknown TraceItemType value: -1"): + get_trace_item_type_name(-1) diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py index c2cbebf26f4..1bbbf0e61f2 100644 --- a/tests/web/test_bulk_delete_query.py +++ b/tests/web/test_bulk_delete_query.py @@ -20,6 +20,10 @@ from snuba.web.bulk_delete_query import AttributeConditions, delete_from_storage from snuba.web.delete_query import DeletesNotEnabledError +# TraceItemType values from sentry_protos +TRACE_ITEM_TYPE_SPAN = 1 +TRACE_ITEM_TYPE_OCCURRENCE = 7 + CONSUMER_CONFIG = { "bootstrap.servers": settings.BROKER_CONFIG["bootstrap.servers"], "group.id": "lwd-search-issues", @@ -137,21 +141,41 @@ def test_delete_invalid_column_name() -> None: @pytest.mark.redis_db -def test_attribute_conditions_valid() -> None: - """Test that valid attribute_conditions are accepted for eap_items storage""" +def test_attribute_conditions_invalid_item_type() -> None: + """Test that attribute_conditions with wrong item_type (span instead of occurrence) are rejected""" + storage = get_writable_storage(StorageKey("eap_items")) + conditions = {"project_id": [1], "item_type": [TRACE_ITEM_TYPE_SPAN]} + # Using span (1) but config only allows occurrence (7) + attribute_conditions = AttributeConditions( + item_type=TRACE_ITEM_TYPE_SPAN, attributes={"group_id": [12345]} + ) + attr_info = get_attribution_info() + + with pytest.raises( + InvalidQueryException, + match="No attribute-based deletions configured for item_type span", + ): + delete_from_storage(storage, conditions, attr_info, attribute_conditions) + + +@pytest.mark.redis_db +def test_attribute_conditions_valid_occurrence() -> None: + """Test that valid attribute_conditions are accepted for occurrence item_type""" storage = get_writable_storage(StorageKey("eap_items")) - conditions = {"project_id": [1], "item_type": [1]} - attribute_conditions = AttributeConditions(item_type=1, attributes={"group_id": [12345]}) + conditions = {"project_id": [1], "item_type": [TRACE_ITEM_TYPE_OCCURRENCE]} + attribute_conditions = AttributeConditions( + item_type=TRACE_ITEM_TYPE_OCCURRENCE, attributes={"group_id": [12345]} + ) attr_info = get_attribution_info() # Mock out _enforce_max_rows to avoid needing actual data with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10): with patch("snuba.web.bulk_delete_query.produce_delete_query") as mock_produce: # Should not raise an exception, but should return empty dict since - # functionality is not yet implemented + # functionality is not yet launched (permit_delete_by_attribute=0 by default) result = delete_from_storage(storage, conditions, attr_info, attribute_conditions) - # Should return empty because we haven't implemented the functionality yet + # Should return empty because the feature flag is off assert result == {} # Should not have produced a message since we return early assert mock_produce.call_count == 0 @@ -161,8 +185,11 @@ def test_attribute_conditions_valid() -> None: def test_attribute_conditions_invalid_attribute() -> None: """Test that invalid attribute names in attribute_conditions are rejected""" storage = get_writable_storage(StorageKey("eap_items")) - conditions = {"project_id": [1], "item_type": [1]} - attribute_conditions = AttributeConditions(item_type=1, attributes={"invalid_attr": [12345]}) + conditions = {"project_id": [1], "item_type": [TRACE_ITEM_TYPE_OCCURRENCE]} + # Using valid item_type (occurrence/7) but invalid attribute + attribute_conditions = AttributeConditions( + item_type=TRACE_ITEM_TYPE_OCCURRENCE, attributes={"invalid_attr": [12345]} + ) attr_info = get_attribution_info() with pytest.raises(InvalidQueryException, match="Invalid attributes for deletion"): @@ -174,7 +201,9 @@ def test_attribute_conditions_missing_item_type() -> None: """Test that attribute_conditions requires item_type in conditions""" storage = get_writable_storage(StorageKey("eap_items")) conditions = {"project_id": [1]} - attribute_conditions = AttributeConditions(item_type=1, attributes={"group_id": [12345]}) + attribute_conditions = AttributeConditions( + item_type=TRACE_ITEM_TYPE_OCCURRENCE, attributes={"group_id": [12345]} + ) attr_info = get_attribution_info() # Since item_type is now in AttributeConditions, we need to test a different scenario @@ -203,8 +232,10 @@ def test_attribute_conditions_storage_not_configured() -> None: def test_attribute_conditions_feature_flag_enabled() -> None: """Test that attribute_conditions are processed when feature flag is enabled""" storage = get_writable_storage(StorageKey("eap_items")) - conditions = {"project_id": [1], "item_type": [1]} - attribute_conditions = AttributeConditions(item_type=1, attributes={"group_id": [12345]}) + conditions = {"project_id": [1], "item_type": [TRACE_ITEM_TYPE_OCCURRENCE]} + attribute_conditions = AttributeConditions( + item_type=TRACE_ITEM_TYPE_OCCURRENCE, attributes={"group_id": [12345]} + ) attr_info = get_attribution_info() # Enable the feature flag @@ -226,7 +257,7 @@ def test_attribute_conditions_feature_flag_enabled() -> None: call_args = mock_produce.call_args[0][0] assert "attribute_conditions" in call_args assert call_args["attribute_conditions"] == {"group_id": [12345]} - assert call_args["attribute_conditions_item_type"] == 1 + assert call_args["attribute_conditions_item_type"] == TRACE_ITEM_TYPE_OCCURRENCE finally: # Clean up: disable the feature flag set_config("permit_delete_by_attribute", 0) From 7838e201015e59ec8777afc372c8fada92594751 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 17 Nov 2025 15:00:22 -0800 Subject: [PATCH 18/20] move local import --- snuba/web/delete_query.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/snuba/web/delete_query.py b/snuba/web/delete_query.py index 58374335467..96ed988c755 100644 --- a/snuba/web/delete_query.py +++ b/snuba/web/delete_query.py @@ -1,3 +1,4 @@ +import re import typing import uuid from typing import Any, Mapping, MutableMapping, Optional, Sequence @@ -382,8 +383,6 @@ def _parse_column_expression(col_name: str) -> Expression: "project_id" -> Column("project_id") "attributes_string_0['group_id']" -> SubscriptableReference(Column("attributes_string_0"), Literal("group_id")) """ - import re - # Pattern to match "column_name['key']" or 'column_name["key"]' match = re.match(r"^([a-zA-Z_][a-zA-Z0-9_]*)\[(['\"])(.+?)\2\]$", col_name) From 0b926785daa3c7cb077ea1db11515ace35767a3e Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 17 Nov 2025 15:03:32 -0800 Subject: [PATCH 19/20] remove AI noise --- tests/datasets/test_deletion_settings.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/datasets/test_deletion_settings.py b/tests/datasets/test_deletion_settings.py index 374d697afd5..d596493393e 100644 --- a/tests/datasets/test_deletion_settings.py +++ b/tests/datasets/test_deletion_settings.py @@ -7,7 +7,6 @@ def test_get_trace_item_type_name_valid() -> None: - """Test that get_trace_item_type_name returns the correct name for valid values""" assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED) == "unspecified" assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_SPAN) == "span" assert get_trace_item_type_name(TraceItemType.TRACE_ITEM_TYPE_ERROR) == "error" @@ -24,14 +23,12 @@ def test_get_trace_item_type_name_valid() -> None: def test_get_trace_item_type_name_by_integer() -> None: - """Test that get_trace_item_type_name works with integer values directly""" assert get_trace_item_type_name(0) == "unspecified" assert get_trace_item_type_name(1) == "span" assert get_trace_item_type_name(7) == "occurrence" def test_get_trace_item_type_name_invalid() -> None: - """Test that get_trace_item_type_name raises ValueError for invalid values""" with pytest.raises(ValueError, match="Unknown TraceItemType value: 999"): get_trace_item_type_name(999) From 414346397a66ae4850e9013f3110915cb9d79702 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 17 Nov 2025 15:06:07 -0800 Subject: [PATCH 20/20] more test cleanup --- tests/lw_deletions/test_formatters.py | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/tests/lw_deletions/test_formatters.py b/tests/lw_deletions/test_formatters.py index 34d7270637f..029a18a78bf 100644 --- a/tests/lw_deletions/test_formatters.py +++ b/tests/lw_deletions/test_formatters.py @@ -97,7 +97,7 @@ def test_search_issues_formatter( ), ], ) -def test_identity_formatter( +def test_eap_items_formatter_identity_conditions( messages: Sequence[DeleteQueryMessage], expected_formatted: Sequence[ConditionsType], formatter: Type[Formatter], @@ -107,7 +107,6 @@ def test_identity_formatter( def test_eap_items_formatter_with_attribute_conditions() -> None: - """Test that EAPItemsFormatter correctly resolves attribute_conditions to bucketed columns""" # Create a message with attribute_conditions (integer values) messages = [ create_delete_query_message( @@ -130,7 +129,6 @@ def test_eap_items_formatter_with_attribute_conditions() -> None: def test_eap_items_formatter_multiple_attributes() -> None: - """Test that EAPItemsFormatter handles multiple attributes with different types correctly""" messages = [ create_delete_query_message( conditions={"project_id": [1], "item_type": [1]}, @@ -157,23 +155,7 @@ def test_eap_items_formatter_multiple_attributes() -> None: assert formatted[0][expected_transaction_column] == ["test_transaction"] -def test_eap_items_formatter_without_attribute_conditions() -> None: - """Test that EAPItemsFormatter works without attribute_conditions (backwards compatibility)""" - messages = [ - create_delete_query_message( - conditions={"project_id": [1], "trace_id": ["abc123"]}, - ) - ] - - formatter = EAPItemsFormatter() - formatted = formatter.format(messages) - - assert len(formatted) == 1 - assert formatted[0] == {"project_id": [1], "trace_id": ["abc123"]} - - def test_eap_items_formatter_with_float_attributes() -> None: - """Test that EAPItemsFormatter handles float attribute values correctly""" messages = [ create_delete_query_message( conditions={"project_id": [1], "item_type": [1]}, @@ -194,7 +176,6 @@ def test_eap_items_formatter_with_float_attributes() -> None: def test_eap_items_formatter_with_bool_attributes() -> None: - """Test that EAPItemsFormatter handles bool attribute values correctly""" messages = [ create_delete_query_message( conditions={"project_id": [1], "item_type": [1]},