7 changes: 7 additions & 0 deletions src/sentry/monitors/models.py
@@ -814,6 +814,7 @@ class Meta:

@data_source_type_registry.register(DATA_SOURCE_CRON_MONITOR)
class CronMonitorDataSourceHandler(DataSourceTypeHandler[Monitor]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -834,6 +835,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(Monitor, {"id": instance.source_id})]
@@ -848,3 +850,8 @@ def get_instance_limit(org: Organization) -> int | None:
def get_current_instance_count(org: Organization) -> int:
# We don't have a limit at the moment, so no need to count.
raise NotImplementedError

@override
@staticmethod
def get_relocation_model_name() -> str:
return "monitors.monitor"
7 changes: 7 additions & 0 deletions src/sentry/snuba/models.py
@@ -182,6 +182,7 @@ def write_relocation_import(

@data_source_type_registry.register(DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
class QuerySubscriptionDataSourceHandler(DataSourceTypeHandler[QuerySubscription]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -203,6 +204,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(QuerySubscription, {"id": instance.source_id})]
@@ -223,3 +225,8 @@ def get_current_instance_count(org: Organization) -> int:
QuerySubscription.Status.UPDATING.value,
),
).count()

@override
@staticmethod
def get_relocation_model_name() -> str:
return "sentry.querysubscription"
7 changes: 7 additions & 0 deletions src/sentry/uptime/models.py
@@ -186,6 +186,7 @@ class UptimeRegionScheduleMode(enum.StrEnum):

@data_source_type_registry.register(DATA_SOURCE_UPTIME_SUBSCRIPTION)
class UptimeSubscriptionDataSourceHandler(DataSourceTypeHandler[UptimeSubscription]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -210,6 +211,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(UptimeSubscription, {"id": instance.source_id})]
@@ -225,6 +227,11 @@ def get_current_instance_count(org: Organization) -> int:
# We don't have a limit at the moment, so no need to count.
raise NotImplementedError

@override
@staticmethod
def get_relocation_model_name() -> str:
return "uptime.uptimesubscription"


def get_detector(uptime_subscription: UptimeSubscription, prefetch_workflow_data=False) -> Detector:
"""
51 changes: 50 additions & 1 deletion src/sentry/workflow_engine/models/data_source.py
@@ -1,18 +1,23 @@
import builtins
import dataclasses
import logging
from typing import Generic, TypeVar

from django.db import models
from django.db.models.signals import pre_save
from django.dispatch import receiver

from sentry.backup.scopes import RelocationScope
from sentry.backup.dependencies import NormalizedModelName, PrimaryKeyMap
from sentry.backup.helpers import ImportFlags
from sentry.backup.scopes import ImportScope, RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
from sentry.utils.registry import NoRegistrationExistsError
from sentry.workflow_engine.models.data_source_detector import DataSourceDetector
from sentry.workflow_engine.registry import data_source_type_registry
from sentry.workflow_engine.types import DataSourceTypeHandler

logger = logging.getLogger(__name__)

T = TypeVar("T")


@@ -25,6 +30,13 @@ class DataPacket(Generic[T]):
@region_silo_model
class DataSource(DefaultFieldsModel):
__relocation_scope__ = RelocationScope.Organization
# DataSource.source_id dynamically references different models based on the 'type' field.
# We declare all possible dependencies here to ensure proper import ordering.
__relocation_dependencies__ = {
"monitors.monitor", # For DATA_SOURCE_CRON_MONITOR
"sentry.querysubscription", # For DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
"uptime.uptimesubscription", # For DATA_SOURCE_UPTIME_SUBSCRIPTION
}
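# Illustrative aside (not part of this diff): because these dependencies are declared,
# the backup import machinery sorts Monitor, QuerySubscription, and UptimeSubscription
# rows before any DataSource row, so pk_map already contains their new PKs by the time
# normalize_before_relocation_import() below runs.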

organization = FlexibleForeignKey("sentry.Organization")

@@ -49,6 +61,43 @@ def type_handler(self) -> builtins.type[DataSourceTypeHandler]:
raise ValueError(f"Unknown data source type: {self.type}")
return handler

def normalize_before_relocation_import(
self, pk_map: PrimaryKeyMap, scope: ImportScope, flags: ImportFlags
) -> int | None:
old_pk = super().normalize_before_relocation_import(pk_map, scope, flags)
if old_pk is None:
return None

# Map source_id based on the data source type
try:
handler = data_source_type_registry.get(self.type)
model_name = NormalizedModelName(handler.get_relocation_model_name())
old_source_id = int(self.source_id)
new_source_id = pk_map.get_pk(model_name, old_source_id)

if new_source_id is not None:
self.source_id = str(new_source_id)
else:
# Referenced model not in pk_map. This may be correct (reset_pks=False) or broken
# (reset_pks=True but referenced model was filtered out or failed to import).
logger.warning(
"DataSource source_id not remapped - referenced model not in pk_map",
extra={
"data_source_id": old_pk,
"type": self.type,
"source_id": old_source_id,
"model": str(model_name),
},
)
except Exception:
logger.exception(
"DataSource.normalize_before_relocation_import failed",
extra={"data_source_id": old_pk, "type": self.type, "source_id": self.source_id},
)
return None
Comment on lines +92 to +97
Bug: the `except Exception` block returns `None`, causing the entire DataSource import to be skipped on error, contradicting the intended lax behavior of logging the problem and continuing.
Severity: HIGH | Confidence: 0.95

🔍 Detailed Analysis

When an exception occurs within the normalize_before_relocation_import method, such as int(self.source_id) failing due to malformed source_id data, the broad except Exception block at lines 92-97 returns None. This causes the entire DataSource record to be skipped during import, which contradicts the intended behavior of allowing the import to proceed, logging the issue, and leaving source_id unmapped.

💡 Suggested Fix

Modify the except Exception block in normalize_before_relocation_import to return old_pk instead of None. This ensures that DataSource records are not entirely skipped during import when an unexpected error occurs, aligning with the intended behavior of allowing partial imports and logging issues.
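A minimal sketch of that change (assuming the surrounding method body stays exactly as in this PR):

except Exception:
    logger.exception(
        "DataSource.normalize_before_relocation_import failed",
        extra={"data_source_id": old_pk, "type": self.type, "source_id": self.source_id},
    )
    # Keep the record rather than skipping it: leave source_id unmapped and let
    # the import proceed, matching the lax behavior described above.
    return old_pk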

return old_pk


@receiver(pre_save, sender=DataSource)
def ensure_type_handler_registered(sender, instance: DataSource, **kwargs):
16 changes: 15 additions & 1 deletion src/sentry/workflow_engine/types.py
@@ -108,8 +108,9 @@ def execute(event_data: WorkflowEventData, action: Action, detector: Detector) -
raise NotImplementedError


class DataSourceTypeHandler(Generic[T]):
class DataSourceTypeHandler(ABC, Generic[T]):
@staticmethod
@abstractmethod
def bulk_get_query_object(data_sources) -> dict[int, T | None]:
"""
Bulk fetch related data-source models returning a dict of the
@@ -118,6 +119,7 @@ def bulk_get_query_object(data_sources) -> dict[int, T | None]:
raise NotImplementedError

@staticmethod
@abstractmethod
def related_model(instance) -> list[ModelRelation]:
"""
A list of deletion ModelRelations. The model relation query should map
@@ -127,6 +129,7 @@ def related_model(instance) -> list[ModelRelation]:
raise NotImplementedError

@staticmethod
@abstractmethod
def get_instance_limit(org: Organization) -> int | None:
"""
Returns the maximum number of instances of this data source type for the organization.
@@ -135,13 +138,24 @@ def get_instance_limit(org: Organization) -> int | None:
raise NotImplementedError

@staticmethod
@abstractmethod
def get_current_instance_count(org: Organization) -> int:
"""
Returns the current number of instances of this data source type for the organization.
Only called if `get_instance_limit` returns a number > 0.
"""
raise NotImplementedError

@staticmethod
@abstractmethod
def get_relocation_model_name() -> str:
"""
Returns the normalized model name (e.g., "sentry.querysubscription") for the model that
source_id references. This is used during backup/relocation to map old PKs to new PKs.
The format is "app_label.model_name" in lowercase.
"""
raise NotImplementedError


class DataConditionHandler(Generic[T]):
class Group(StrEnum):
74 changes: 74 additions & 0 deletions tests/sentry/workflow_engine/models/test_data_source.py
@@ -2,6 +2,10 @@

import pytest

from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap
from sentry.backup.helpers import ImportFlags
from sentry.backup.scopes import ImportScope
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
from sentry.workflow_engine.registry import data_source_type_registry
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest

@@ -18,3 +22,73 @@ def test_data_source_valid_type(self) -> None:
data_source = self.create_data_source(type="test")
assert data_source is not None
assert data_source.type == "test"

def test_normalize_before_relocation_import(self) -> None:
"""Test that normalize_before_relocation_import correctly maps source_id"""
monitor = self.create_monitor(project=self.project)
data_source = self.create_data_source(
type=DATA_SOURCE_CRON_MONITOR,
source_id=str(monitor.id),
organization_id=self.organization.id,
)

old_monitor_pk = monitor.id
new_monitor_pk = 9999
old_data_source_id = data_source.id
old_org_id = data_source.organization_id

# Create a PrimaryKeyMap that maps the old monitor ID to a new one
pk_map = PrimaryKeyMap()
pk_map.insert(
model_name=NormalizedModelName("monitors.monitor"),
old=old_monitor_pk,
new=new_monitor_pk,
kind=ImportKind.Inserted,
)
pk_map.insert(
model_name=NormalizedModelName("sentry.organization"),
old=old_org_id,
new=old_org_id,
kind=ImportKind.Inserted,
)

old_data_source_pk = data_source.normalize_before_relocation_import(
pk_map, ImportScope.Organization, ImportFlags()
)

assert (
old_data_source_pk == old_data_source_id
), f"Expected {old_data_source_id}, got {old_data_source_pk}"
assert data_source.source_id == str(new_monitor_pk)
assert data_source.pk is None

def test_normalize_before_relocation_import_missing_source(self) -> None:
"""Test that normalize_before_relocation_import succeeds but doesn't update source_id if mapping not found"""
monitor = self.create_monitor(project=self.project)
data_source = self.create_data_source(
type=DATA_SOURCE_CRON_MONITOR,
source_id=str(monitor.id),
organization_id=self.organization.id,
)

old_source_id = data_source.source_id
old_data_source_id = data_source.id
old_org_id = data_source.organization_id

# Create a PrimaryKeyMap without the monitor mapping
pk_map = PrimaryKeyMap()
pk_map.insert(
model_name=NormalizedModelName("sentry.organization"),
old=old_org_id,
new=old_org_id,
kind=ImportKind.Inserted,
)

result = data_source.normalize_before_relocation_import(
pk_map, ImportScope.Organization, ImportFlags()
)

# Should succeed but leave source_id unchanged
assert result == old_data_source_id
assert data_source.source_id == old_source_id
assert data_source.pk is None
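
A hypothetical regression test for the malformed source_id path flagged in the review comment above (not part of this PR; it pins the current behavior, where the broad except returns None, and the expected value would become the old pk if the suggested fix lands):

def test_normalize_before_relocation_import_malformed_source_id(self) -> None:
    """Hypothetical: a non-numeric source_id makes int() raise, hitting the except branch"""
    data_source = self.create_data_source(
        type=DATA_SOURCE_CRON_MONITOR,
        source_id="not-an-int",
        organization_id=self.organization.id,
    )

    pk_map = PrimaryKeyMap()
    pk_map.insert(
        model_name=NormalizedModelName("sentry.organization"),
        old=data_source.organization_id,
        new=data_source.organization_id,
        kind=ImportKind.Inserted,
    )

    result = data_source.normalize_before_relocation_import(
        pk_map, ImportScope.Organization, ImportFlags()
    )

    # Current behavior: the whole record is skipped (None returned) and
    # source_id is left untouched.
    assert result is None
    assert data_source.source_id == "not-an-int"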