Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/sentry/tasks/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,6 @@ def process_workflow_engine(job: PostProcessJob) -> None:
try:
process_workflows_event.apply_async(
kwargs=dict(
project_id=job["event"].project_id,
event_id=job["event"].event_id,
occurrence_id=job["event"].occurrence_id,
group_id=job["event"].group_id,
Expand Down
7 changes: 2 additions & 5 deletions src/sentry/workflow_engine/processors/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
Workflow,
WorkflowActionGroupStatus,
)
from sentry.workflow_engine.models.detector import Detector
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
from sentry.workflow_engine.types import WorkflowEventData
Expand Down Expand Up @@ -149,13 +148,11 @@ def get_unique_active_actions(


@scopedstats.timer()
def fire_actions(
actions: BaseQuerySet[Action], detector: Detector, event_data: WorkflowEventData
) -> None:
def fire_actions(actions: BaseQuerySet[Action], event_data: WorkflowEventData) -> None:
deduped_actions = get_unique_active_actions(actions)

for action in deduped_actions:
task_params = build_trigger_action_task_params(action, detector, event_data)
task_params = build_trigger_action_task_params(action, event_data)
trigger_action.apply_async(kwargs=task_params, headers={"sentry-propagate-traces": False})


Expand Down
20 changes: 1 addition & 19 deletions src/sentry/workflow_engine/processors/delayed_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
evaluate_data_conditions,
get_slow_conditions_for_groups,
)
from sentry.workflow_engine.processors.detector import get_detectors_by_groupevents_bulk
from sentry.workflow_engine.processors.log_util import track_batch_performance
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
from sentry.workflow_engine.types import WorkflowEventData
Expand Down Expand Up @@ -680,11 +679,6 @@ def fire_actions_for_groups(
},
)

# Bulk fetch detectors
event_id_to_detector = get_detectors_by_groupevents_bulk(
[group_event for group_event, _ in group_to_groupevent.values()]
)

# Feature check caching to keep us within the trace budget.
trigger_actions_ff = features.has("organizations:workflow-engine-trigger-actions", organization)
single_processing_ff = features.has(
Expand All @@ -708,17 +702,6 @@ def fire_actions_for_groups(
for group, (group_event, start_timestamp) in group_to_groupevent.items():
with tracker.track(str(group.id)), log_context.new_context(group_id=group.id):
workflow_event_data = WorkflowEventData(event=group_event, group=group)
detector = event_id_to_detector.get(group_event.event_id)

if detector is None:
logger.warning(
"No detector found for event, skipping",
extra={
"event_id": group_event.event_id,
"group_id": group.id,
},
)
continue

dcgs_for_group = groups_to_fire.get(group.id, set())
filtered_actions = filter_recently_fired_workflow_actions(
Expand All @@ -732,7 +715,6 @@ def fire_actions_for_groups(
)

workflow_fire_histories = create_workflow_fire_histories(
detector,
filtered_actions,
workflow_event_data,
should_trigger_actions(group_event.group.type),
Expand All @@ -758,7 +740,7 @@ def fire_actions_for_groups(
)
total_actions += len(filtered_actions)

fire_actions(filtered_actions, detector, workflow_event_data)
fire_actions(filtered_actions, workflow_event_data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Invalid Events Trigger Failing Actions

Removing the detector existence check allows workflow fire histories to be created and actions to be triggered for events without detectors. Previously, events missing detectors were skipped with a warning. Now, trigger_action tasks will be queued and fail when get_detector_by_event raises Detector.DoesNotExist, causing unnecessary task failures and retries instead of gracefully handling missing detectors upfront.

Fix in Cursor Fix in Web


logger.debug(
"workflow_engine.delayed_workflow.triggered_actions_summary",
Expand Down
86 changes: 1 addition & 85 deletions src/sentry/workflow_engine/processors/detector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from __future__ import annotations

import logging
from collections import defaultdict
from collections.abc import Callable, Mapping
from collections.abc import Callable
from typing import NamedTuple

import sentry_sdk
Expand Down Expand Up @@ -188,89 +187,6 @@ def _create_event_detector_map(
return result, keys


def get_detectors_by_groupevents_bulk(
event_list: list[GroupEvent],
) -> Mapping[str, Detector]:
"""
Given a list of GroupEvents, return a mapping of event_id to Detector.
"""
if not event_list:
return {}

result: dict[str, Detector] = {}

# Separate events by whether they have occurrences or not
events_with_occurrences, error_events, events_missing_detectors = _split_events_by_occurrence(
event_list
)

# Fetch detectors for events with occurrences (by detector_id)
missing_detector_ids = set()
if events_with_occurrences:
detector_id_to_events: dict[int, list[GroupEvent]] = defaultdict(list)

for event, detector_id in events_with_occurrences:
detector_id_to_events[detector_id].append(event)

def _extract_events_lookup_key(detector: Detector) -> int:
return detector.id

if detector_id_to_events:
detectors = Detector.objects.filter(id__in=list(detector_id_to_events.keys()))
mapping, found_detector_ids = _create_event_detector_map(
detectors,
key_event_map=detector_id_to_events,
detector_key_extractor=_extract_events_lookup_key,
)
result.update(mapping)

missing_detector_ids = set(detector_id_to_events.keys()) - found_detector_ids

# Fetch detectors for events without occurrences (by project_id)
projects_missing_detectors = set()
if error_events:
# Group events by project_id
project_to_events: dict[int, list[GroupEvent]] = defaultdict(list)

for event in error_events:
project_to_events[event.project_id].append(event)

def _extract_events_lookup_key(detector: Detector) -> int:
return detector.project_id

detectors = Detector.objects.filter(
project_id__in=project_to_events.keys(),
type=ErrorGroupType.slug,
)
mapping, projects_with_error_detectors = _create_event_detector_map(
detectors,
key_event_map=project_to_events,
detector_key_extractor=_extract_events_lookup_key,
)
result.update(mapping)

projects_missing_detectors = set(project_to_events.keys()) - projects_with_error_detectors

# Log all missing detectors
if missing_detector_ids or projects_missing_detectors or events_missing_detectors:
metrics.incr(
"workflow_engine.detectors.error",
amount=len(projects_missing_detectors) + len(missing_detector_ids),
)
logger.error(
"Detectors not found for events",
extra={
"projects_missing_error_detectors": projects_missing_detectors,
"missing_detectors": missing_detector_ids,
"events_missing_detectors": [
(event.event_id, event.group_id) for event in events_missing_detectors
],
},
)

return result


def create_issue_platform_payload(result: DetectorEvaluationResult, detector_type: str) -> None:
occurrence, status_change = None, None

Expand Down
3 changes: 1 addition & 2 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,14 +540,13 @@ def process_workflows(

should_trigger_actions = should_fire_workflow_actions(organization, event_data.group.type)
create_workflow_fire_histories(
detector,
actions,
event_data,
should_trigger_actions,
is_delayed=False,
start_timestamp=event_start_time,
)

fire_actions(actions, detector, event_data)
fire_actions(actions, event_data)

return triggered_workflows
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
Action,
DataCondition,
DataConditionGroup,
Detector,
WorkflowDataConditionGroup,
WorkflowFireHistory,
)
Expand All @@ -24,7 +23,6 @@

@scopedstats.timer()
def create_workflow_fire_histories(
detector: Detector,
actions_to_fire: BaseQuerySet[Action],
event_data: WorkflowEventData,
is_single_processing: bool,
Expand Down Expand Up @@ -63,7 +61,6 @@ def create_workflow_fire_histories(

fire_histories = [
WorkflowFireHistory(
detector_id=detector.id,
workflow_id=workflow_id,
group=event_data.group,
event_id=event_id,
Expand Down
15 changes: 4 additions & 11 deletions src/sentry/workflow_engine/tasks/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from sentry.taskworker.retry import Retry
from sentry.utils import metrics
from sentry.utils.exceptions import timeout_grouping_context
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.models import Action
from sentry.workflow_engine.tasks.utils import (
build_workflow_event_data_from_activity,
build_workflow_event_data_from_event,
Expand All @@ -23,7 +23,7 @@


def build_trigger_action_task_params(
action: Action, detector: Detector, event_data: WorkflowEventData
action: Action, event_data: WorkflowEventData
) -> dict[str, object]:
"""
Build parameters for trigger_action task invocation.
Expand All @@ -45,7 +45,6 @@ def build_trigger_action_task_params(

return {
"action_id": action.id,
"detector_id": detector.id,
"workflow_id": getattr(action, "workflow_id", None),
"event_id": event_id,
"activity_id": activity_id,
Expand Down Expand Up @@ -75,7 +74,7 @@ def trigger_action(
group_state: GroupState,
has_reappeared: bool,
has_escalated: bool,
detector_id: int | None = None,
detector_id: int | None = None, # TODO: remove
) -> None:
from sentry.notifications.notification_action.utils import should_fire_workflow_actions
from sentry.workflow_engine.processors.detector import get_detector_by_event
Expand All @@ -90,11 +89,6 @@ def trigger_action(

action = Action.objects.annotate(workflow_id=Value(workflow_id)).get(id=action_id)

# TODO: remove detector usage from this task
detector: Detector | None = None
if detector_id is not None:
detector = Detector.objects.get(id=detector_id)

if event_id is not None:
event_data = build_workflow_event_data_from_event(
event_id=event_id,
Expand All @@ -118,8 +112,7 @@ def trigger_action(
)
raise ValueError("Exactly one of event_id or activity_id must be provided")

if detector is None:
detector = get_detector_by_event(event_data)
detector = get_detector_by_event(event_data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Activity Actions Fail: Detector Type Mismatch

The unconditional call to get_detector_by_event raises TypeError for Activity events since it only supports GroupEvent. Previously, the detector was passed via detector_id parameter, avoiding this call for Activity-triggered actions. Now Activity events that trigger actions will fail when the async task attempts to fetch the detector.

Fix in Cursor Fix in Web


metrics.incr(
"workflow_engine.tasks.trigger_action_task_started",
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/workflow_engine/tasks/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def process_workflows_event(
has_reappeared: bool,
has_escalated: bool,
start_timestamp_seconds: float | None = None,
project_id: int | None = None,
project_id: int | None = None, # TODO: remove
**kwargs: dict[str, Any],
) -> None:
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
Expand Down
84 changes: 0 additions & 84 deletions tests/sentry/workflow_engine/processors/test_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from sentry.workflow_engine.processors.detector import (
associate_new_group_with_detector,
get_detector_by_event,
get_detectors_by_groupevents_bulk,
process_detectors,
)
from sentry.workflow_engine.types import (
Expand Down Expand Up @@ -928,89 +927,6 @@ def test_defaults_to_error_detector(self) -> None:
assert result == self.error_detector


class TestGetDetectorsByGroupEventsBulk(TestCase):
def setUp(self) -> None:
super().setUp()
self.project1 = self.create_project()
self.project2 = self.create_project()
self.group1 = self.create_group(project=self.project1)
self.group2 = self.create_group(project=self.project2)

self.detector1 = self.create_detector(project=self.project1, type="metric_issue")
self.detector2 = self.create_detector(project=self.project2, type="error")
self.detector3 = self.create_detector(project=self.project2, type="metric_issue")

self.event1 = self.store_event(project_id=self.project1.id, data={})
self.event2 = self.store_event(project_id=self.project2.id, data={})

def test_empty_list(self) -> None:
result = get_detectors_by_groupevents_bulk([])
assert result == {}

def test_mixed_occurrences(self) -> None:
"""Test bulk fetch with mixed events (some with occurrences, some without)"""
occurrence = IssueOccurrence(
id=uuid.uuid4().hex,
project_id=1,
event_id="asdf",
fingerprint=["asdf"],
issue_title="title",
subtitle="subtitle",
resource_id=None,
evidence_data={"detector_id": self.detector1.id},
evidence_display=[],
type=MetricIssue,
detection_time=timezone.now(),
level="error",
culprit="",
)

group_event1 = GroupEvent.from_event(self.event1, self.group1)
group_event1.occurrence = occurrence

group_event2 = GroupEvent.from_event(self.event2, self.group2)
group_event2.occurrence = None

events = [group_event1, group_event2]
result = get_detectors_by_groupevents_bulk(events)

assert result[group_event1.event_id] == self.detector1
assert result[group_event2.event_id] == self.detector2
assert len(result) == 2

def test_mixed_occurrences_missing_detectors(self) -> None:
occurrence = IssueOccurrence(
id=uuid.uuid4().hex,
project_id=1,
event_id="asdf",
fingerprint=["asdf"],
issue_title="title",
subtitle="subtitle",
resource_id=None,
evidence_data={},
evidence_display=[],
type=MetricIssue,
detection_time=timezone.now(),
level="error",
culprit="",
)
self.detector2.delete()

group_event1 = GroupEvent.from_event(self.event1, self.group1)
group_event1.occurrence = occurrence

group_event2 = GroupEvent.from_event(self.event2, self.group2)
group_event2.occurrence = None

events = [group_event1, group_event2]

with mock.patch("sentry.workflow_engine.processors.detector.metrics") as mock_metrics:
result = get_detectors_by_groupevents_bulk(events)

assert result == {}
mock_metrics.incr.assert_called_with("workflow_engine.detectors.error", amount=1)


class TestAssociateNewGroupWithDetector(TestCase):
def setUp(self) -> None:
super().setUp()
Expand Down
Loading
Loading