"""
Processor functions for backfilling DetectorGroup associations for error detectors.

These functions contain the actual business logic for the backfill process, separated
from the task definitions to avoid heavy import dependencies.
"""

import logging
from collections.abc import Callable
from datetime import UTC, datetime, timedelta

from django.db import transaction
from django.db.models import Exists, OuterRef

from sentry.grouping.grouptype import ErrorGroupType
from sentry.models.group import Group, GroupStatus
from sentry.utils import metrics
from sentry.utils.query import RangeQuerySetWrapper
from sentry.workflow_engine.models import Detector, DetectorGroup, ErrorBackfillStatus

logger = logging.getLogger(__name__)

GROUPS_PER_BATCH = 400


def process_detector_backfill(backfill_status_id: int) -> None:
    """
    Process a single ErrorBackfillStatus record, creating DetectorGroup associations
    for all unresolved ErrorGroupType Groups in the detector's project.
    """
    try:
        # select_for_update() must run inside a transaction; claim the row and mark it
        # in progress atomically so concurrent workers see the updated status.
        with transaction.atomic():
            backfill_status = ErrorBackfillStatus.objects.select_for_update().get(
                id=backfill_status_id
            )
            if backfill_status.status != "in_progress":
                backfill_status.status = "in_progress"
                backfill_status.save(update_fields=["status", "date_updated"])
    except ErrorBackfillStatus.DoesNotExist:
        logger.warning(
            "error_detector_backfill.status_not_found",
            extra={"backfill_status_id": backfill_status_id},
        )
        return

    try:
        detector = Detector.objects.get(id=backfill_status.detector_id)
        project_id = detector.project_id

        all_unresolved_groups = Group.objects.filter(
            project_id=project_id,
            status=GroupStatus.UNRESOLVED,
            type=ErrorGroupType.type_id,
        )

        # Use NOT EXISTS subquery for efficiency
        existing_detector_groups_subquery = DetectorGroup.objects.filter(
            detector_id=detector.id, group_id=OuterRef("id")
        )

        groups_needing_detector_groups = all_unresolved_groups.exclude(
            Exists(existing_detector_groups_subquery)
        )

        created_count = 0

        for group in RangeQuerySetWrapper(groups_needing_detector_groups, step=GROUPS_PER_BATCH):
            detector_group, created = DetectorGroup.objects.get_or_create(
                detector_id=detector.id,
                group_id=group.id,
            )
            if created:
                # Backdate the association to when the group was first seen.
                detector_group.date_added = group.first_seen
                detector_group.save(update_fields=["date_added"])
                created_count += 1

        backfill_status.status = "completed"
        backfill_status.save(update_fields=["status", "date_updated"])

        metrics.incr("error_detector_backfill.process_success")
        metrics.incr("error_detector_backfill.groups_created", amount=created_count)

        logger.info(
            "error_detector_backfill.completed",
            extra={
                "backfill_status_id": backfill_status_id,
                "detector_id": detector.id,
                "project_id": project_id,
                "groups_created": created_count,
            },
        )

    except Exception as e:
        logger.exception(
            "error_detector_backfill.failed",
            extra={
                "backfill_status_id": backfill_status_id,
                "error": str(e),
            },
        )
        metrics.incr("error_detector_backfill.process_error")
        raise


def coordinate_backfills(
    max_batch_size: int,
    in_progress_timeout: timedelta,
    completed_cleanup_age: timedelta,
    schedule_task_fn: Callable[[int], None],
) -> None:
    """
    Coordinate the error detector backfill process: reset stuck items, delete old completed
    items, and schedule new pending backfills.
    """
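    # Reset items stuck in "in_progress" past the timeout (e.g. left behind by a crashed worker).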
    stuck_cutoff = datetime.now(UTC) - in_progress_timeout
    stuck_count = ErrorBackfillStatus.objects.filter(
        status="in_progress",
        date_updated__lt=stuck_cutoff,
    ).update(
        status="not_started",
    )

    if stuck_count > 0:
        logger.info(
            "error_detector_backfill.reset_stuck",
            extra={"count": stuck_count},
        )
        metrics.incr("error_detector_backfill.reset_stuck", amount=stuck_count)

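    # Delete "completed" records older than the cleanup age to keep the status table small.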
    completed_cutoff = datetime.now(UTC) - completed_cleanup_age
    deleted_count, _ = ErrorBackfillStatus.objects.filter(
        status="completed",
        date_updated__lt=completed_cutoff,
    ).delete()

    if deleted_count > 0:
        logger.info(
            "error_detector_backfill.cleaned_up",
            extra={"count": deleted_count},
        )
        metrics.incr("error_detector_backfill.cleaned_up", amount=deleted_count)

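    # Schedule up to max_batch_size pending backfills, oldest first.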
    pending_items = ErrorBackfillStatus.objects.filter(
        status="not_started",
    ).order_by(
        "date_added"
    )[:max_batch_size]

    scheduled_count = 0
    for item in pending_items:
        try:
            schedule_task_fn(item.id)
            scheduled_count += 1
        except Exception as e:
            logger.exception(
                "error_detector_backfill.schedule_failed",
                extra={
                    "backfill_status_id": item.id,
                    "error": str(e),
                },
            )

    if scheduled_count > 0:
        logger.info(
            "error_detector_backfill.scheduled",
            extra={"count": scheduled_count},
        )
        metrics.incr("error_detector_backfill.scheduled", amount=scheduled_count)

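    # Emit summary counts so overall backfill progress can be monitored.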
    total_pending = ErrorBackfillStatus.objects.filter(status="not_started").count()
    total_in_progress = ErrorBackfillStatus.objects.filter(status="in_progress").count()
    total_completed = ErrorBackfillStatus.objects.filter(status="completed").count()

    logger.info(
        "error_detector_backfill.coordinator_run",
        extra={
            "scheduled": scheduled_count,
            "stuck_reset": stuck_count,
            "cleaned_up": deleted_count,
            "total_pending": total_pending,
            "total_in_progress": total_in_progress,
            "total_completed": total_completed,
        },
    )

    metrics.gauge("error_detector_backfill.pending", total_pending)
    metrics.gauge("error_detector_backfill.in_progress", total_in_progress)
    metrics.gauge("error_detector_backfill.completed", total_completed)


def populate_backfill_status_records(
    start_from: int | None = None, deadline: datetime | None = None
) -> int | None:
    """
    Populate ErrorBackfillStatus records for all error detectors.

    Returns the detector ID to resume from if the deadline is reached, or None if complete.
    """

    def process_batch(detectors: list[Detector]) -> int:
        detector_ids = [d.id for d in detectors]

        existing_ids = set(
            ErrorBackfillStatus.objects.filter(detector_id__in=detector_ids).values_list(
                "detector_id", flat=True
            )
        )

        new_records = [
            ErrorBackfillStatus(detector_id=d.id, status="not_started")
            for d in detectors
            if d.id not in existing_ids
        ]

        if new_records:
            ErrorBackfillStatus.objects.bulk_create(new_records, ignore_conflicts=True)
            return len(new_records)
        return 0

    error_detectors = Detector.objects.filter(type=ErrorGroupType.slug)
    if start_from is not None:
        error_detectors = error_detectors.filter(id__gte=start_from)

    created_count = 0
    batch_size = 1000
    batch_detectors: list[Detector] = []

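    # Accumulate detectors into fixed-size batches so the deadline can be checked between bulk inserts.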
    for detector in RangeQuerySetWrapper(error_detectors, step=batch_size):
        batch_detectors.append(detector)

        if deadline and datetime.now(UTC) >= deadline:
            # Resume from the first detector in the unflushed batch; otherwise the
            # detectors accumulated since the last flush would never get records.
            resume_from = batch_detectors[0].id
            logger.info(
                "error_detector_backfill.populate_deadline_reached",
                extra={
                    "created_count": created_count,
                    "resume_from": resume_from,
                },
            )
            metrics.incr("error_detector_backfill.populated", amount=created_count)
            return resume_from

        if len(batch_detectors) >= batch_size:
            created_count += process_batch(batch_detectors)
            batch_detectors = []

    if batch_detectors:
        created_count += process_batch(batch_detectors)

    logger.info(
        "error_detector_backfill.populated",
        extra={"created_count": created_count},
    )

    metrics.incr("error_detector_backfill.populated", amount=created_count)
    return None
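

# Illustrative wiring only, not part of this module's API: the task layer described in the
# module docstring is assumed to call these processors roughly as sketched below. The task
# name and the schedule values here are hypothetical; only coordinate_backfills and
# process_detector_backfill come from this module.
#
#     def schedule_backfill(backfill_status_id: int) -> None:
#         # hypothetical task wrapper around process_detector_backfill
#         process_error_detector_backfill_task.delay(backfill_status_id)
#
#     coordinate_backfills(
#         max_batch_size=100,
#         in_progress_timeout=timedelta(hours=1),
#         completed_cleanup_age=timedelta(days=7),
#         schedule_task_fn=schedule_backfill,
#     )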