From 34586a9c807788c0a42b073d88dba5cb5fe16b29 Mon Sep 17 00:00:00 2001
From: Aris Tzoumas
Date: Fri, 14 Nov 2025 13:15:41 +0200
Subject: [PATCH] feat(jobsdb): introduce new partition_id column
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

🔒 Scanned for secrets using gitleaks 8.28.0
---
 app/apphandlers/embeddedAppHandler.go         |  3 +
 app/apphandlers/gatewayAppHandler.go          |  1 +
 app/apphandlers/processorAppHandler.go        |  2 +
 go.mod                                        |  7 +-
 go.sum                                        | 11 ++-
 jobsdb/integration_test.go                    |  1 +
 jobsdb/jobsdb.go                              | 74 +++++++++++++++--
 jobsdb/jobsdb_parameters_cache_test.go        | 79 +++++++++----------
 jobsdb/jobsdb_tablemigrate_test.go            | 56 ++++++++++++++
 jobsdb/jobsdb_test.go                         | 49 ++++++++++++
 jobsdb/migration.go                           |  4 +-
 jobsdb/setup.go                               |  1 +
 services/sql-migrator/migrator.go             |  9 ++-
 .../jobsdb/000013_partition_id_column_up.tmpl |  3 +
 14 files changed, 240 insertions(+), 60 deletions(-)
 create mode 100644 jobsdb/jobsdb_tablemigrate_test.go
 create mode 100644 sql/migrations/jobsdb/000013_partition_id_column_up.tmpl

diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go
index 2437ae6b2..29f8a3d89 100644
--- a/app/apphandlers/embeddedAppHandler.go
+++ b/app/apphandlers/embeddedAppHandler.go
@@ -160,6 +160,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
 		jobsdb.WithClearDB(options.ClearDB),
 		jobsdb.WithStats(statsFactory),
 		jobsdb.WithDBHandle(dbPool),
+		jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
 	)
 	defer gatewayDB.Close()
 	if err = gatewayDB.Start(); err != nil {
@@ -185,6 +186,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
 		jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
 		jobsdb.WithStats(statsFactory),
 		jobsdb.WithDBHandle(dbPool),
+		jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
 	)
 	defer routerDB.Close()
 	batchRouterDB := jobsdb.NewForReadWrite(
@@ -194,6 +196,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
 		jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
 		jobsdb.WithStats(statsFactory),
 		jobsdb.WithDBHandle(dbPool),
+		jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
 	)
 	defer batchRouterDB.Close()
 
diff --git a/app/apphandlers/gatewayAppHandler.go b/app/apphandlers/gatewayAppHandler.go
index 28f46260a..59d6f811c 100644
--- a/app/apphandlers/gatewayAppHandler.go
+++ b/app/apphandlers/gatewayAppHandler.go
@@ -82,6 +82,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
 		jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
 		jobsdb.WithStats(statsFactory),
 		jobsdb.WithDBHandle(dbPool),
+		jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
 	)
 	defer gatewayDB.Close()
 
diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go
index 27896c666..c713d57e9 100644
--- a/app/apphandlers/processorAppHandler.go
+++ b/app/apphandlers/processorAppHandler.go
@@ -173,6 +173,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
 		jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
 		jobsdb.WithStats(statsFactory),
 		jobsdb.WithDBHandle(dbPool),
+		jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
 	)
 	defer routerDB.Close()
 	batchRouterDB := jobsdb.NewForReadWrite(
@@ -182,6 +183,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
 		jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
 		jobsdb.WithStats(statsFactory),
 		jobsdb.WithDBHandle(dbPool),
+		jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
 	)
 	defer batchRouterDB.Close()
 	schemaDB := jobsdb.NewForReadWrite(
diff --git a/go.mod b/go.mod
index b6ca54ddc..569f47ab9 100644
--- a/go.mod
+++ b/go.mod
@@ -47,6 +47,7 @@ require (
 	github.com/aws/aws-sdk-go-v2/service/lambda v1.76.0
 	github.com/aws/aws-sdk-go-v2/service/personalizeevents v1.29.0
 	github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2
+	github.com/aws/aws-sdk-go-v2/service/sts v1.40.2
 	github.com/aws/smithy-go v1.23.2
 	github.com/bufbuild/httplb v0.4.1
 	github.com/cenkalti/backoff v2.2.1+incompatible
@@ -56,6 +57,7 @@ require (
 	github.com/dgraph-io/badger/v4 v4.8.0
 	github.com/dlclark/regexp2 v1.11.5
 	github.com/docker/docker v28.3.3+incompatible
+	github.com/evanphx/json-patch v0.5.2
 	github.com/evanphx/json-patch/v5 v5.9.11
 	github.com/fsouza/fake-gcs-server v1.52.2
 	github.com/go-chi/chi/v5 v5.2.3
@@ -96,7 +98,7 @@ require (
 	github.com/rudderlabs/bing-ads-go-sdk v0.2.3
 	github.com/rudderlabs/compose-test v0.1.3
 	github.com/rudderlabs/keydb v1.3.0
-	github.com/rudderlabs/rudder-go-kit v0.66.0
+	github.com/rudderlabs/rudder-go-kit v0.66.1
 	github.com/rudderlabs/rudder-observability-kit v0.0.6
 	github.com/rudderlabs/rudder-schemas v0.7.0
 	github.com/rudderlabs/rudder-transformer/go v0.0.0-20250707171833-9cd525669b1b
@@ -187,7 +189,6 @@ require (
 	github.com/aws/aws-sdk-go-v2/service/redshiftdata v1.33.1 // indirect
 	github.com/aws/aws-sdk-go-v2/service/sso v1.30.3 // indirect
 	github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.7 // indirect
-	github.com/aws/aws-sdk-go-v2/service/sts v1.40.2
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bitfield/gotestdox v0.2.2 // indirect
 	github.com/bits-and-blooms/bitset v1.14.3 // indirect
@@ -214,7 +215,6 @@ require (
 	github.com/dvsekhvalnov/jose2go v1.7.0 // indirect
 	github.com/envoyproxy/go-control-plane/envoy v1.35.0 // indirect
 	github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
-	github.com/evanphx/json-patch v5.9.11+incompatible
 	github.com/fatih/color v1.18.0 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.9.0 // indirect
@@ -326,6 +326,7 @@ require (
 	github.com/tinylib/msgp v1.3.0 // indirect
 	github.com/tklauser/go-sysconf v0.3.14 // indirect
 	github.com/tklauser/numcpus v0.8.0 // indirect
+	github.com/twmb/murmur3 v1.1.8 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
diff --git a/go.sum b/go.sum
index 79df4d4c3..302b3c5b5 100644
--- a/go.sum
+++ b/go.sum
@@ -531,8 +531,8 @@ github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJP
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
 github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
-github.com/evanphx/json-patch v5.9.11+incompatible h1:ixHHqfcGvxhWkniF1tWxBHA0yb4Z+d1UQi45df52xW8=
-github.com/evanphx/json-patch v5.9.11+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
+github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
+github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
 github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
 github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
 github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
@@ -909,6 +909,7 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
 github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
 github.com/jeremywohl/flatten v1.0.1 h1:LrsxmB3hfwJuE+ptGOijix1PIfOoKLJ3Uee/mzbgtrs=
 github.com/jeremywohl/flatten v1.0.1/go.mod h1:4AmD/VxjWcI5SRB0n6szE2A6s2fsNHDLO0nAlMHgfLQ=
+github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
 github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
 github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
@@ -1205,8 +1206,8 @@ github.com/rudderlabs/keydb v1.3.0 h1:lbVFYhMAW8DUR79i3p49AyGQ1ytsH2uuay9bA0doH6
 github.com/rudderlabs/keydb v1.3.0/go.mod h1:PHHhQmlHVgIJht22E0Sr1ythB3NBiO2jUkr/5aBf0IA=
 github.com/rudderlabs/parquet-go v0.0.3 h1:/zgRj929pGKHsthc0kw8stVEcFu1JUcpxDRlhxjSLic=
 github.com/rudderlabs/parquet-go v0.0.3/go.mod h1:WmwBOdvwpXl2aZGRk3NxxgzC/DaWGfax3jrCRhKhtSo=
-github.com/rudderlabs/rudder-go-kit v0.66.0 h1:gZbPUEhjgrWOl+K91/yX7rgMa/t9ymf/TZjCNg8OAFc=
-github.com/rudderlabs/rudder-go-kit v0.66.0/go.mod h1:Ty2372mkenxb52j9ia3Qdo8d/ea/qffxMaKPdWxDPUQ=
+github.com/rudderlabs/rudder-go-kit v0.66.1 h1:yWLN/2QkwgTsr+ZA6hBfKO3BSq1aHuUKQhzlpLZ5DIc=
+github.com/rudderlabs/rudder-go-kit v0.66.1/go.mod h1:5w7w1e38HB3bAYh7d4+jrSrk57pYNvLcCadFI0+R26o=
 github.com/rudderlabs/rudder-observability-kit v0.0.6 h1:xIA/1Sp38B542EYzxR7qUfNGJwsQCpmBnl6h5ul0AHA=
 github.com/rudderlabs/rudder-observability-kit v0.0.6/go.mod h1:nR3GvY7HvuBaBqOKFfzLP9uYZu7OpzMqW2eeT2ikXtU=
 github.com/rudderlabs/rudder-schemas v0.7.0 h1:hKShHYpbIldE1Q591vodI6iaAZ/IUOyC1DqUUJZysNU=
@@ -1345,6 +1346,8 @@ github.com/tonistiigi/vt100 v0.0.0-20240514184818-90bafcd6abab/go.mod h1:ulncasL
 github.com/trinodb/trino-go-client v0.328.0 h1:X6hrGGysA3nvyVcz8kJbBS98srLNTNsnNYwRkMC1atA=
 github.com/trinodb/trino-go-client v0.328.0/go.mod h1:e/nck9W6hy+9bbyZEpXKFlNsufn3lQGpUgDL1d5f1FI=
 github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
+github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
+github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
 github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
 github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU=
 github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4=
diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go
index c85bdf315..e6d0f49b8 100644
--- a/jobsdb/integration_test.go
+++ b/jobsdb/integration_test.go
@@ -1042,6 +1042,7 @@ func TestCreateDS(t *testing.T) {
 				workspace_id TEXT NOT NULL DEFAULT '',
 				uuid UUID NOT NULL,
 				user_id TEXT NOT NULL,
+				partition_id TEXT NOT NULL DEFAULT '',
 				parameters JSONB NOT NULL,
 				custom_val VARCHAR(64) NOT NULL,
 				event_payload JSONB NOT NULL,
diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index 48a05c0a0..ef65d07e3 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -48,6 +48,7 @@ import (
 	"github.com/rudderlabs/rudder-go-kit/bytesize"
 	"github.com/rudderlabs/rudder-go-kit/config"
 	"github.com/rudderlabs/rudder-go-kit/logger"
+	"github.com/rudderlabs/rudder-go-kit/partmap"
 	"github.com/rudderlabs/rudder-go-kit/stats"
 	"github.com/rudderlabs/rudder-go-kit/stats/collectors"
 
@@ -424,10 +425,28 @@ type JobT struct {
 	LastJobStatus JobStatusT      `json:"LastJobStatus"`
 	Parameters    json.RawMessage `json:"Parameters"`
 	WorkspaceId   string          `json:"WorkspaceId"`
+	PartitionID   string          `json:"PartitionId"`
 }
 
 func (job *JobT) String() string {
-	return fmt.Sprintf("JobID=%v, UserID=%v, CreatedAt=%v, ExpireAt=%v, CustomVal=%v, Parameters=%v, EventPayload=%v EventCount=%d", job.JobID, job.UserID, job.CreatedAt, job.ExpireAt, job.CustomVal, string(job.Parameters), string(job.EventPayload), job.EventCount)
+	var sb strings.Builder
+	sb.WriteString("JobID=")
+	sb.WriteString(strconv.FormatInt(job.JobID, 10))
+	sb.WriteString(", UserID=")
+	sb.WriteString(job.UserID)
+	sb.WriteString(", CreatedAt=")
+	sb.WriteString(job.CreatedAt.String())
+	sb.WriteString(", ExpireAt=")
+	sb.WriteString(job.ExpireAt.String())
+	sb.WriteString(", CustomVal=")
+	sb.WriteString(job.CustomVal)
+	sb.WriteString(", Parameters=")
+	sb.WriteString(string(job.Parameters))
+	sb.WriteString(", EventPayload=")
+	sb.WriteString(string(job.EventPayload))
+	sb.WriteString(" EventCount=")
+	sb.WriteString(strconv.Itoa(job.EventCount))
+	return sb.String()
 }
 
 func (job *JobT) sanitizeJSON() error {
@@ -571,6 +590,9 @@ type Handle struct {
 		maxOpenConnections int
 		analyzeThreshold   config.ValueLoader[int]
 		MaxDSSize          config.ValueLoader[int]
+		numPartitions      int // if zero or negative, no partitioning
+		partitionFunction  func(job *JobT) string
+		dbTablesVersion    int // version of the database tables schema (0 means latest)
 
 		migration struct {
 			maxMigrateOnce, maxMigrateDSProbe config.ValueLoader[int]
@@ -784,6 +806,35 @@ func WithJobMaxAge(jobMaxAge config.ValueLoader[time.Duration]) OptsFunc {
 	}
 }
 
+func WithNumPartitions(numPartitions int) OptsFunc {
+	{
+		return func(jd *Handle) {
+			// numPartitions must be greater than 0 and power-of-two
+			if numPartitions < 1 || (numPartitions&(numPartitions-1)) != 0 {
+				panic(fmt.Errorf("invalid number of jobsdb partitions, needs to be power of two: %d", numPartitions))
+			}
+			jd.conf.numPartitions = numPartitions
+			// default partition function using a 32-bit key space and Murmur3 hash
+			if jd.conf.partitionFunction == nil {
+				jd.conf.partitionFunction = func(job *JobT) string {
+					var partitionIdx uint32
+					if jd.conf.numPartitions > 0 {
+						partitionIdx, _ = partmap.Murmur3Partition32(job.UserID, uint32(jd.conf.numPartitions))
+					}
+					return job.WorkspaceId + "-" + strconv.Itoa(int(partitionIdx))
+				}
+			}
+		}
+	}
+}
+
+// withDatabaseTablesVersion sets the database tables version to use (internal use only for verifying database table migrations)
+func withDatabaseTablesVersion(dbVersion int) OptsFunc {
+	return func(jd *Handle) {
+		jd.conf.dbTablesVersion = dbVersion
+	}
+}
+
 func NewForRead(tablePrefix string, opts ...OptsFunc) *Handle {
 	return newOwnerType(Read, tablePrefix, opts...)
 }
@@ -1509,6 +1560,7 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT
 		workspace_id TEXT NOT NULL DEFAULT '',
 		uuid UUID NOT NULL,
 		user_id TEXT NOT NULL,
+		partition_id TEXT NOT NULL DEFAULT '',
 		parameters JSONB NOT NULL,
 		custom_val VARCHAR(64) NOT NULL,
 		event_payload `+string(columnType)+` NOT NULL,
@@ -1535,7 +1587,12 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT
 
 func (jd *Handle) createDSIndicesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error {
 	if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_ws" ON %[1]q (workspace_id)`, newDS.JobTable)); err != nil {
-		return fmt.Errorf("creating workspace index: %w", err)
+		return fmt.Errorf("creating workspace_id index: %w", err)
+	}
+	if jd.conf.numPartitions > 0 {
+		if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_partid" ON %[1]q (partition_id)`, newDS.JobTable)); err != nil {
+			return fmt.Errorf("creating partition_id index: %w", err)
+		}
 	}
 	if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_cv" ON %[1]q (custom_val)`, newDS.JobTable)); err != nil {
 		return fmt.Errorf("creating custom_val index: %w", err)
@@ -2038,7 +2095,7 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
 	var stmt *sql.Stmt
 	var err error
 
-	stmt, err = tx.PrepareContext(ctx, pq.CopyIn(ds.JobTable, "uuid", "user_id", "custom_val", "parameters", "event_payload", "event_count", "workspace_id"))
+	stmt, err = tx.PrepareContext(ctx, pq.CopyIn(ds.JobTable, "uuid", "user_id", "custom_val", "parameters", "event_payload", "event_count", "workspace_id", "partition_id"))
 	if err != nil {
 		return err
 	}
@@ -2049,8 +2106,11 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
 		if job.EventCount > 1 {
 			eventCount = job.EventCount
 		}
-
-		if _, err = stmt.ExecContext(ctx, job.UUID, job.UserID, job.CustomVal, string(job.Parameters), string(job.EventPayload), eventCount, job.WorkspaceId); err != nil {
+		// Assign partition ID if not already assigned
+		if job.PartitionID == "" && jd.conf.numPartitions > 0 {
+			job.PartitionID = jd.conf.partitionFunction(job)
+		}
+		if _, err = stmt.ExecContext(ctx, job.UUID, job.UserID, job.CustomVal, string(job.Parameters), string(job.EventPayload), eventCount, job.WorkspaceId, job.PartitionID); err != nil {
 			return err
 		}
 	}
@@ -2198,7 +2258,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
 	var rows *sql.Rows
 	sqlStatement := fmt.Sprintf(`SELECT
 		jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, jobs.event_count,
-		jobs.created_at, jobs.expire_at, jobs.workspace_id,
+		jobs.created_at, jobs.expire_at, jobs.workspace_id, jobs.partition_id,
 		octet_length(jobs.event_payload::text) as payload_size,
 		sum(jobs.event_count) over (order by jobs.job_id asc) as running_event_counts,
 		sum(octet_length(jobs.event_payload::text)) over (order by jobs.job_id) as running_payload_size,
@@ -2265,7 +2325,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
 		var jsErrorResponse []byte
 		var jsParameters []byte
 		err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal,
-			&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &payloadSize, &runningEventCount, &runningPayloadSize,
+			&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PartitionID, &payloadSize, &runningEventCount, &runningPayloadSize,
 			&jsState, &jsAttemptNum,
 			&jsExecTime, &jsRetryTime,
 			&jsErrorCode, &jsErrorResponse, &jsParameters)
diff --git a/jobsdb/jobsdb_parameters_cache_test.go b/jobsdb/jobsdb_parameters_cache_test.go
index 4910bff07..ff4510c79 100644
--- a/jobsdb/jobsdb_parameters_cache_test.go
+++ b/jobsdb/jobsdb_parameters_cache_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -13,12 +14,12 @@ import (
 )
 
 type cpvMockJobsdb struct {
-	calls int
+	calls atomic.Int64
 	JobsDB
 }
 
 func (j *cpvMockJobsdb) GetDistinctParameterValues(ctx context.Context, parameter ParameterName, customVal string) ([]string, error) {
-	j.calls++
+	j.calls.Add(1)
 	if customVal == "" {
 		return []string{"value1", "value2"}, nil
 	}
@@ -41,74 +42,66 @@ func TestCachingDistinctParameterValuesJobsdb(t *testing.T) {
 		// First call should fetch from the mock JobsDB
 		values, err := cachingJobsdb.GetDistinctParameterValues(ctx, testParameter, "")
 		require.NoError(t, err)
-		require.Equal(t, 1, jobsdb.calls)
+		require.EqualValues(t, 1, jobsdb.calls.Load())
 		require.Equal(t, []string{"value1", "value2"}, values)
 
 		// Second call should hit the cache
 		values, err = cachingJobsdb.GetDistinctParameterValues(ctx, testParameter, "")
 		require.NoError(t, err)
-		require.Equal(t, 1, jobsdb.calls)
+		require.EqualValues(t, 1, jobsdb.calls.Load())
 		require.Equal(t, []string{"value1", "value2"}, values)
 
 		values, err = cachingJobsdb.GetDistinctParameterValues(ctx, testParameter, "someCustomVal")
 		require.NoError(t, err)
-		require.Equal(t, 2, jobsdb.calls)
+		require.EqualValues(t, 2, jobsdb.calls.Load())
 		require.Equal(t, []string{"value3", "value4"}, values)
 
 		time.Sleep(100 * time.Millisecond)
 		values, err = cachingJobsdb.GetDistinctParameterValues(ctx, testParameter, "")
 		require.NoError(t, err)
-		require.Equal(t, 3, jobsdb.calls)
+		require.EqualValues(t, 3, jobsdb.calls.Load())
 		require.Equal(t, []string{"value1", "value2"}, values)
 	})
 
 	t.Run("multiple goroutines and parameters", func(t *testing.T) {
 		jobsdb := &cpvMockJobsdb{}
-
+		ttl := 100 * time.Millisecond
 		cachingJobsdb := NewCachingDistinctParameterValuesJobsdb(
-			config.SingleValueLoader(100*time.Millisecond),
+			config.SingleValueLoader(ttl),
 			jobsdb,
 		)
 
 		ctx := context.Background()
 
 		var wg sync.WaitGroup
-		wg.Add(20)
-		for i := range 10 {
-			go func(i int) {
-				defer wg.Done()
-				values, err := cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value1", "value2"}, values)
-				values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "someCustomVal")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value3", "value4"}, values)
-				time.Sleep(100 * time.Millisecond)
-				values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value1", "value2"}, values)
-				values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "someCustomVal")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value3", "value4"}, values)
-			}(i)
-			go func(i int) {
-				defer wg.Done()
-				values, err := cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value1", "value2"}, values)
-				values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "someCustomVal")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value3", "value4"}, values)
-				time.Sleep(100 * time.Millisecond)
-				values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value1", "value2"}, values)
-				values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "someCustomVal")
-				require.NoError(t, err)
-				require.Equal(t, []string{"value3", "value4"}, values)
-			}(i)
+
+		// each goroutine will call GetDistinctParameterValues 4 times
+		run := func(i int) {
+			values, err := cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "")
+			require.NoError(t, err)
+			require.Equal(t, []string{"value1", "value2"}, values)
+			values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "someCustomVal")
+			require.NoError(t, err)
+			require.Equal(t, []string{"value3", "value4"}, values)
+			time.Sleep(2 * ttl)
+
+			values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "")
+			require.NoError(t, err)
+			require.Equal(t, []string{"value1", "value2"}, values)
+			values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)), "someCustomVal")
+			require.NoError(t, err)
+			require.Equal(t, []string{"value3", "value4"}, values)
+		}
+		iterations := 10
+		for i := range iterations {
+			wg.Go(func() {
+				run(i)
+			})
+			wg.Go(func() {
+				run(i)
+			})
 		}
 		wg.Wait()
-		require.Equal(t, 40, jobsdb.calls)
+		require.EqualValues(t, 4*iterations, jobsdb.calls.Load())
 	})
 }
diff --git a/jobsdb/jobsdb_tablemigrate_test.go b/jobsdb/jobsdb_tablemigrate_test.go
new file mode 100644
index 000000000..f96825e1e
--- /dev/null
+++ b/jobsdb/jobsdb_tablemigrate_test.go
@@ -0,0 +1,56 @@
+package jobsdb
+
+import (
+	"context"
+	"testing"
+
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+func TestJobsDBTableMigrations(t *testing.T) {
+	t.Run("13_partition_id_column", func(t *testing.T) {
+		postgres := startPostgres(t)
+
+		// Start JobsDB with DB version 12 (without partition_id column) to create initial tables
+		jd := NewForWrite("test", WithDBHandle(postgres.DB), withDatabaseTablesVersion(12))
+		require.NoError(t, jd.Start(), "it should be able to start JobsDB")
+		jd.TearDown()
+
+		// Add some data to ensure migration works fine with existing data
+		_, err := postgres.DB.Exec(`
+			INSERT INTO test_jobs_1
+				(uuid, workspace_id, user_id, custom_val, parameters, event_payload, event_count)
+			VALUES
+				(gen_random_uuid(), 'workspace-1', 'user-1', 'gw', '{"source_id": "src1"}', '{}', 1)`)
+		require.NoError(t, err, "it should be able to insert initial data")
+
+		// Start JobsDB again to trigger migration to add partition_id column
+		jd = NewForReadWrite("test", WithDBHandle(postgres.DB))
+		require.NoError(t, jd.Start(), "it should be able to start JobsDB with latest table versions")
+		defer jd.TearDown()
+
+		// Verify that partition_id column exists, but is empty for existing rows
+		unprocessed, err := jd.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 1})
+		require.NoError(t, err, "it should be able to get unprocessed jobs")
+		require.Len(t, unprocessed.Jobs, 1)
+		require.Empty(t, unprocessed.Jobs[0].PartitionID)
+
+		// Store a new job and verify partition_id is set
+		require.NoError(t, jd.Store(context.Background(), []*JobT{{
+			UUID:         uuid.New(),
+			WorkspaceId:  "workspace-1",
+			UserID:       "user-1",
+			PartitionID:  "partition-1",
+			CustomVal:    "gw",
+			Parameters:   []byte(`{"source_id": "src2"}`),
+			EventPayload: []byte(`{}`),
+			EventCount:   1,
+		}}))
+
+		unprocessed, err = jd.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 2})
+		require.NoError(t, err, "it should be able to get unprocessed jobs")
+		require.Len(t, unprocessed.Jobs, 2)
+		require.Equal(t, "partition-1", unprocessed.Jobs[1].PartitionID)
+	})
+}
diff --git a/jobsdb/jobsdb_test.go b/jobsdb/jobsdb_test.go
index dc4c8b888..da06ea4b3 100644
--- a/jobsdb/jobsdb_test.go
+++ b/jobsdb/jobsdb_test.go
@@ -1774,6 +1774,55 @@ func TestUpdateJobStatus(t *testing.T) {
 	})
 }
 
+func TestPartitionedJobsDB(t *testing.T) {
+	t.Run("without partitioning", func(t *testing.T) {
+		postgres := startPostgres(t)
+		db := NewForReadWrite("partitioning", WithDBHandle(postgres.DB))
+		require.NoError(t, db.Start(), "it should be able to start the jobs db handle")
+		defer db.TearDown()
+		err := db.Store(context.Background(), []*JobT{
+			{
+				UUID:         uuid.New(),
+				WorkspaceId:  "workspace-1",
+				UserID:       "user-1",
+				CustomVal:    "custom-val-1",
+				Parameters:   []byte(`{}`),
+				EventPayload: []byte(`{}`),
+				EventCount:   1,
+			},
+		})
+		require.NoError(t, err, "it should be able to store job")
+		unprocessed, err := db.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 1})
+		require.NoError(t, err, "it should be able to get unprocessed jobs")
+		require.Equal(t, 1, len(unprocessed.Jobs), "it should return the stored job")
+		require.Empty(t, unprocessed.Jobs[0].PartitionID, "it should have empty partition ID")
+	})
+
+	t.Run("with partitioning", func(t *testing.T) {
+		postgres := startPostgres(t)
+		db := NewForReadWrite("partitioning", WithNumPartitions(64), WithDBHandle(postgres.DB))
+		require.NoError(t, db.Start(), "it should be able to start the jobs db handle")
+		defer db.TearDown()
+		err := db.Store(context.Background(), []*JobT{
+			{
+				UUID:         uuid.New(),
+				WorkspaceId:  "workspace-1",
+				UserID:       "user-1",
+				CustomVal:    "custom-val-1",
+				Parameters:   []byte(`{}`),
+				EventPayload: []byte(`{}`),
+				EventCount:   1,
+			},
+		})
+		require.NoError(t, err, "it should be able to store job")
+		unprocessed, err := db.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 1})
+		require.NoError(t, err, "it should be able to get unprocessed jobs")
+		require.Equal(t, 1, len(unprocessed.Jobs), "it should return the stored job")
+		require.NotEmpty(t, unprocessed.Jobs[0].PartitionID, "it should have non-empty partition ID")
+		require.Equal(t, "workspace-1-62", unprocessed.Jobs[0].PartitionID, "it should have correct partition ID including the workspace ID and partition number")
+	})
+}
+
 type testingT interface {
 	Errorf(format string, args ...interface{})
 	FailNow()
diff --git a/jobsdb/migration.go b/jobsdb/migration.go
index 2e6fbbe10..e9cd63cc4 100644
--- a/jobsdb/migration.go
+++ b/jobsdb/migration.go
@@ -551,8 +551,8 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
 		`with last_status as (select * from "v_last_%[1]s"),
 		inserted_jobs as
 		(
-			insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
-			(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
+			insert into %[3]q (job_id, workspace_id, uuid, user_id, partition_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
+			(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.partition_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
 			where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
 		),
 		insertedStatuses as
diff --git a/jobsdb/setup.go b/jobsdb/setup.go
index 1deb11954..b86befa8a 100644
--- a/jobsdb/setup.go
+++ b/jobsdb/setup.go
@@ -28,6 +28,7 @@ func (jd *Handle) setupDatabaseTables(templateData map[string]interface{}) {
 		Handle:                     jd.dbHandle,
 		MigrationsTable:            jd.SchemaMigrationTable(),
 		ShouldForceSetLowerVersion: jd.config.GetBool("SQLMigrator.forceSetLowerVersion", true),
+		Version:                    uint(jd.conf.dbTablesVersion),
 	}
 	// execute any necessary migrations
 	if err := m.MigrateFromTemplates("jobsdb", templateData); err != nil {
diff --git a/services/sql-migrator/migrator.go b/services/sql-migrator/migrator.go
index f5c1c59f3..851a45123 100644
--- a/services/sql-migrator/migrator.go
+++ b/services/sql-migrator/migrator.go
@@ -35,6 +35,9 @@ type Migrator struct {
 
 	// Indicates if all migrations should be run ignoring the current version in MigrationsTable
 	RunAlways bool
+
+	// Migration target version override. 0 means latest.
+	Version uint
 }
 
 var pkgLogger logger.Logger
@@ -161,7 +164,11 @@ func (m *Migrator) MigrateFromTemplates(templatesDir string, context interface{}
 		}
 	}
 
-	err = migration.Up()
+	if m.Version != 0 {
+		err = migration.Migrate(m.Version)
+	} else {
+		err = migration.Up()
+	}
 	if err != nil && err != migrate.ErrNoChange { // migrate library reports that no change was required, using ErrNoChange
 		return fmt.Errorf("run migration from template directory %q, %w", templatesDir, err)
 	}
diff --git a/sql/migrations/jobsdb/000013_partition_id_column_up.tmpl b/sql/migrations/jobsdb/000013_partition_id_column_up.tmpl
new file mode 100644
index 000000000..189e52391
--- /dev/null
+++ b/sql/migrations/jobsdb/000013_partition_id_column_up.tmpl
@@ -0,0 +1,3 @@
+{{range .Datasets}}
+    ALTER TABLE "{{$.Prefix}}_jobs_{{.}}" ADD COLUMN IF NOT EXISTS partition_id TEXT NOT NULL DEFAULT '';
+{{end}}
\ No newline at end of file