Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion migrations_lockfile.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ tempest: 0003_use_encrypted_char_field

uptime: 0049_cleanup_failed_safe_deletes

workflow_engine: 0103_add_unique_constraint

workflow_engine: 0104_bulkjobstatus
6 changes: 6 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3233,6 +3233,12 @@
default=50,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
# Integer option defaulting to 1, registered with FLAG_AUTOMATOR_MODIFIABLE.
# NOTE(review): presumably the target number of concurrently running
# error-backfill tasks in the workflow engine — confirm with the code that
# reads this option.
register(
    "workflow_engine.error_backfill.target_running_tasks",
    type=Int,
    default=1,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Restrict uptime issue creation for specific host provider identifiers. Items
# in this list map to the `host_provider_id` column in the UptimeSubscription
Expand Down
56 changes: 56 additions & 0 deletions src/sentry/workflow_engine/migrations/0104_bulkjobstatus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Generated by Django 5.2.1 on 2025-11-13 00:23

from django.db import migrations, models

import sentry.db.models.fields.bounded
import sentry.db.models.fields.jsonfield
from sentry.new_migrations.migrations import CheckedMigration


class Migration(CheckedMigration):
    """Create the ``workflow_engine_bulk_job_status`` table for the ``BulkJobStatus`` model."""

    # Setting ``is_post_deployment = True`` keeps a migration from being run
    # automatically in production. Reserve it for operations that are safe to
    # apply after the code has already deployed, which rules out most changes
    # to a table's schema. Reasonable candidates for post-deployment are:
    #   - Large data migrations. These are normally run by hand so they can be
    #     monitored and do not hold up a deploy for a long time while they run.
    #   - Adding indexes to large tables. This can be slow, so it is better done
    #     outside a deploy; although it is a schema change, it is completely
    #     safe to run once the code is live.
    # Post-deployment migrations are run manually, see:
    # https://develop.sentry.dev/database-migrations/#migration-deployment
    #
    # This migration is not marked as post-deployment.
    is_post_deployment = False

    dependencies = [
        ("workflow_engine", "0103_add_unique_constraint"),
    ]

    operations = [
        migrations.CreateModel(
            name="BulkJobStatus",
            fields=[
                # Primary key.
                (
                    "id",
                    sentry.db.models.fields.bounded.BoundedBigAutoField(
                        primary_key=True,
                        serialize=False,
                    ),
                ),
                # Timestamps maintained automatically on save / on creation.
                ("date_updated", models.DateTimeField(auto_now=True)),
                ("date_added", models.DateTimeField(auto_now_add=True)),
                # Job identification and payload.
                ("job_type", models.CharField(db_index=True, max_length=100)),
                ("batch_key", models.CharField(max_length=200, unique=True)),
                ("work_chunk_info", sentry.db.models.fields.jsonfield.JSONField()),
                # Execution state; new rows start as "not_started".
                (
                    "status",
                    models.CharField(db_index=True, default="not_started", max_length=20),
                ),
            ],
            options={
                "db_table": "workflow_engine_bulk_job_status",
                "indexes": [
                    # Composite index over (job_type, status, date_updated).
                    models.Index(
                        fields=["job_type", "status", "date_updated"],
                        name="bulkjob_type_stat_upd_idx",
                    ),
                ],
            },
        ),
    ]
3 changes: 3 additions & 0 deletions src/sentry/workflow_engine/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"ActionAlertRuleTriggerAction",
"AlertRuleDetector",
"AlertRuleWorkflow",
"BulkJobState",
"BulkJobStatus",
"Condition",
"DataCondition",
"DataConditionAlertRuleTrigger",
Expand All @@ -26,6 +28,7 @@
from .action_alertruletriggeraction import ActionAlertRuleTriggerAction
from .alertrule_detector import AlertRuleDetector
from .alertrule_workflow import AlertRuleWorkflow
from .bulk_job_status import BulkJobState, BulkJobStatus
from .data_condition import Condition, DataCondition
from .data_condition_group import DataConditionGroup
from .data_condition_group_action import DataConditionGroupAction
Expand Down
62 changes: 62 additions & 0 deletions src/sentry/workflow_engine/models/bulk_job_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from enum import StrEnum

from django.db import models

import sentry
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, region_silo_model
from sentry.db.models.fields.jsonfield import JSONField


class BulkJobState(StrEnum):
NOT_STARTED = "not_started"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"


@region_silo_model
class BulkJobStatus(DefaultFieldsModel):
    """
    Job-agnostic record of a bulk job's execution state.

    Each row tracks one unit of bulk work and where it is in its lifecycle.
    Nothing about how a job runs lives here: job-specific behavior belongs to
    BulkJobSpec instances, which are found through the ``job_type`` field.
    """

    __relocation_scope__ = RelocationScope.Excluded

    # Names the BulkJobSpec that handles this row (e.g., "error_backfill");
    # it is the key used to look the implementation up in bulk_job_registry.
    # NOTE(review): job_type is also the leading column of the composite index
    # declared in Meta, so this single-column index may be redundant — confirm
    # before removing it (doing so needs a matching migration).
    job_type = models.CharField(max_length=100, db_index=True)

    # Identifier for this work chunk (e.g., "error_detector:123"). The unique
    # constraint prevents duplicate job records. The format is job-specific but
    # typically includes the ID of the resource being processed.
    batch_key = models.CharField(max_length=200, unique=True)

    # Work chunk data as JSON (a serialized Pydantic model) holding everything
    # needed to process this chunk (e.g., {"detector_id": 123}). It is
    # deserialized with the job's work_chunk_model at processing time.
    work_chunk_info = JSONField()

    # Current execution state (NOT_STARTED, IN_PROGRESS, COMPLETED). The
    # coordinator uses it to schedule pending jobs and reset stuck ones.
    # Choice labels are derived from the member names, e.g. "Not Started".
    status = models.CharField(
        max_length=20,
        choices=[
            (state.value, state.name.replace("_", " ").title()) for state in BulkJobState
        ],
        default=BulkJobState.NOT_STARTED,
        db_index=True,
    )

    class Meta:
        app_label = "workflow_engine"
        db_table = "workflow_engine_bulk_job_status"
        indexes = [
            # Composite index over job type, state, and last-update time.
            models.Index(
                name="bulkjob_type_stat_upd_idx",
                fields=["job_type", "status", "date_updated"],
            ),
        ]

    __repr__ = sentry.db.models.sane_repr("job_type", "batch_key", "status")
Loading
Loading