Skip to content

Commit 833b929

Browse files
committed
feat(jobsdb): introduce new partition_id column
🔒 Scanned for secrets using gitleaks 8.28.0
1 parent 0b868c1 commit 833b929

File tree

3 files changed

+63
-9
lines changed

3 files changed

+63
-9
lines changed

jobsdb/jobsdb.go

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"encoding/json"
2929
"errors"
3030
"fmt"
31+
"math"
3132
"slices"
3233
"sort"
3334
"strconv"
@@ -37,6 +38,8 @@ import (
3738
"time"
3839
"unicode/utf8"
3940

41+
"github.com/spaolacci/murmur3"
42+
4043
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
4144

4245
"github.com/google/uuid"
@@ -424,10 +427,28 @@ type JobT struct {
424427
LastJobStatus JobStatusT `json:"LastJobStatus"`
425428
Parameters json.RawMessage `json:"Parameters"`
426429
WorkspaceId string `json:"WorkspaceId"`
430+
PartitionID string `json:"PartitionId"`
427431
}
428432

429433
func (job *JobT) String() string {
430-
return fmt.Sprintf("JobID=%v, UserID=%v, CreatedAt=%v, ExpireAt=%v, CustomVal=%v, Parameters=%v, EventPayload=%v EventCount=%d", job.JobID, job.UserID, job.CreatedAt, job.ExpireAt, job.CustomVal, string(job.Parameters), string(job.EventPayload), job.EventCount)
434+
var sb strings.Builder
435+
sb.WriteString("JobID=")
436+
sb.WriteString(strconv.FormatInt(job.JobID, 10))
437+
sb.WriteString(", UserID=")
438+
sb.WriteString(job.UserID)
439+
sb.WriteString(", CreatedAt=")
440+
sb.WriteString(job.CreatedAt.String())
441+
sb.WriteString(", ExpireAt=")
442+
sb.WriteString(job.ExpireAt.String())
443+
sb.WriteString(", CustomVal=")
444+
sb.WriteString(job.CustomVal)
445+
sb.WriteString(", Parameters=")
446+
sb.WriteString(string(job.Parameters))
447+
sb.WriteString(", EventPayload=")
448+
sb.WriteString(string(job.EventPayload))
449+
sb.WriteString(" EventCount=")
450+
sb.WriteString(strconv.Itoa(job.EventCount))
451+
return sb.String()
431452
}
432453

433454
func (job *JobT) sanitizeJSON() error {
@@ -571,6 +592,8 @@ type Handle struct {
571592
maxOpenConnections int
572593
analyzeThreshold config.ValueLoader[int]
573594
MaxDSSize config.ValueLoader[int]
595+
numPartitions int // if zero or negative, no partitioning
596+
partitionFunction func(job *JobT) string
574597

575598
migration struct {
576599
maxMigrateOnce, maxMigrateDSProbe config.ValueLoader[int]
@@ -784,6 +807,25 @@ func WithJobMaxAge(jobMaxAge config.ValueLoader[time.Duration]) OptsFunc {
784807
}
785808
}
786809

810+
func WithNumPartitions(numPartitions int) OptsFunc {
811+
{
812+
return func(jd *Handle) {
813+
jd.conf.numPartitions = numPartitions
814+
if jd.conf.numPartitions > 0 && jd.conf.partitionFunction == nil {
815+
jd.conf.partitionFunction = func(job *JobT) string {
816+
var partitionIdx int
817+
if jd.conf.numPartitions > 1 {
818+
partitionIdx = int(math.Floor(
819+
float64(murmur3.Sum32([]byte(job.UserID))) /
820+
float64((1<<32)/jd.conf.numPartitions)))
821+
}
822+
return job.WorkspaceId + "-" + strconv.Itoa(partitionIdx)
823+
}
824+
}
825+
}
826+
}
827+
}
828+
787829
func NewForRead(tablePrefix string, opts ...OptsFunc) *Handle {
788830
return newOwnerType(Read, tablePrefix, opts...)
789831
}
@@ -1509,6 +1551,7 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT
15091551
workspace_id TEXT NOT NULL DEFAULT '',
15101552
uuid UUID NOT NULL,
15111553
user_id TEXT NOT NULL,
1554+
partition_id TEXT NOT NULL DEFAULT '',
15121555
parameters JSONB NOT NULL,
15131556
custom_val VARCHAR(64) NOT NULL,
15141557
event_payload `+string(columnType)+` NOT NULL,
@@ -1535,7 +1578,12 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT
15351578

15361579
func (jd *Handle) createDSIndicesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error {
15371580
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_ws" ON %[1]q (workspace_id)`, newDS.JobTable)); err != nil {
1538-
return fmt.Errorf("creating workspace index: %w", err)
1581+
return fmt.Errorf("creating workspace_id index: %w", err)
1582+
}
1583+
if jd.conf.numPartitions > 0 {
1584+
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_partid" ON %[1]q (partition_id)`, newDS.JobTable)); err != nil {
1585+
return fmt.Errorf("creating partition_id index: %w", err)
1586+
}
15391587
}
15401588
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_cv" ON %[1]q (custom_val)`, newDS.JobTable)); err != nil {
15411589
return fmt.Errorf("creating custom_val index: %w", err)
@@ -2038,7 +2086,7 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
20382086
var stmt *sql.Stmt
20392087
var err error
20402088

2041-
stmt, err = tx.PrepareContext(ctx, pq.CopyIn(ds.JobTable, "uuid", "user_id", "custom_val", "parameters", "event_payload", "event_count", "workspace_id"))
2089+
stmt, err = tx.PrepareContext(ctx, pq.CopyIn(ds.JobTable, "uuid", "user_id", "custom_val", "parameters", "event_payload", "event_count", "workspace_id", "partition_id"))
20422090
if err != nil {
20432091
return err
20442092
}
@@ -2049,8 +2097,11 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
20492097
if job.EventCount > 1 {
20502098
eventCount = job.EventCount
20512099
}
2052-
2053-
if _, err = stmt.ExecContext(ctx, job.UUID, job.UserID, job.CustomVal, string(job.Parameters), string(job.EventPayload), eventCount, job.WorkspaceId); err != nil {
2100+
// Assign partition ID if not already assigned
2101+
if job.PartitionID == "" && jd.conf.numPartitions > 0 {
2102+
job.PartitionID = jd.conf.partitionFunction(job)
2103+
}
2104+
if _, err = stmt.ExecContext(ctx, job.UUID, job.UserID, job.CustomVal, string(job.Parameters), string(job.EventPayload), eventCount, job.WorkspaceId, job.PartitionID); err != nil {
20542105
return err
20552106
}
20562107
}
@@ -2198,7 +2249,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
21982249
var rows *sql.Rows
21992250
sqlStatement := fmt.Sprintf(`SELECT
22002251
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, jobs.event_count,
2201-
jobs.created_at, jobs.expire_at, jobs.workspace_id,
2252+
jobs.created_at, jobs.expire_at, jobs.workspace_id, jobs.partition_id,
22022253
octet_length(jobs.event_payload::text) as payload_size,
22032254
sum(jobs.event_count) over (order by jobs.job_id asc) as running_event_counts,
22042255
sum(octet_length(jobs.event_payload::text)) over (order by jobs.job_id) as running_payload_size,
@@ -2265,7 +2316,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
22652316
var jsErrorResponse []byte
22662317
var jsParameters []byte
22672318
err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal,
2268-
&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &payloadSize, &runningEventCount, &runningPayloadSize,
2319+
&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PartitionID, &payloadSize, &runningEventCount, &runningPayloadSize,
22692320
&jsState, &jsAttemptNum,
22702321
&jsExecTime, &jsRetryTime,
22712322
&jsErrorCode, &jsErrorResponse, &jsParameters)

jobsdb/migration.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,8 +551,8 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
551551
`with last_status as (select * from "v_last_%[1]s"),
552552
inserted_jobs as
553553
(
554-
insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
555-
(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
554+
insert into %[3]q (job_id, workspace_id, uuid, user_id, partition_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
555+
(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.partition_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
556556
where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
557557
),
558558
insertedStatuses as
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{{range .Datasets}}
2+
ALTER TABLE "{{$.Prefix}}_jobs_{{.}}" ADD COLUMN IF NOT EXISTS partition_id TEXT NOT NULL DEFAULT '';
3+
{{end}}

0 commit comments

Comments
 (0)