diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt index 741de05aa2ea89..f1bfcb45faf36e 100644 --- a/migrations_lockfile.txt +++ b/migrations_lockfile.txt @@ -39,4 +39,5 @@ tempest: 0003_use_encrypted_char_field uptime: 0049_cleanup_failed_safe_deletes -workflow_engine: 0103_add_unique_constraint + +workflow_engine: 0104_bulkjobstatus diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 00b3e00d8fe80e..ac5a99353d9d7e 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3233,6 +3233,12 @@ default=50, flags=FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "workflow_engine.error_backfill.target_running_tasks", + type=Int, + default=1, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) # Restrict uptime issue creation for specific host provider identifiers. Items # in this list map to the `host_provider_id` column in the UptimeSubscription diff --git a/src/sentry/workflow_engine/migrations/0104_bulkjobstatus.py b/src/sentry/workflow_engine/migrations/0104_bulkjobstatus.py new file mode 100644 index 00000000000000..385b67d1b51266 --- /dev/null +++ b/src/sentry/workflow_engine/migrations/0104_bulkjobstatus.py @@ -0,0 +1,56 @@ +# Generated by Django 5.2.1 on 2025-11-13 00:23 + +from django.db import migrations, models + +import sentry.db.models.fields.bounded +import sentry.db.models.fields.jsonfield +from sentry.new_migrations.migrations import CheckedMigration + + +class Migration(CheckedMigration): + # This flag is used to mark that a migration shouldn't be automatically run in production. + # This should only be used for operations where it's safe to run the migration after your + # code has deployed. So this should not be used for most operations that alter the schema + # of a table. + # Here are some things that make sense to mark as post deployment: + # - Large data migrations. Typically we want these to be run manually so that they can be + # monitored and not block the deploy for a long period of time while they run. + # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to + # run this outside deployments so that we don't block them. Note that while adding an index + # is a schema change, it's completely safe to run the operation after the code has deployed. 
+ # Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment + + is_post_deployment = False + + dependencies = [ + ("workflow_engine", "0103_add_unique_constraint"), + ] + + operations = [ + migrations.CreateModel( + name="BulkJobStatus", + fields=[ + ( + "id", + sentry.db.models.fields.bounded.BoundedBigAutoField( + primary_key=True, serialize=False + ), + ), + ("date_updated", models.DateTimeField(auto_now=True)), + ("date_added", models.DateTimeField(auto_now_add=True)), + ("job_type", models.CharField(db_index=True, max_length=100)), + ("batch_key", models.CharField(max_length=200, unique=True)), + ("work_chunk_info", sentry.db.models.fields.jsonfield.JSONField()), + ("status", models.CharField(db_index=True, default="not_started", max_length=20)), + ], + options={ + "db_table": "workflow_engine_bulk_job_status", + "indexes": [ + models.Index( + fields=["job_type", "status", "date_updated"], + name="bulkjob_type_stat_upd_idx", + ) + ], + }, + ), + ] diff --git a/src/sentry/workflow_engine/models/__init__.py b/src/sentry/workflow_engine/models/__init__.py index 76ef000bd7cb5d..458c960f1504cd 100644 --- a/src/sentry/workflow_engine/models/__init__.py +++ b/src/sentry/workflow_engine/models/__init__.py @@ -3,6 +3,8 @@ "ActionAlertRuleTriggerAction", "AlertRuleDetector", "AlertRuleWorkflow", + "BulkJobState", + "BulkJobStatus", "Condition", "DataCondition", "DataConditionAlertRuleTrigger", @@ -26,6 +28,7 @@ from .action_alertruletriggeraction import ActionAlertRuleTriggerAction from .alertrule_detector import AlertRuleDetector from .alertrule_workflow import AlertRuleWorkflow +from .bulk_job_status import BulkJobState, BulkJobStatus from .data_condition import Condition, DataCondition from .data_condition_group import DataConditionGroup from .data_condition_group_action import DataConditionGroupAction diff --git a/src/sentry/workflow_engine/models/bulk_job_status.py b/src/sentry/workflow_engine/models/bulk_job_status.py new file mode 100644 index 00000000000000..d2154a72c67dce --- /dev/null +++ b/src/sentry/workflow_engine/models/bulk_job_status.py @@ -0,0 +1,62 @@ +from enum import StrEnum + +from django.db import models + +import sentry +from sentry.backup.scopes import RelocationScope +from sentry.db.models import DefaultFieldsModel, region_silo_model +from sentry.db.models.fields.jsonfield import JSONField + + +class BulkJobState(StrEnum): + NOT_STARTED = "not_started" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + + +@region_silo_model +class BulkJobStatus(DefaultFieldsModel): + """ + Generic tracking model for bulk job execution. + + This model tracks the execution state of bulk jobs. It is completely decoupled from + job implementation details - all job-specific behavior is defined in BulkJobSpec + instances that are looked up via the job_type field. + """ + + __relocation_scope__ = RelocationScope.Excluded + + # Identifies which BulkJobSpec to use (e.g., "error_backfill"). + # Used to look up the job implementation from bulk_job_registry. + job_type = models.CharField(max_length=100, db_index=True) + + # Unique identifier for this work chunk (e.g., "error_detector:123"). + # Prevents duplicate job records. Format is job-specific but typically + # includes the resource ID being processed. + batch_key = models.CharField(max_length=200, unique=True) + + # JSON-serialized work chunk data (Pydantic model). + # Contains all information needed to process this specific chunk + # (e.g., {"detector_id": 123}). 
Deserialized using the job's + # work_chunk_model when processing. + work_chunk_info = JSONField() + + # Current execution state (NOT_STARTED, IN_PROGRESS, COMPLETED). + # Coordinator uses this to schedule pending jobs and reset stuck ones. + status = models.CharField( + max_length=20, + choices=[(status.value, status.name.replace("_", " ").title()) for status in BulkJobState], + default=BulkJobState.NOT_STARTED, + db_index=True, + ) + + class Meta: + db_table = "workflow_engine_bulk_job_status" + app_label = "workflow_engine" + indexes = [ + models.Index( + fields=["job_type", "status", "date_updated"], name="bulkjob_type_stat_upd_idx" + ), + ] + + __repr__ = sentry.db.models.sane_repr("job_type", "batch_key", "status") diff --git a/src/sentry/workflow_engine/processors/bulk_job.py b/src/sentry/workflow_engine/processors/bulk_job.py new file mode 100644 index 00000000000000..0accd98a7ea78d --- /dev/null +++ b/src/sentry/workflow_engine/processors/bulk_job.py @@ -0,0 +1,462 @@ +""" +Generic bulk job processing infrastructure. + +Provides a registry-based framework for defining and executing bulk background jobs +with job-specific work chunk processing, coordination, and status tracking. + +## Quick Start: Adding a New Bulk Job + +### 1. Define Your Work Chunk Model + +Create a Pydantic model representing a unit of work: + +```python +from pydantic import BaseModel + +class MyJobWorkChunk(BaseModel): + resource_id: int + # Add any other fields needed to process this chunk +``` + +### 2. Create Your Job Spec + +Subclass BulkJobSpec and implement the required methods: + +```python +from sentry.workflow_engine.processors.bulk_job import BulkJobSpec, bulk_job_registry + +class MyBulkJob(BulkJobSpec): + job_type = "my_job_type" + work_chunk_model = MyJobWorkChunk + + # Optional: Override coordination settings (defaults shown) + max_batch_size = 100 # Max tasks to schedule per coordinator run + target_running_tasks = 50 # Target concurrent tasks + in_progress_timeout = timedelta(hours=1) # Timeout for stuck tasks + completed_cleanup_age = timedelta(days=30) # Age before cleanup + + def process_work_chunk(self, work_chunk: BaseModel) -> dict[str, Any]: + # Implement your processing logic + assert isinstance(work_chunk, MyJobWorkChunk) + # ... do work ... + return {"processed": True} # Return metadata for logging + + def generate_work_chunks(self, start_from: str | None) -> Iterable[BaseModel]: + # Generate all work chunks, optionally resuming from a key + queryset = MyModel.objects.all() + if start_from: + resource_id = int(start_from.split(":")[-1]) + queryset = queryset.filter(id__gte=resource_id) + + for item in queryset: + yield MyJobWorkChunk(resource_id=item.id) + + def get_batch_key(self, work_chunk: BaseModel) -> str: + # Return a unique key for deduplication + assert isinstance(work_chunk, MyJobWorkChunk) + return f"my_job:{work_chunk.resource_id}" + +# Register your job +MY_BULK_JOB = MyBulkJob() +bulk_job_registry.register(MY_BULK_JOB.job_type)(MY_BULK_JOB) +``` + +### 3. 
Use the Generic Tasks + +The generic tasks in `sentry.workflow_engine.tasks.bulk_job` work with any registered job: + +```python +from sentry.workflow_engine.tasks.bulk_job import ( + populate_bulk_job_records_task, + coordinate_bulk_jobs_task, + process_bulk_job_task, +) + +# Populate: Create BulkJobStatus records for all work chunks +populate_bulk_job_records_task.apply_async(kwargs={"job_type": "my_job_type"}) + +# Coordinate: Schedule tasks to process pending jobs (run periodically) +coordinate_bulk_jobs_task.apply_async(kwargs={"job_type": "my_job_type"}) + +# Process: Execute a single job (scheduled by coordinator) +process_bulk_job_task.apply_async( + kwargs={"job_status_id": 123, "job_type": "my_job_type"} +) +``` + +### 4. Monitor Progress + +Track job progress using BulkJobStatus records: + +```python +from sentry.workflow_engine.models import BulkJobStatus, BulkJobState + +# Check status +pending = BulkJobStatus.objects.filter( + job_type="my_job_type", + status=BulkJobState.NOT_STARTED +).count() + +in_progress = BulkJobStatus.objects.filter( + job_type="my_job_type", + status=BulkJobState.IN_PROGRESS +).count() + +completed = BulkJobStatus.objects.filter( + job_type="my_job_type", + status=BulkJobState.COMPLETED +).count() +``` + +### Key Features + +- **Automatic Deduplication**: Uses `batch_key` to prevent duplicate job records +- **Resumable Population**: `populate` can be interrupted and resumed via `start_from` +- **Concurrency Control**: Coordinator maintains `target_running_tasks` concurrent jobs +- **Automatic Retry**: Stuck jobs (in progress > timeout) are automatically reset +- **Cleanup**: Completed jobs older than `completed_cleanup_age` are auto-deleted +- **Metrics**: Automatic metrics reporting with job_type tags + +For a complete example, see `src/sentry/workflow_engine/processors/error_backfill.py` +and the integration tests in `tests/sentry/workflow_engine/test_bulk_job_integration.py`. +""" + +import logging +from abc import ABC, abstractmethod +from collections.abc import Callable, Iterable +from datetime import UTC, datetime, timedelta +from typing import Any + +from django.db import router, transaction +from django.db.models import Count +from pydantic import BaseModel + +from sentry.locks import locks +from sentry.utils import metrics +from sentry.utils.registry import Registry +from sentry.workflow_engine.models import BulkJobState, BulkJobStatus + +logger = logging.getLogger(__name__) + + +class BulkJobSpec(ABC): + """ + Abstract base class for bulk job specifications. + + Subclasses must define job_type and work_chunk_model as class attributes, + and implement all abstract methods to define job-specific behavior. 
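+
+    Instances are registered once at import time, e.g.
+    ``bulk_job_registry.register(spec.job_type)(spec)`` (see the module
+    docstring above and error_backfill.py for a concrete example).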
+ """ + + job_type: str + work_chunk_model: type[BaseModel] + + # Coordination configuration + max_batch_size: int = 100 + target_running_tasks: int = 50 + in_progress_timeout: timedelta = timedelta(hours=1) + completed_cleanup_age: timedelta = timedelta(days=30) + + @abstractmethod + def process_work_chunk(self, work_chunk: BaseModel) -> dict[str, Any]: + """Process a single work chunk and return result metadata for logging.""" + pass + + @abstractmethod + def generate_work_chunks(self, start_from: str | None) -> Iterable[BaseModel]: + """Generate work chunks for job status record creation.""" + pass + + @abstractmethod + def get_batch_key(self, work_chunk: BaseModel) -> str: + """Get the unique batch key for a work chunk.""" + pass + + +# Registry of all bulk job specs +bulk_job_registry: Registry[BulkJobSpec] = Registry(enable_reverse_lookup=False) + + +def process_bulk_job(job_status_id: int) -> None: + """ + Generic processor for any bulk job type. + + Looks up the job spec from the registry and executes the job-specific processing logic. + Uses select_for_update to prevent concurrent processing of the same job. + """ + # Atomically claim the job and mark as in progress + with transaction.atomic(using=router.db_for_write(BulkJobStatus)): + try: + job_status = BulkJobStatus.objects.select_for_update().get(id=job_status_id) + except BulkJobStatus.DoesNotExist: + logger.warning( + "bulk_job.status_not_found", + extra={"job_status_id": job_status_id}, + ) + return + + job_spec = bulk_job_registry.get(job_status.job_type) + + # Mark as in progress and update date_updated to prevent false stuck detection + job_status.status = BulkJobState.IN_PROGRESS + job_status.save(update_fields=["status", "date_updated"]) + + # Deserialize work chunk using job-specific model + work_chunk = job_spec.work_chunk_model(**job_status.work_chunk_info) + + # Execute job-specific processing outside transaction + try: + result = job_spec.process_work_chunk(work_chunk) + + # Mark as completed + job_status.status = BulkJobState.COMPLETED + job_status.save(update_fields=["status", "date_updated"]) + + metrics.incr( + "workflow_engine.bulk_job.process_success", tags={"job_type": job_status.job_type} + ) + + logger.info( + "bulk_job.completed", + extra={ + "job_status_id": job_status_id, + "job_type": job_status.job_type, + **result, + }, + ) + + except Exception as e: + logger.exception( + "bulk_job.failed", + extra={ + "job_status_id": job_status_id, + "job_type": job_status.job_type, + "error": str(e), + }, + ) + metrics.incr( + "workflow_engine.bulk_job.process_error", tags={"job_type": job_status.job_type} + ) + raise + + +def coordinate_bulk_jobs( + job_type: str, + max_batch_size: int, + in_progress_timeout: timedelta, + completed_cleanup_age: timedelta, + schedule_task_fn: Callable[[int], None], + target_running_tasks: int, +) -> None: + """ + Generic coordinator for bulk jobs: reset stuck items, delete old completed items, + and schedule new pending jobs to maintain target concurrency. + + Only schedules enough tasks to bring the current running count up to target_running_tasks. 
+ """ + stuck_cutoff = datetime.now(UTC) - in_progress_timeout + stuck_count = BulkJobStatus.objects.filter( + job_type=job_type, + status=BulkJobState.IN_PROGRESS, + date_updated__lt=stuck_cutoff, + ).update( + status=BulkJobState.NOT_STARTED, + ) + + if stuck_count > 0: + logger.info( + "bulk_job.reset_stuck", + extra={"count": stuck_count, "job_type": job_type}, + ) + metrics.incr( + "workflow_engine.bulk_job.reset_stuck", amount=stuck_count, tags={"job_type": job_type} + ) + + completed_cutoff = datetime.now(UTC) - completed_cleanup_age + deleted_count, _ = BulkJobStatus.objects.filter( + job_type=job_type, + status=BulkJobState.COMPLETED, + date_updated__lt=completed_cutoff, + ).delete() + + if deleted_count > 0: + logger.info( + "bulk_job.cleaned_up", + extra={"count": deleted_count, "job_type": job_type}, + ) + metrics.incr( + "workflow_engine.bulk_job.cleaned_up", amount=deleted_count, tags={"job_type": job_type} + ) + + # Acquire lock to prevent race conditions with concurrent coordinators + lock = locks.get(f"bulk_job_coordinate:{job_type}", duration=60, name="bulk_job") + with lock.acquire(): + # Count current in-progress tasks and schedule up to target concurrency + current_in_progress = BulkJobStatus.objects.filter( + job_type=job_type, status=BulkJobState.IN_PROGRESS + ).count() + + # Calculate how many tasks to schedule to reach target + tasks_to_schedule = max(0, target_running_tasks - current_in_progress) + # Cap at max_batch_size for safety + tasks_to_schedule = min(tasks_to_schedule, max_batch_size) + + scheduled_count = 0 + if tasks_to_schedule == 0: + logger.info( + "bulk_job.target_reached", + extra={ + "job_type": job_type, + "current_in_progress": current_in_progress, + "target_running_tasks": target_running_tasks, + }, + ) + else: + pending_items = BulkJobStatus.objects.filter( + job_type=job_type, + status=BulkJobState.NOT_STARTED, + ).order_by("date_added")[:tasks_to_schedule] + + for item in pending_items: + try: + schedule_task_fn(item.id) + scheduled_count += 1 + except Exception as e: + logger.exception( + "bulk_job.schedule_failed", + extra={ + "job_status_id": item.id, + "job_type": job_type, + "error": str(e), + }, + ) + + if scheduled_count > 0: + logger.info( + "bulk_job.scheduled", + extra={"count": scheduled_count, "job_type": job_type}, + ) + metrics.incr( + "workflow_engine.bulk_job.scheduled", + amount=scheduled_count, + tags={"job_type": job_type}, + ) + + # Single aggregation query for all status counts + status_counts = dict( + BulkJobStatus.objects.filter(job_type=job_type) + .values("status") + .annotate(count=Count("id")) + .values_list("status", "count") + ) + + total_pending = status_counts.get(BulkJobState.NOT_STARTED, 0) + total_in_progress = status_counts.get(BulkJobState.IN_PROGRESS, 0) + total_completed = status_counts.get(BulkJobState.COMPLETED, 0) + + logger.info( + "bulk_job.coordinator_run", + extra={ + "job_type": job_type, + "scheduled": scheduled_count, + "stuck_reset": stuck_count, + "cleaned_up": deleted_count, + "total_pending": total_pending, + "total_in_progress": total_in_progress, + "total_completed": total_completed, + }, + ) + + metrics.gauge("workflow_engine.bulk_job.pending", total_pending, tags={"job_type": job_type}) + metrics.gauge( + "workflow_engine.bulk_job.in_progress", total_in_progress, tags={"job_type": job_type} + ) + metrics.gauge( + "workflow_engine.bulk_job.completed", total_completed, tags={"job_type": job_type} + ) + + +def create_bulk_job_records( + job_type: str, + start_from: str | None = None, + 
deadline: datetime | None = None, + batch_size: int = 1000, +) -> str | None: + """ + Create BulkJobStatus records for a bulk job type. + + Returns a resume key if the deadline is reached, or None if complete. + """ + job_spec = bulk_job_registry.get(job_type) + + created_count = 0 + work_chunk_batch: list[BaseModel] = [] + + for work_chunk in job_spec.generate_work_chunks(start_from): + # Check deadline before adding to batch to avoid reprocessing + if deadline and datetime.now(UTC) >= deadline: + # Flush remaining batch before stopping + if work_chunk_batch: + created_count += _create_job_records_batch(job_type, job_spec, work_chunk_batch) + + # Return current item as resume point (will be processed on resume) + resume_key = job_spec.get_batch_key(work_chunk) + logger.info( + "bulk_job.populate_deadline_reached", + extra={ + "job_type": job_type, + "created_count": created_count, + "resume_from": resume_key, + }, + ) + metrics.incr( + "workflow_engine.bulk_job.populated", + amount=created_count, + tags={"job_type": job_type}, + ) + return resume_key + + work_chunk_batch.append(work_chunk) + + if len(work_chunk_batch) >= batch_size: + created_count += _create_job_records_batch(job_type, job_spec, work_chunk_batch) + work_chunk_batch = [] + + if work_chunk_batch: + created_count += _create_job_records_batch(job_type, job_spec, work_chunk_batch) + + logger.info( + "bulk_job.populated", + extra={"job_type": job_type, "created_count": created_count}, + ) + + metrics.incr( + "workflow_engine.bulk_job.populated", amount=created_count, tags={"job_type": job_type} + ) + return None + + +def _create_job_records_batch( + job_type: str, job_spec: BulkJobSpec, work_chunks: list[BaseModel] +) -> int: + """ + Create BulkJobStatus records for a batch of work chunks. + + Returns the number of new records actually created (excludes duplicates). + Relies on unique constraint on batch_key to prevent duplicates. 
+ """ + records = [ + BulkJobStatus( + job_type=job_type, + batch_key=job_spec.get_batch_key(chunk), + work_chunk_info=chunk.dict(), + status=BulkJobState.NOT_STARTED, + ) + for chunk in work_chunks + ] + + if records: + # bulk_create with ignore_conflicts returns all objects (including conflicts) + # Count only objects that got a primary key assigned (were actually inserted) + created = BulkJobStatus.objects.bulk_create(records, ignore_conflicts=True) + return sum(1 for obj in created if obj.pk is not None) + return 0 diff --git a/src/sentry/workflow_engine/processors/error_backfill.py b/src/sentry/workflow_engine/processors/error_backfill.py new file mode 100644 index 00000000000000..97964400bc43d7 --- /dev/null +++ b/src/sentry/workflow_engine/processors/error_backfill.py @@ -0,0 +1,138 @@ +"""Error detector backfill job implementation.""" + +import logging +from collections.abc import Iterable +from datetime import timedelta +from typing import Any + +from django.db.models import Exists, OuterRef +from pydantic import BaseModel + +from sentry import options +from sentry.grouping.grouptype import ErrorGroupType +from sentry.models.group import Group, GroupStatus +from sentry.utils import metrics +from sentry.utils.query import RangeQuerySetWrapper +from sentry.workflow_engine.models import Detector, DetectorGroup +from sentry.workflow_engine.processors.bulk_job import BulkJobSpec, bulk_job_registry + +logger = logging.getLogger(__name__) + +GROUPS_PER_BATCH = 400 + + +class ErrorDetectorWorkChunk(BaseModel): + """Work chunk for backfilling error detector groups.""" + + detector_id: int + + +class ErrorBackfillJob(BulkJobSpec): + """Bulk job specification for backfilling error detector groups.""" + + job_type = "error_backfill" + work_chunk_model = ErrorDetectorWorkChunk + + # Configuration for coordination + max_batch_size = 100 + in_progress_timeout = timedelta(hours=1) + completed_cleanup_age = timedelta(days=30) + + @property + def target_running_tasks(self) -> int: + return options.get("workflow_engine.error_backfill.target_running_tasks") + + def process_work_chunk(self, work_chunk: BaseModel) -> dict[str, Any]: + assert isinstance(work_chunk, ErrorDetectorWorkChunk) + return _backfill_detector_groups(work_chunk) + + def generate_work_chunks(self, start_from: str | None) -> Iterable[BaseModel]: + return _generate_error_detector_work_chunks(start_from) + + def get_batch_key(self, work_chunk: BaseModel) -> str: + assert isinstance(work_chunk, ErrorDetectorWorkChunk) + return _get_error_detector_batch_key(work_chunk) + + +def _backfill_detector_groups(work_chunk: ErrorDetectorWorkChunk) -> dict[str, Any]: + """ + Create DetectorGroup associations for all unresolved ErrorGroupType Groups + in the detector's project that don't already have a DetectorGroup. + + Returns a dict with backfill results to be included in logging extra data. 
+ """ + detector_id = work_chunk.detector_id + + try: + detector = Detector.objects.get(id=detector_id) + except Detector.DoesNotExist: + logger.info( + "error_backfill.detector_not_found", + extra={"detector_id": detector_id}, + ) + return { + "detector_id": detector_id, + "groups_created": 0, + "skipped": "detector_deleted", + } + + project_id = detector.project_id + + all_unresolved_groups = Group.objects.filter( + project_id=project_id, + status=GroupStatus.UNRESOLVED, + type=ErrorGroupType.type_id, + ) + + # Use NOT EXISTS subquery for efficiency + existing_detector_groups_subquery = DetectorGroup.objects.filter( + detector_id=detector_id, group_id=OuterRef("id") + ) + + groups_needing_detector_groups = all_unresolved_groups.exclude( + Exists(existing_detector_groups_subquery) + ) + + created_count = 0 + + for group in RangeQuerySetWrapper(groups_needing_detector_groups, step=GROUPS_PER_BATCH): + detector_group, created = DetectorGroup.objects.get_or_create( + detector_id=detector_id, + group_id=group.id, + ) + if created: + detector_group.date_added = group.first_seen + detector_group.save(update_fields=["date_added"]) + created_count += 1 + + metrics.incr("workflow_engine.error_backfill.groups_created", amount=created_count) + + return { + "detector_id": detector_id, + "project_id": project_id, + "groups_created": created_count, + } + + +def _generate_error_detector_work_chunks( + start_from: str | None, +) -> Iterable[ErrorDetectorWorkChunk]: + """Generate work chunks for all error detectors.""" + error_detectors = Detector.objects.filter(type=ErrorGroupType.slug) + if start_from is not None: + # Extract detector_id from batch_key format "error_detector:{id}" + detector_id = int(start_from.split(":")[-1]) + error_detectors = error_detectors.filter(id__gte=detector_id) + + for detector in error_detectors.iterator(): + yield ErrorDetectorWorkChunk(detector_id=detector.id) + + +def _get_error_detector_batch_key(work_chunk: ErrorDetectorWorkChunk) -> str: + """Get the batch key for an error detector work chunk.""" + return f"error_detector:{work_chunk.detector_id}" + + +# Create singleton instance and register it +ERROR_BACKFILL_JOB = ErrorBackfillJob() +bulk_job_registry.register("error_backfill")(ERROR_BACKFILL_JOB) diff --git a/src/sentry/workflow_engine/tasks/bulk_job.py b/src/sentry/workflow_engine/tasks/bulk_job.py new file mode 100644 index 00000000000000..7244c60457c15a --- /dev/null +++ b/src/sentry/workflow_engine/tasks/bulk_job.py @@ -0,0 +1,159 @@ +"""Generic tasks for bulk job processing.""" + +import logging +from typing import Any + +from sentry.silo.base import SiloMode +from sentry.tasks.base import instrumented_task +from sentry.taskworker.namespaces import workflow_engine_tasks + +logger = logging.getLogger(__name__) + +# Maximum number of times populate task can reschedule itself +MAX_POPULATE_ITERATIONS = 1000 + + +@instrumented_task( + name="sentry.workflow_engine.tasks.bulk_job.process_bulk_job", + namespace=workflow_engine_tasks, + processing_deadline_duration=300, + silo_mode=SiloMode.REGION, +) +def process_bulk_job_task(job_status_id: int, job_type: str, **kwargs: dict[str, Any]) -> None: + """ + Generic task to process a single BulkJobStatus record for any job type. + + This task: + 1. Marks the job status as in_progress + 2. Processes the work chunk using the job-specific processor + 3. 
Marks the job status as completed + """ + from sentry.workflow_engine.processors.bulk_job import process_bulk_job + + process_bulk_job(job_status_id) + + +@instrumented_task( + name="sentry.workflow_engine.tasks.bulk_job.coordinate_bulk_jobs", + namespace=workflow_engine_tasks, + processing_deadline_duration=300, + silo_mode=SiloMode.REGION, +) +def coordinate_bulk_jobs_task(job_type: str, **kwargs: dict[str, Any]) -> None: + """ + Generic coordinator task for bulk jobs. + + This task runs periodically and: + 1. Finds pending items and schedules them for processing up to target concurrency + 2. Resets any in_progress items that have been stuck for too long + 3. Deletes completed items that are older than the cleanup age + + The configuration (target_running_tasks, timeouts, etc.) is read from the job spec. + """ + from sentry.workflow_engine.processors.bulk_job import bulk_job_registry, coordinate_bulk_jobs + + job_spec = bulk_job_registry.get(job_type) + + def schedule_task(job_status_id: int) -> None: + process_bulk_job_task.apply_async( + kwargs={"job_status_id": job_status_id, "job_type": job_type}, + expires=int(job_spec.in_progress_timeout.total_seconds()), + ) + + coordinate_bulk_jobs( + job_spec.job_type, + max_batch_size=job_spec.max_batch_size, + in_progress_timeout=job_spec.in_progress_timeout, + completed_cleanup_age=job_spec.completed_cleanup_age, + schedule_task_fn=schedule_task, + target_running_tasks=job_spec.target_running_tasks, + ) + + +@instrumented_task( + name="sentry.workflow_engine.tasks.bulk_job.populate_bulk_job_records", + namespace=workflow_engine_tasks, + processing_deadline_duration=600, + silo_mode=SiloMode.REGION, +) +def populate_bulk_job_records_task( + job_type: str, start_from: str | None = None, iteration: int = 0, **kwargs: dict[str, Any] +) -> None: + """ + Generic task to populate BulkJobStatus records for a job type. + + If the task hits its processing deadline, it reschedules itself to continue from + where it left off. Includes a max iteration limit to prevent infinite loops. + """ + from datetime import UTC, datetime, timedelta + + from sentry.workflow_engine.processors.bulk_job import create_bulk_job_records + + if iteration >= MAX_POPULATE_ITERATIONS: + logger.error( + "bulk_job.populate_max_iterations_reached", + extra={ + "job_type": job_type, + "iteration": iteration, + "start_from": start_from, + }, + ) + return + + deadline = datetime.now(UTC) + timedelta(seconds=540) + resume_from = create_bulk_job_records(job_type, start_from=start_from, deadline=deadline) + + if resume_from is not None: + populate_bulk_job_records_task.apply_async( + kwargs={"job_type": job_type, "start_from": resume_from, "iteration": iteration + 1} + ) + + +# Backward compatibility aliases for error backfill +# These can be removed once all callers are updated to use the generic tasks + + +@instrumented_task( + name="sentry.workflow_engine.tasks.error_backfill.process_error_backfill", + namespace=workflow_engine_tasks, + processing_deadline_duration=300, + silo_mode=SiloMode.REGION, +) +def process_error_backfill(backfill_status_id: int, **kwargs: dict[str, Any]) -> None: + """ + Backward compatibility wrapper for process_bulk_job_task. + Prefer using process_bulk_job_task with job_type parameter. 
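+
+    For example (IDs illustrative), instead of
+    process_error_backfill.apply_async(kwargs={"backfill_status_id": 1}),
+    prefer process_bulk_job_task.apply_async(
+        kwargs={"job_status_id": 1, "job_type": "error_backfill"}).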
+ """ + return process_bulk_job_task(backfill_status_id, "error_backfill", **kwargs) + + +@instrumented_task( + name="sentry.workflow_engine.tasks.error_backfill.coordinate_error_backfill", + namespace=workflow_engine_tasks, + processing_deadline_duration=300, + silo_mode=SiloMode.REGION, +) +def coordinate_error_backfill(**kwargs: dict[str, Any]) -> None: + """ + Backward compatibility wrapper for coordinate_bulk_jobs_task. + Prefer using coordinate_bulk_jobs_task with job_type parameter. + """ + return coordinate_bulk_jobs_task("error_backfill", **kwargs) + + +@instrumented_task( + name="sentry.workflow_engine.tasks.error_backfill.populate_error_backfill_status", + namespace=workflow_engine_tasks, + processing_deadline_duration=600, + silo_mode=SiloMode.REGION, +) +def populate_error_backfill_status( + start_from: str | None = None, iteration: int = 0, **kwargs: dict[str, Any] +) -> None: + """ + Backward compatibility wrapper for populate_bulk_job_records_task. + Prefer using populate_bulk_job_records_task with job_type parameter. + """ + return populate_bulk_job_records_task( + "error_backfill", start_from=start_from, iteration=iteration, **kwargs + ) diff --git a/tests/sentry/tasks/test_error_detector_backfill.py b/tests/sentry/tasks/test_error_detector_backfill.py new file mode 100644 index 00000000000000..8348e07b7c1840 --- /dev/null +++ b/tests/sentry/tasks/test_error_detector_backfill.py @@ -0,0 +1,314 @@ +from datetime import UTC, datetime, timedelta +from unittest.mock import patch + +from sentry.grouping.grouptype import ErrorGroupType +from sentry.models.group import GroupStatus +from sentry.workflow_engine.models import DetectorGroup, ErrorBackfillStatus +from sentry.workflow_engine.tasks.error_detector_backfill import ( + coordinate_error_backfill, + populate_error_backfill_status, + process_error_backfill, +) +from tests.sentry.workflow_engine.test_base import BaseWorkflowTest + + +class ProcessErrorBackfillTest(BaseWorkflowTest): + def setUp(self) -> None: + super().setUp() + self.detector = self.create_detector(type=ErrorGroupType.slug) + + def test_task_persistent_name(self) -> None: + """Test that the task has a persistent name""" + assert ( + process_error_backfill.name + == "sentry.workflow_engine.tasks.error_detector_backfill.process_error_backfill" + ) + + def test_process_backfill_success(self) -> None: + """Test that processing a backfill creates DetectorGroups for all unresolved groups""" + # Create some unresolved error groups + group1 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group2 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group3 = self.create_group(project=self.detector.project, status=GroupStatus.RESOLVED) + + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, + status="not_started", + ) + + # Process the backfill + with self.tasks(): + process_error_backfill(backfill_status.id) + + # Check that DetectorGroups were created for unresolved groups + assert DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group1.id + ).exists() + assert DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group2.id + ).exists() + + # Should not create for resolved group + assert not DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group3.id + ).exists() + + # Check that status was updated to completed + backfill_status.refresh_from_db() + assert backfill_status.status == "completed" + + def 
test_process_backfill_already_exists(self) -> None: + """Test that processing a backfill succeeds even if some DetectorGroups already exist""" + group1 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group2 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + + # Create DetectorGroup for group1 first + DetectorGroup.objects.create( + detector_id=self.detector.id, + group_id=group1.id, + ) + + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, + status="not_started", + ) + + # Process the backfill (should not fail) + with self.tasks(): + process_error_backfill(backfill_status.id) + + # Check that status was updated to completed + backfill_status.refresh_from_db() + assert backfill_status.status == "completed" + + # group1 should still have its DetectorGroup, group2 should have one now + assert ( + DetectorGroup.objects.filter(detector_id=self.detector.id, group_id=group1.id).count() + == 1 + ) + assert DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group2.id + ).exists() + + def test_process_backfill_not_found(self) -> None: + """Test that processing a non-existent backfill doesn't crash""" + # This should log a warning but not crash + with self.tasks(): + process_error_backfill(999999) + + def test_process_backfill_marks_in_progress(self) -> None: + """Test that processing marks the backfill as in_progress and then completed""" + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, + status="not_started", + ) + + # Create a group to process + self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + + # Process the backfill + with self.tasks(): + process_error_backfill(backfill_status.id) + + # Check that status transitioned from not_started -> in_progress -> completed + backfill_status.refresh_from_db() + assert backfill_status.status == "completed" + + def test_process_backfill_date_added_preservation(self) -> None: + """Test that DetectorGroup.date_added is set to Group.first_seen""" + from datetime import timedelta + + from django.utils import timezone + + # Create a group with a specific first_seen date + old_date = timezone.now() - timedelta(days=30) + group = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group.first_seen = old_date + group.save() + + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, + status="not_started", + ) + + with self.tasks(): + process_error_backfill(backfill_status.id) + + # Check that DetectorGroup.date_added matches Group.first_seen + detector_group = DetectorGroup.objects.get(detector_id=self.detector.id, group_id=group.id) + assert detector_group.date_added == old_date + + +class CoordinateErrorBackfillTest(BaseWorkflowTest): + def setUp(self) -> None: + super().setUp() + self.detector = self.create_detector(type=ErrorGroupType.slug) + + def test_task_persistent_name(self) -> None: + """Test that the task has a persistent name""" + assert ( + coordinate_error_backfill.name + == "sentry.workflow_engine.tasks.error_detector_backfill.coordinate_error_backfill" + ) + + def test_coordinate_schedules_pending_items(self) -> None: + """Test that coordinator schedules pending backfill items""" + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + ErrorBackfillStatus.objects.create(detector=self.detector, status="not_started") + ErrorBackfillStatus.objects.create(detector=detector2, 
status="not_started") + + with patch( + "sentry.workflow_engine.tasks.error_detector_backfill.process_error_backfill.apply_async" + ) as mock_apply_async: + with self.tasks(): + coordinate_error_backfill() + + # Should have scheduled both items + assert mock_apply_async.call_count == 2 + + def test_coordinate_resets_stuck_items(self) -> None: + """Test that coordinator resets items stuck in_progress for more than 1 hour""" + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, status="in_progress" + ) + + # Manually set date_updated to more than 1 hour ago + old_time = datetime.now(UTC) - timedelta(hours=2) + ErrorBackfillStatus.objects.filter(id=backfill_status.id).update(date_updated=old_time) + + # Mock apply_async to prevent the task from running + with patch( + "sentry.workflow_engine.tasks.error_detector_backfill.process_error_backfill.apply_async" + ): + with self.tasks(): + coordinate_error_backfill() + + # Check that status was reset to not_started + backfill_status.refresh_from_db() + assert backfill_status.status == "not_started" + + def test_coordinate_deletes_old_completed(self) -> None: + """Test that coordinator deletes completed items older than 30 days""" + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, status="completed" + ) + + # Manually set date_updated to more than 30 days ago + old_time = datetime.now(UTC) - timedelta(days=31) + ErrorBackfillStatus.objects.filter(id=backfill_status.id).update(date_updated=old_time) + + with self.tasks(): + coordinate_error_backfill() + + # Check that the record was deleted + assert not ErrorBackfillStatus.objects.filter(id=backfill_status.id).exists() + + def test_coordinate_respects_batch_size(self) -> None: + """Test that coordinator only schedules up to MAX_BACKFILL_BATCH_SIZE items""" + from sentry.workflow_engine.tasks.error_detector_backfill import MAX_BACKFILL_BATCH_SIZE + + # Create more than MAX_BACKFILL_BATCH_SIZE items + for i in range(MAX_BACKFILL_BATCH_SIZE + 10): + project = self.create_project() + detector = self.create_detector(type=ErrorGroupType.slug, project=project) + ErrorBackfillStatus.objects.create(detector=detector, status="not_started") + + with patch( + "sentry.workflow_engine.tasks.error_detector_backfill.process_error_backfill.apply_async" + ) as mock_apply_async: + with self.tasks(): + coordinate_error_backfill() + + # Should only schedule MAX_BACKFILL_BATCH_SIZE items + assert mock_apply_async.call_count == MAX_BACKFILL_BATCH_SIZE + + def test_coordinate_ignores_recent_in_progress(self) -> None: + """Test that coordinator doesn't reset recent in_progress items""" + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, status="in_progress" + ) + + with self.tasks(): + coordinate_error_backfill() + + # Check that status is still in_progress + backfill_status.refresh_from_db() + assert backfill_status.status == "in_progress" + + def test_coordinate_ignores_recent_completed(self) -> None: + """Test that coordinator doesn't delete recent completed items""" + backfill_status = ErrorBackfillStatus.objects.create( + detector=self.detector, status="completed" + ) + + with self.tasks(): + coordinate_error_backfill() + + # Check that the record still exists + assert ErrorBackfillStatus.objects.filter(id=backfill_status.id).exists() + + def test_coordinate_logs_metrics(self) -> None: + """Test that coordinator logs appropriate metrics""" + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + 
+ ErrorBackfillStatus.objects.create(detector=self.detector, status="not_started") + backfill_status_completed = ErrorBackfillStatus.objects.create( + detector=detector2, status="completed" + ) + + # Set completed item to old date for cleanup + old_time = datetime.now(UTC) - timedelta(days=31) + ErrorBackfillStatus.objects.filter(id=backfill_status_completed.id).update( + date_updated=old_time + ) + + with ( + patch("sentry.utils.metrics.incr") as mock_incr, + patch("sentry.utils.metrics.gauge") as mock_gauge, + ): + with self.tasks(): + coordinate_error_backfill() + + # Check that metrics were recorded + assert mock_incr.called + assert mock_gauge.called + + +class PopulateErrorBackfillStatusTest(BaseWorkflowTest): + def setUp(self) -> None: + super().setUp() + + def test_task_persistent_name(self) -> None: + """Test that the task has a persistent name""" + assert ( + populate_error_backfill_status.name + == "sentry.workflow_engine.tasks.error_detector_backfill.populate_error_backfill_status" + ) + + def test_populate_creates_records(self) -> None: + """Test that populate creates ErrorBackfillStatus records for all error detectors""" + detector1 = self.create_detector(type=ErrorGroupType.slug) + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + detector3 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + # Populate + with self.tasks(): + populate_error_backfill_status() + + # Check that records were created + assert ErrorBackfillStatus.objects.filter(detector=detector1).exists() + assert ErrorBackfillStatus.objects.filter(detector=detector2).exists() + assert ErrorBackfillStatus.objects.filter(detector=detector3).exists() + + def test_populate_idempotent(self) -> None: + """Test that populate is idempotent and doesn't create duplicates""" + detector = self.create_detector(type=ErrorGroupType.slug) + + # Populate twice + with self.tasks(): + populate_error_backfill_status() + populate_error_backfill_status() + + # Should only have one record + assert ErrorBackfillStatus.objects.filter(detector=detector).count() == 1 diff --git a/tests/sentry/workflow_engine/models/test_bulk_job_status.py b/tests/sentry/workflow_engine/models/test_bulk_job_status.py new file mode 100644 index 00000000000000..ee4d706fe18536 --- /dev/null +++ b/tests/sentry/workflow_engine/models/test_bulk_job_status.py @@ -0,0 +1,130 @@ +import pytest + +from sentry.grouping.grouptype import ErrorGroupType +from sentry.workflow_engine.models import BulkJobState, BulkJobStatus, Detector +from sentry.workflow_engine.processors.error_backfill import ( + ERROR_BACKFILL_JOB, + ErrorDetectorWorkChunk, +) +from tests.sentry.workflow_engine.test_base import BaseWorkflowTest + + +class BulkJobStatusTest(BaseWorkflowTest): + def setUp(self) -> None: + super().setUp() + self.detector = self.create_detector(type=ErrorGroupType.slug) + + def test_create_error_backfill_status(self) -> None: + """Test that we can create a BulkJobStatus record""" + work_chunk = ErrorDetectorWorkChunk(detector_id=self.detector.id) + batch_key = ERROR_BACKFILL_JOB.get_batch_key(work_chunk) + backfill_status = BulkJobStatus( + job_type=ERROR_BACKFILL_JOB.job_type, + batch_key=batch_key, + work_chunk_info=work_chunk.dict(), + status=BulkJobState.NOT_STARTED, + ) + backfill_status.save() + + assert backfill_status.id is not None + assert backfill_status.batch_key == f"error_detector:{self.detector.id}" + assert backfill_status.work_chunk_info["detector_id"] == self.detector.id + assert 
backfill_status.status == BulkJobState.NOT_STARTED + assert backfill_status.date_added is not None + assert backfill_status.date_updated is not None + + def test_unique_constraint(self) -> None: + """Test that batch_key is unique""" + from django.db import IntegrityError + + work_chunk = ErrorDetectorWorkChunk(detector_id=self.detector.id) + batch_key = ERROR_BACKFILL_JOB.get_batch_key(work_chunk) + backfill_status = BulkJobStatus( + job_type=ERROR_BACKFILL_JOB.job_type, + batch_key=batch_key, + work_chunk_info=work_chunk.dict(), + status=BulkJobState.NOT_STARTED, + ) + backfill_status.save() + + # Creating a duplicate should fail + with pytest.raises(IntegrityError): + duplicate = BulkJobStatus( + job_type=ERROR_BACKFILL_JOB.job_type, + batch_key=batch_key, + work_chunk_info=work_chunk.dict(), + status=BulkJobState.NOT_STARTED, + ) + duplicate.save() + + def test_status_transitions(self) -> None: + """Test that we can transition between statuses""" + work_chunk = ErrorDetectorWorkChunk(detector_id=self.detector.id) + batch_key = ERROR_BACKFILL_JOB.get_batch_key(work_chunk) + backfill_status = BulkJobStatus( + job_type=ERROR_BACKFILL_JOB.job_type, + batch_key=batch_key, + work_chunk_info=work_chunk.dict(), + status=BulkJobState.NOT_STARTED, + ) + backfill_status.save() + + # Transition to in_progress + backfill_status.status = BulkJobState.IN_PROGRESS + backfill_status.save() + backfill_status.refresh_from_db() + assert backfill_status.status == BulkJobState.IN_PROGRESS + + # Transition to completed + backfill_status.status = BulkJobState.COMPLETED + backfill_status.save() + backfill_status.refresh_from_db() + assert backfill_status.status == BulkJobState.COMPLETED + + def test_queryset_filter_by_status(self) -> None: + """Test that we can efficiently query by status""" + # Create several backfill statuses with different statuses + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + detector3 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + for detector, status in [ + (self.detector, BulkJobState.NOT_STARTED), + (detector2, BulkJobState.IN_PROGRESS), + (detector3, BulkJobState.COMPLETED), + ]: + work_chunk = ErrorDetectorWorkChunk(detector_id=detector.id) + batch_key = ERROR_BACKFILL_JOB.get_batch_key(work_chunk) + backfill_status = BulkJobStatus( + job_type=ERROR_BACKFILL_JOB.job_type, + batch_key=batch_key, + work_chunk_info=work_chunk.dict(), + status=status, + ) + backfill_status.save() + + # Query by status + assert BulkJobStatus.objects.filter(status=BulkJobState.NOT_STARTED).count() == 1 + assert BulkJobStatus.objects.filter(status=BulkJobState.IN_PROGRESS).count() == 1 + assert BulkJobStatus.objects.filter(status=BulkJobState.COMPLETED).count() == 1 + + def test_detector_delete_does_not_cascade(self) -> None: + """Test that deleting a detector does not cascade to BulkJobStatus (decoupled)""" + work_chunk = ErrorDetectorWorkChunk(detector_id=self.detector.id) + batch_key = ERROR_BACKFILL_JOB.get_batch_key(work_chunk) + backfill_status = BulkJobStatus( + job_type=ERROR_BACKFILL_JOB.job_type, + batch_key=batch_key, + work_chunk_info=work_chunk.dict(), + status=BulkJobState.NOT_STARTED, + ) + backfill_status.save() + + detector_id = self.detector.id + backfill_status_id = backfill_status.id + + # Delete the detector + self.detector.delete() + + # The backfill status should still exist (decoupled model) + assert not Detector.objects.filter(id=detector_id).exists() + assert 
BulkJobStatus.objects.filter(id=backfill_status_id).exists() diff --git a/tests/sentry/workflow_engine/processors/test_backfill.py b/tests/sentry/workflow_engine/processors/test_backfill.py new file mode 100644 index 00000000000000..48f2a9b4187b6c --- /dev/null +++ b/tests/sentry/workflow_engine/processors/test_backfill.py @@ -0,0 +1,491 @@ +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock, patch + +from sentry.grouping.grouptype import ErrorGroupType +from sentry.models.group import GroupStatus +from sentry.workflow_engine.models import BulkJobState, BulkJobStatus, DetectorGroup +from sentry.workflow_engine.processors.bulk_job import ( + coordinate_bulk_jobs, + create_bulk_job_records, + process_bulk_job, +) +from sentry.workflow_engine.processors.error_backfill import ( + ERROR_BACKFILL_JOB, + ErrorDetectorWorkChunk, +) +from tests.sentry.workflow_engine.test_base import BaseWorkflowTest + + +def create_backfill_status(detector_id: int, status: BulkJobState) -> BulkJobStatus: + """Helper to create a BulkJobStatus record for error detector backfilling.""" + work_chunk = ErrorDetectorWorkChunk(detector_id=detector_id) + batch_key = ERROR_BACKFILL_JOB.get_batch_key(work_chunk) + backfill_status = BulkJobStatus( + job_type=ERROR_BACKFILL_JOB.job_type, + batch_key=batch_key, + work_chunk_info=work_chunk.dict(), + status=status, + ) + backfill_status.save() + return backfill_status + + +class ProcessDetectorBackfillTest(BaseWorkflowTest): + def setUp(self) -> None: + super().setUp() + self.detector = self.create_detector(type=ErrorGroupType.slug) + + def test_process_backfill_success(self) -> None: + """Test that processing a backfill creates DetectorGroups for all unresolved groups""" + # Create some unresolved error groups + group1 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group2 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group3 = self.create_group(project=self.detector.project, status=GroupStatus.RESOLVED) + + backfill_status = create_backfill_status(self.detector.id, BulkJobState.NOT_STARTED) + + # Process the backfill + process_bulk_job(backfill_status.id) + + # Check that DetectorGroups were created for unresolved groups + assert DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group1.id + ).exists() + assert DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group2.id + ).exists() + + # Should not create for resolved group + assert not DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group3.id + ).exists() + + # Check that status was updated to completed + backfill_status.refresh_from_db() + assert backfill_status.status == BulkJobState.COMPLETED + + def test_process_backfill_already_exists(self) -> None: + """Test that processing a backfill succeeds even if some DetectorGroups already exist""" + group1 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group2 = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + + # Create DetectorGroup for group1 first + DetectorGroup.objects.create( + detector_id=self.detector.id, + group_id=group1.id, + ) + + backfill_status = create_backfill_status(self.detector.id, BulkJobState.NOT_STARTED) + + # Process the backfill (should not fail) + process_bulk_job(backfill_status.id) + + # Check that status was updated to completed + backfill_status.refresh_from_db() + assert backfill_status.status == 
BulkJobState.COMPLETED + + # group1 should still have its DetectorGroup, group2 should have one now + assert ( + DetectorGroup.objects.filter(detector_id=self.detector.id, group_id=group1.id).count() + == 1 + ) + assert DetectorGroup.objects.filter( + detector_id=self.detector.id, group_id=group2.id + ).exists() + + def test_process_backfill_not_found(self) -> None: + """Test that processing a non-existent backfill doesn't crash""" + # This should log a warning but not crash + process_bulk_job(999999) + + def test_process_backfill_marks_in_progress(self) -> None: + """Test that processing marks the backfill as in_progress and then completed""" + backfill_status = create_backfill_status(self.detector.id, BulkJobState.NOT_STARTED) + + # Create a group to process + self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + + # Process the backfill + process_bulk_job(backfill_status.id) + + # Check that status transitioned from not_started -> in_progress -> completed + backfill_status.refresh_from_db() + assert backfill_status.status == BulkJobState.COMPLETED + + def test_process_backfill_date_added_preservation(self) -> None: + """Test that DetectorGroup.date_added is set to Group.first_seen""" + from datetime import timedelta + + from django.utils import timezone + + # Create a group with a specific first_seen date + old_date = timezone.now() - timedelta(days=30) + group = self.create_group(project=self.detector.project, status=GroupStatus.UNRESOLVED) + group.first_seen = old_date + group.save() + + backfill_status = create_backfill_status(self.detector.id, BulkJobState.NOT_STARTED) + + process_bulk_job(backfill_status.id) + + # Check that DetectorGroup.date_added matches Group.first_seen + detector_group = DetectorGroup.objects.get(detector_id=self.detector.id, group_id=group.id) + assert detector_group.date_added == old_date + + +class CoordinateBackfillsTest(BaseWorkflowTest): + def setUp(self) -> None: + super().setUp() + self.detector = self.create_detector(type=ErrorGroupType.slug) + + def test_coordinate_schedules_pending_items(self) -> None: + """Test that coordinator schedules pending backfill items""" + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + create_backfill_status(self.detector.id, BulkJobState.NOT_STARTED) + create_backfill_status(detector2.id, BulkJobState.NOT_STARTED) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=50, + ) + + # Should have scheduled both items + assert mock_schedule.call_count == 2 + + def test_coordinate_resets_stuck_items(self) -> None: + """Test that coordinator resets items stuck in_progress for more than the timeout""" + backfill_status = create_backfill_status(self.detector.id, BulkJobState.IN_PROGRESS) + + # Manually set date_updated to more than 1 hour ago + old_time = datetime.now(UTC) - timedelta(hours=2) + BulkJobStatus.objects.filter(id=backfill_status.id).update(date_updated=old_time) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=50, + ) + + # Check that status was reset to not_started + backfill_status.refresh_from_db() + assert backfill_status.status == 
BulkJobState.NOT_STARTED + + def test_coordinate_deletes_old_completed(self) -> None: + """Test that coordinator deletes completed items older than the cleanup age""" + backfill_status = create_backfill_status(self.detector.id, BulkJobState.COMPLETED) + + # Manually set date_updated to more than 30 days ago + old_time = datetime.now(UTC) - timedelta(days=31) + BulkJobStatus.objects.filter(id=backfill_status.id).update(date_updated=old_time) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=50, + ) + + # Check that the record was deleted + assert not BulkJobStatus.objects.filter(id=backfill_status.id).exists() + + def test_coordinate_respects_batch_size(self) -> None: + """Test that coordinator only schedules up to max_batch_size items""" + max_batch_size = 10 + + # Create more than max_batch_size items + for _ in range(max_batch_size + 5): + project = self.create_project() + detector = self.create_detector(type=ErrorGroupType.slug, project=project) + create_backfill_status(detector.id, BulkJobState.NOT_STARTED) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=max_batch_size, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=100, + ) + + # Should only schedule max_batch_size items (capped by max_batch_size) + assert mock_schedule.call_count == max_batch_size + + def test_coordinate_ignores_recent_in_progress(self) -> None: + """Test that coordinator doesn't reset recent in_progress items""" + backfill_status = create_backfill_status(self.detector.id, BulkJobState.IN_PROGRESS) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=50, + ) + + # Check that status is still in_progress + backfill_status.refresh_from_db() + assert backfill_status.status == BulkJobState.IN_PROGRESS + + def test_coordinate_ignores_recent_completed(self) -> None: + """Test that coordinator doesn't delete recent completed items""" + backfill_status = create_backfill_status(self.detector.id, BulkJobState.COMPLETED) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=50, + ) + + # Check that the record still exists + assert BulkJobStatus.objects.filter(id=backfill_status.id).exists() + + def test_coordinate_handles_schedule_failures(self) -> None: + """Test that coordinator continues scheduling even if one item fails""" + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + detector3 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + create_backfill_status(self.detector.id, BulkJobState.NOT_STARTED) + create_backfill_status(detector2.id, BulkJobState.NOT_STARTED) + create_backfill_status(detector3.id, BulkJobState.NOT_STARTED) + + call_count = 0 + + def side_effect(backfill_status_id): + nonlocal call_count + call_count += 1 + if call_count == 2: + raise Exception("Scheduling failed") + + mock_schedule = 
MagicMock(side_effect=side_effect) + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=50, + ) + + # Should have attempted to schedule all 3 items + assert mock_schedule.call_count == 3 + + def test_coordinate_logs_metrics(self) -> None: + """Test that coordinator logs appropriate metrics""" + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + create_backfill_status(self.detector.id, BulkJobState.NOT_STARTED) + backfill_status_completed = create_backfill_status(detector2.id, BulkJobState.COMPLETED) + + # Set completed item to old date for cleanup + old_time = datetime.now(UTC) - timedelta(days=31) + BulkJobStatus.objects.filter(id=backfill_status_completed.id).update(date_updated=old_time) + + mock_schedule = MagicMock() + + with ( + patch("sentry.utils.metrics.incr") as mock_incr, + patch("sentry.utils.metrics.gauge") as mock_gauge, + ): + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=50, + ) + + # Check that metrics were recorded + assert mock_incr.called + assert mock_gauge.called + + def test_coordinate_respects_target_running_tasks(self) -> None: + """Test that coordinator only schedules tasks up to target_running_tasks""" + target_running_tasks = 5 + + # Create 10 pending items + for _ in range(10): + project = self.create_project() + detector = self.create_detector(type=ErrorGroupType.slug, project=project) + create_backfill_status(detector.id, BulkJobState.NOT_STARTED) + + # Create 3 items already in progress + for _ in range(3): + project = self.create_project() + detector = self.create_detector(type=ErrorGroupType.slug, project=project) + create_backfill_status(detector.id, BulkJobState.IN_PROGRESS) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=target_running_tasks, + ) + + # Should only schedule 2 more tasks (5 target - 3 in progress = 2) + assert mock_schedule.call_count == 2 + + def test_coordinate_no_schedule_when_target_reached(self) -> None: + """Test that coordinator schedules no tasks when target is already reached""" + target_running_tasks = 5 + + # Create 10 pending items + for _ in range(10): + project = self.create_project() + detector = self.create_detector(type=ErrorGroupType.slug, project=project) + create_backfill_status(detector.id, BulkJobState.NOT_STARTED) + + # Create 5 items already in progress (at target) + for _ in range(5): + project = self.create_project() + detector = self.create_detector(type=ErrorGroupType.slug, project=project) + create_backfill_status(detector.id, BulkJobState.IN_PROGRESS) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=100, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=target_running_tasks, + ) + + # Should not schedule any tasks + assert mock_schedule.call_count == 0 + + def test_coordinate_respects_max_batch_size_cap(self) -> None: + """Test that coordinator respects max_batch_size even when 
target is higher""" + target_running_tasks = 100 + max_batch_size = 10 + + # Create 50 pending items + for _ in range(50): + project = self.create_project() + detector = self.create_detector(type=ErrorGroupType.slug, project=project) + create_backfill_status(detector.id, BulkJobState.NOT_STARTED) + + mock_schedule = MagicMock() + + coordinate_bulk_jobs( + ERROR_BACKFILL_JOB.job_type, + max_batch_size=max_batch_size, + in_progress_timeout=timedelta(hours=1), + completed_cleanup_age=timedelta(days=30), + schedule_task_fn=mock_schedule, + target_running_tasks=target_running_tasks, + ) + + # Should only schedule max_batch_size tasks despite higher target + assert mock_schedule.call_count == max_batch_size + + +class PopulateBackfillStatusRecordsTest(BaseWorkflowTest): + def setUp(self) -> None: + super().setUp() + + def test_populate_creates_records(self) -> None: + """Test that populate creates BulkJobStatus records for all error detectors""" + detector1 = self.create_detector(type=ErrorGroupType.slug) + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + detector3 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + result = create_bulk_job_records( + ERROR_BACKFILL_JOB.job_type, + ) + + assert result is None + assert BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector1.id}").exists() + assert BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector2.id}").exists() + assert BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector3.id}").exists() + + def test_populate_idempotent(self) -> None: + """Test that populate is idempotent and doesn't create duplicates""" + detector = self.create_detector(type=ErrorGroupType.slug) + + create_bulk_job_records( + ERROR_BACKFILL_JOB.job_type, + ) + create_bulk_job_records( + ERROR_BACKFILL_JOB.job_type, + ) + + assert BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector.id}").count() == 1 + + def test_populate_with_start_from(self) -> None: + """Test that populate respects start_from parameter""" + detector1 = self.create_detector(type=ErrorGroupType.slug) + detector2 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + detector3 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + result = create_bulk_job_records( + ERROR_BACKFILL_JOB.job_type, start_from=f"error_detector:{detector2.id}" + ) + + assert result is None + assert not BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector1.id}").exists() + assert BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector2.id}").exists() + assert BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector3.id}").exists() + + def test_populate_with_deadline(self) -> None: + """Test that populate returns resume point when deadline is reached""" + for _ in range(5): + self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + + past_deadline = datetime.now(UTC) - timedelta(seconds=1) + result = create_bulk_job_records(ERROR_BACKFILL_JOB.job_type, deadline=past_deadline) + + assert result is not None + assert BulkJobStatus.objects.count() == 0 + + def test_populate_resumes_from_last_processed(self) -> None: + """Test that populate can resume from where it left off""" + detectors = [ + self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + for _ in range(3) + ] + + create_bulk_job_records( + ERROR_BACKFILL_JOB.job_type, start_from=f"error_detector:{detectors[0].id}" + ) + 
assert BulkJobStatus.objects.count() == 3 + + detector4 = self.create_detector(type=ErrorGroupType.slug, project=self.create_project()) + create_bulk_job_records( + ERROR_BACKFILL_JOB.job_type, start_from=f"error_detector:{detector4.id}" + ) + + assert BulkJobStatus.objects.count() == 4 + assert BulkJobStatus.objects.filter(batch_key=f"error_detector:{detector4.id}").exists() diff --git a/tests/sentry/workflow_engine/tasks/test_error_backfill.py b/tests/sentry/workflow_engine/tasks/test_error_backfill.py new file mode 100644 index 00000000000000..402d9e13952bc6 --- /dev/null +++ b/tests/sentry/workflow_engine/tasks/test_error_backfill.py @@ -0,0 +1,31 @@ +"""Tests for task definitions - these primarily verify task names are stable.""" + +from sentry.workflow_engine.tasks.bulk_job import ( + coordinate_error_backfill, + populate_error_backfill_status, + process_error_backfill, +) +from tests.sentry.workflow_engine.test_base import BaseWorkflowTest + + +class TaskDefinitionTest(BaseWorkflowTest): + def test_process_error_backfill_name(self) -> None: + """Test that process_error_backfill has a stable name""" + assert ( + process_error_backfill.name + == "sentry.workflow_engine.tasks.error_backfill.process_error_backfill" + ) + + def test_coordinate_error_backfill_name(self) -> None: + """Test that coordinate_error_backfill has a stable name""" + assert ( + coordinate_error_backfill.name + == "sentry.workflow_engine.tasks.error_backfill.coordinate_error_backfill" + ) + + def test_populate_error_backfill_status_name(self) -> None: + """Test that populate_error_backfill_status has a stable name""" + assert ( + populate_error_backfill_status.name + == "sentry.workflow_engine.tasks.error_backfill.populate_error_backfill_status" + ) diff --git a/tests/sentry/workflow_engine/test_bulk_job_integration.py b/tests/sentry/workflow_engine/test_bulk_job_integration.py new file mode 100644 index 00000000000000..1cf96e0b31dcf4 --- /dev/null +++ b/tests/sentry/workflow_engine/test_bulk_job_integration.py @@ -0,0 +1,323 @@ +"""Integration tests for bulk job processing infrastructure.""" + +import logging +from collections.abc import Iterable +from datetime import timedelta +from typing import Any + +from pydantic import BaseModel + +from sentry.utils.query import RangeQuerySetWrapper +from sentry.workflow_engine.models import BulkJobState, BulkJobStatus, Workflow +from sentry.workflow_engine.processors.bulk_job import ( + BulkJobSpec, + bulk_job_registry, + coordinate_bulk_jobs, + create_bulk_job_records, + process_bulk_job, +) +from sentry.workflow_engine.tasks.bulk_job import ( + populate_bulk_job_records_task, + process_bulk_job_task, +) +from tests.sentry.workflow_engine.test_base import BaseWorkflowTest + +logger = logging.getLogger(__name__) + + +class WorkflowReverseWorkChunk(BaseModel): + """Work chunk for reversing workflow names.""" + + workflow_id: int + + +class WorkflowNameReverseJob(BulkJobSpec): + """Test bulk job that reverses workflow names.""" + + job_type = "workflow_name_reverse" + work_chunk_model = WorkflowReverseWorkChunk + + # Configuration optimized for testing + max_batch_size = 10 + target_running_tasks = 3 + in_progress_timeout = timedelta(minutes=15) + completed_cleanup_age = timedelta(days=1) + + def process_work_chunk(self, work_chunk: BaseModel) -> dict[str, Any]: + """Reverse the name of a single workflow.""" + assert isinstance(work_chunk, WorkflowReverseWorkChunk) + workflow = Workflow.objects.get(id=work_chunk.workflow_id) + old_name = workflow.name + workflow.name = 
old_name[::-1] # Reverse the string + workflow.save(update_fields=["name"]) + + logger.info( + "workflow_name_reverse.processed", + extra={ + "workflow_id": workflow.id, + "old_name": old_name, + "new_name": workflow.name, + }, + ) + + return { + "workflow_id": workflow.id, + "old_name": old_name, + "new_name": workflow.name, + } + + def generate_work_chunks(self, start_from: str | None) -> Iterable[BaseModel]: + """Generate work chunks for all workflows.""" + workflows = Workflow.objects.all() + if start_from is not None: + # Extract workflow_id from batch_key format "workflow:{id}" + workflow_id = int(start_from.split(":")[-1]) + workflows = workflows.filter(id__gte=workflow_id) + + # RangeQuerySetWrapper doesn't support ORDER BY + for workflow in RangeQuerySetWrapper(workflows, step=100): + yield WorkflowReverseWorkChunk(workflow_id=workflow.id) + + def get_batch_key(self, work_chunk: BaseModel) -> str: + """Get the batch key for a workflow work chunk.""" + assert isinstance(work_chunk, WorkflowReverseWorkChunk) + return f"workflow:{work_chunk.workflow_id}" + + +class BulkJobIntegrationTest(BaseWorkflowTest): + """Integration tests for the bulk job processing system.""" + + def setUp(self) -> None: + super().setUp() + # Register the test job spec (only if not already registered) + self.job_spec = WorkflowNameReverseJob() + if self.job_spec.job_type not in bulk_job_registry.registrations: + bulk_job_registry.register(self.job_spec.job_type)(self.job_spec) + + def tearDown(self) -> None: + # Clean up registry + if self.job_spec.job_type in bulk_job_registry.registrations: + del bulk_job_registry.registrations[self.job_spec.job_type] + # Also clean up reverse lookup if enabled + if ( + bulk_job_registry.enable_reverse_lookup + and self.job_spec in bulk_job_registry.reverse_lookup + ): + del bulk_job_registry.reverse_lookup[self.job_spec] + super().tearDown() + + def test_full_bulk_job_lifecycle(self) -> None: + """ + Integration test for the complete bulk job lifecycle. + + This test: + 1. Creates 5 workflows with names 'ABCDEFG' + 2. Runs populate task to create BulkJobStatus records + 3. Runs coordinate task to process jobs + 4. Verifies that all workflow names are reversed to 'GFEDCBA' + """ + # Create 5 workflows with the same name + workflows = [] + for i in range(5): + workflow = self.create_workflow(name="ABCDEFG") + workflows.append(workflow) + + # Step 1: Populate BulkJobStatus records + resume_key = create_bulk_job_records(self.job_spec.job_type) + assert resume_key is None, "Should complete in one pass for only 5 workflows" + + # Verify BulkJobStatus records were created + job_statuses = BulkJobStatus.objects.filter(job_type=self.job_spec.job_type) + assert job_statuses.count() == 5 + assert all(status.status == BulkJobState.NOT_STARTED for status in job_statuses) + + # Step 2: Process individual jobs using the low-level function + for job_status in job_statuses: + process_bulk_job(job_status.id) + + # Verify all jobs completed + job_statuses = BulkJobStatus.objects.filter(job_type=self.job_spec.job_type) + assert all(status.status == BulkJobState.COMPLETED for status in job_statuses) + + # Step 3: Verify workflow names were reversed + for workflow in workflows: + workflow.refresh_from_db() + assert workflow.name == "GFEDCBA", f"Expected 'GFEDCBA', got '{workflow.name}'" + + def test_coordinator_task_with_concurrency_control(self) -> None: + """ + Test that the coordinator respects target_running_tasks. + + This test: + 1. Creates 10 workflows + 2. 
Populates BulkJobStatus records
+        3. Manually marks 2 as IN_PROGRESS
+        4. Runs coordinator with target_running_tasks=3
+        5. Verifies only 1 new task was scheduled (to reach target of 3)
+        """
+        # Create 10 workflows
+        workflows = []
+        for i in range(10):
+            workflow = self.create_workflow(name="ABCDEFG")
+            workflows.append(workflow)
+
+        # Populate records
+        create_bulk_job_records(self.job_spec.job_type)
+
+        # Manually mark 2 as IN_PROGRESS
+        job_statuses = list(
+            BulkJobStatus.objects.filter(job_type=self.job_spec.job_type).order_by("id")[:2]
+        )
+        for status in job_statuses:
+            status.status = BulkJobState.IN_PROGRESS
+            status.save(update_fields=["status"])
+
+        # Track which jobs get scheduled
+        scheduled_ids = []
+
+        def mock_schedule(job_status_id: int) -> None:
+            scheduled_ids.append(job_status_id)
+            # Actually mark as in progress for realistic simulation
+            status = BulkJobStatus.objects.get(id=job_status_id)
+            status.status = BulkJobState.IN_PROGRESS
+            status.save(update_fields=["status"])
+
+        # Run coordinator
+        coordinate_bulk_jobs(
+            self.job_spec.job_type,
+            max_batch_size=self.job_spec.max_batch_size,
+            in_progress_timeout=self.job_spec.in_progress_timeout,
+            completed_cleanup_age=self.job_spec.completed_cleanup_age,
+            schedule_task_fn=mock_schedule,
+            target_running_tasks=self.job_spec.target_running_tasks,
+        )
+
+        # Should only schedule 1 more task (target=3, already have 2 in progress)
+        assert len(scheduled_ids) == 1
+
+        # Verify we now have 3 in progress
+        in_progress_count = BulkJobStatus.objects.filter(
+            job_type=self.job_spec.job_type, status=BulkJobState.IN_PROGRESS
+        ).count()
+        assert in_progress_count == 3
+
+    def test_populate_with_cycles(self) -> None:
+        """
+        Test populating records over multiple cycles with deadline interruption.
+
+        This test simulates a scenario where populate hits a deadline and needs
+        to resume across multiple task invocations.
+        """
+        from datetime import UTC, datetime
+
+        # Create 15 workflows
+        workflows = []
+        for i in range(15):
+            workflow = self.create_workflow(name=f"WORKFLOW_{i:02d}")
+            workflows.append(workflow)
+
+        # First cycle: pass a deadline that has already expired, so populate
+        # bails out before creating any records and returns a resume key
+        past_deadline = datetime.now(UTC)
+        resume_key = create_bulk_job_records(
+            self.job_spec.job_type, start_from=None, deadline=past_deadline
+        )
+
+        # Should have stopped early and returned a resume key
+        assert resume_key is not None
+        # Should have created 0 records since deadline was in the past
+        assert BulkJobStatus.objects.filter(job_type=self.job_spec.job_type).count() == 0
+
+        # Second cycle: use a deadline far enough out to finish the remaining work
+        future_deadline = datetime.now(UTC) + timedelta(seconds=10)
+        resume_key = create_bulk_job_records(
+            self.job_spec.job_type, start_from=None, deadline=future_deadline
+        )
+
+        # Should have completed all workflows
+        assert resume_key is None
+        assert BulkJobStatus.objects.filter(job_type=self.job_spec.job_type).count() == 15
+
+    def test_end_to_end_with_task_functions(self) -> None:
+        """
+        End-to-end test using the actual task functions.
+
+        This test:
+        1. Creates workflows with 'ABCDEFG' names
+        2. Calls populate_bulk_job_records_task
+        3. Calls process_bulk_job_task for each job
+        4. Verifies names are reversed
+        """
+        # Create 3 workflows
+        workflows = []
+        for i in range(3):
+            workflow = self.create_workflow(name="ABCDEFG")
+            workflows.append(workflow)
+
+        # Call populate task (this doesn't run in celery, just the function)
+        populate_bulk_job_records_task(self.job_spec.job_type)
+
+        # Get all job statuses
+        job_statuses = BulkJobStatus.objects.filter(job_type=self.job_spec.job_type)
+        assert job_statuses.count() == 3
+
+        # Process each job using the task function
+        for job_status in job_statuses:
+            process_bulk_job_task(job_status.id, self.job_spec.job_type)
+
+        # Verify all completed
+        assert all(
+            status.status == BulkJobState.COMPLETED
+            for status in BulkJobStatus.objects.filter(job_type=self.job_spec.job_type)
+        )
+
+        # Verify names reversed
+        for workflow in workflows:
+            workflow.refresh_from_db()
+            assert workflow.name == "GFEDCBA"
+
+    def test_coordinator_task_function(self) -> None:
+        """
+        Test the coordination logic that backs the coordinator task.
+
+        The celery task itself isn't run here; coordinate_bulk_jobs is called
+        directly with the configuration read from the job spec, and we verify
+        that it schedules tasks up to target_running_tasks.
+        """
+        # Create 5 workflows
+        for i in range(5):
+            self.create_workflow(name="TESTNAME")
+
+        # Populate records
+        create_bulk_job_records(self.job_spec.job_type)
+
+        # Verify all are NOT_STARTED
+        assert (
+            BulkJobStatus.objects.filter(
+                job_type=self.job_spec.job_type, status=BulkJobState.NOT_STARTED
+            ).count()
+            == 5
+        )
+
+        # The real coordinator task schedules work via process_bulk_job_task.apply_async(),
+        # which won't run without a celery worker, so count scheduled jobs with a stub instead.
+        scheduled_count = 0
+
+        def counting_schedule(job_status_id: int) -> None:
+            nonlocal scheduled_count
+            scheduled_count += 1
+
+        coordinate_bulk_jobs(
+            self.job_spec.job_type,
+            max_batch_size=self.job_spec.max_batch_size,
+            in_progress_timeout=self.job_spec.in_progress_timeout,
+            completed_cleanup_age=self.job_spec.completed_cleanup_age,
+            schedule_task_fn=counting_schedule,
+            target_running_tasks=self.job_spec.target_running_tasks,
+        )
+
+        # Should schedule up to target_running_tasks (which is 3)
+        assert scheduled_count == 3
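
A note on the scheduling behaviour these coordinator tests pin down: the coordinator counts chunks already in progress, tops the pool up to target_running_tasks, and never schedules more than max_batch_size per run. A minimal sketch of that arithmetic, with plan_schedule_count as an illustrative name rather than the actual helper inside coordinate_bulk_jobs:

def plan_schedule_count(in_progress: int, target_running_tasks: int, max_batch_size: int) -> int:
    # Top the pool up to the concurrency target, but cap each coordinator
    # run at one batch of newly scheduled chunks.
    return min(max_batch_size, max(0, target_running_tasks - in_progress))

# Mirrors the expectations encoded in the tests above:
assert plan_schedule_count(in_progress=3, target_running_tasks=5, max_batch_size=100) == 2
assert plan_schedule_count(in_progress=5, target_running_tasks=5, max_batch_size=100) == 0
assert plan_schedule_count(in_progress=0, target_running_tasks=100, max_batch_size=10) == 10

The three assertions correspond to test_coordinate_respects_target_running_tasks, test_coordinate_no_schedule_when_target_reached, and test_coordinate_respects_max_batch_size_cap.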
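
The stuck/cleanup tests fix the coordinator's maintenance pass in the same way: an in_progress row whose date_updated is older than in_progress_timeout is reset to not_started, and a completed row older than completed_cleanup_age is deleted. A rough per-row decision function under those assumptions (maintenance_action is a hypothetical name; the real coordinator presumably applies this with bulk queryset updates):

from datetime import UTC, datetime, timedelta

def maintenance_action(
    status: str,
    date_updated: datetime,
    now: datetime,
    in_progress_timeout: timedelta,
    completed_cleanup_age: timedelta,
) -> str | None:
    # Decide what the coordinator should do with a single BulkJobStatus row.
    age = now - date_updated
    if status == "in_progress" and age > in_progress_timeout:
        return "reset"   # stuck worker: put the chunk back in the queue
    if status == "completed" and age > completed_cleanup_age:
        return "delete"  # old bookkeeping rows get garbage-collected
    return None          # recent rows are left alone

now = datetime.now(UTC)
assert maintenance_action("in_progress", now - timedelta(hours=2), now, timedelta(hours=1), timedelta(days=30)) == "reset"
assert maintenance_action("completed", now - timedelta(days=31), now, timedelta(hours=1), timedelta(days=30)) == "delete"
assert maintenance_action("in_progress", now, now, timedelta(hours=1), timedelta(days=30)) is None
assert maintenance_action("completed", now, now, timedelta(hours=1), timedelta(days=30)) is None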
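
The populate tests establish the contract of create_bulk_job_records: it is idempotent, start_from skips batch keys that were already covered, and hitting deadline returns a resume key instead of None. A hedged sketch of a driver loop built only on that contract; in production the populate task presumably re-enqueues itself with the returned key rather than looping in-process:

from datetime import UTC, datetime, timedelta

from sentry.workflow_engine.processors.bulk_job import create_bulk_job_records

def populate_all(job_type: str, time_budget: timedelta = timedelta(seconds=30)) -> None:
    # Keep calling create_bulk_job_records until it reports completion,
    # giving each call a fresh deadline and resuming from the returned key.
    resume_key: str | None = None
    while True:
        deadline = datetime.now(UTC) + time_budget
        resume_key = create_bulk_job_records(job_type, start_from=resume_key, deadline=deadline)
        if resume_key is None:
            break  # every work chunk now has a BulkJobStatus record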