Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
)
defer gatewayDB.Close()
if err = gatewayDB.Start(); err != nil {
Expand All @@ -185,6 +186,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -194,6 +196,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
)
defer batchRouterDB.Close()

Expand Down
1 change: 1 addition & 0 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
)
defer gatewayDB.Close()

Expand Down
2 changes: 2 additions & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be kept in sync with the number of partitions in scheduler rt? How are we planning to do that? Operator changes might be required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I will override it in the customer objects where we have already overridden the scheduler's env variable. But we do need a better approach; this should be handled during scheduler-v2 development.

)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -182,6 +183,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(config.GetIntVar(64, 1, "JobsDB.partitionCount")),
)
defer batchRouterDB.Close()
schemaDB := jobsdb.NewForReadWrite(
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/lambda v1.76.0
github.com/aws/aws-sdk-go-v2/service/personalizeevents v1.29.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.90.2
github.com/aws/aws-sdk-go-v2/service/sts v1.40.2
github.com/aws/smithy-go v1.23.2
github.com/bufbuild/httplb v0.4.1
github.com/cenkalti/backoff v2.2.1+incompatible
Expand All @@ -56,6 +57,7 @@ require (
github.com/dgraph-io/badger/v4 v4.8.0
github.com/dlclark/regexp2 v1.11.5
github.com/docker/docker v28.3.3+incompatible
github.com/evanphx/json-patch v0.5.2
github.com/evanphx/json-patch/v5 v5.9.11
github.com/fsouza/fake-gcs-server v1.52.2
github.com/go-chi/chi/v5 v5.2.3
Expand Down Expand Up @@ -96,7 +98,7 @@ require (
github.com/rudderlabs/bing-ads-go-sdk v0.2.3
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/keydb v1.3.0
github.com/rudderlabs/rudder-go-kit v0.66.0
github.com/rudderlabs/rudder-go-kit v0.66.1
github.com/rudderlabs/rudder-observability-kit v0.0.6
github.com/rudderlabs/rudder-schemas v0.7.0
github.com/rudderlabs/rudder-transformer/go v0.0.0-20250707171833-9cd525669b1b
Expand Down Expand Up @@ -187,7 +189,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/redshiftdata v1.33.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.40.2
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitfield/gotestdox v0.2.2 // indirect
github.com/bits-and-blooms/bitset v1.14.3 // indirect
Expand All @@ -214,7 +215,6 @@ require (
github.com/dvsekhvalnov/jose2go v1.7.0 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.35.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/evanphx/json-patch v5.9.11+incompatible
github.com/fatih/color v1.18.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
Expand Down Expand Up @@ -326,6 +326,7 @@ require (
github.com/tinylib/msgp v1.3.0 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
11 changes: 7 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,8 @@ github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJP
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
github.com/evanphx/json-patch v5.9.11+incompatible h1:ixHHqfcGvxhWkniF1tWxBHA0yb4Z+d1UQi45df52xW8=
github.com/evanphx/json-patch v5.9.11+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
Expand Down Expand Up @@ -909,6 +909,7 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jeremywohl/flatten v1.0.1 h1:LrsxmB3hfwJuE+ptGOijix1PIfOoKLJ3Uee/mzbgtrs=
github.com/jeremywohl/flatten v1.0.1/go.mod h1:4AmD/VxjWcI5SRB0n6szE2A6s2fsNHDLO0nAlMHgfLQ=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down Expand Up @@ -1205,8 +1206,8 @@ github.com/rudderlabs/keydb v1.3.0 h1:lbVFYhMAW8DUR79i3p49AyGQ1ytsH2uuay9bA0doH6
github.com/rudderlabs/keydb v1.3.0/go.mod h1:PHHhQmlHVgIJht22E0Sr1ythB3NBiO2jUkr/5aBf0IA=
github.com/rudderlabs/parquet-go v0.0.3 h1:/zgRj929pGKHsthc0kw8stVEcFu1JUcpxDRlhxjSLic=
github.com/rudderlabs/parquet-go v0.0.3/go.mod h1:WmwBOdvwpXl2aZGRk3NxxgzC/DaWGfax3jrCRhKhtSo=
github.com/rudderlabs/rudder-go-kit v0.66.0 h1:gZbPUEhjgrWOl+K91/yX7rgMa/t9ymf/TZjCNg8OAFc=
github.com/rudderlabs/rudder-go-kit v0.66.0/go.mod h1:Ty2372mkenxb52j9ia3Qdo8d/ea/qffxMaKPdWxDPUQ=
github.com/rudderlabs/rudder-go-kit v0.66.1 h1:yWLN/2QkwgTsr+ZA6hBfKO3BSq1aHuUKQhzlpLZ5DIc=
github.com/rudderlabs/rudder-go-kit v0.66.1/go.mod h1:5w7w1e38HB3bAYh7d4+jrSrk57pYNvLcCadFI0+R26o=
github.com/rudderlabs/rudder-observability-kit v0.0.6 h1:xIA/1Sp38B542EYzxR7qUfNGJwsQCpmBnl6h5ul0AHA=
github.com/rudderlabs/rudder-observability-kit v0.0.6/go.mod h1:nR3GvY7HvuBaBqOKFfzLP9uYZu7OpzMqW2eeT2ikXtU=
github.com/rudderlabs/rudder-schemas v0.7.0 h1:hKShHYpbIldE1Q591vodI6iaAZ/IUOyC1DqUUJZysNU=
Expand Down Expand Up @@ -1345,6 +1346,8 @@ github.com/tonistiigi/vt100 v0.0.0-20240514184818-90bafcd6abab/go.mod h1:ulncasL
github.com/trinodb/trino-go-client v0.328.0 h1:X6hrGGysA3nvyVcz8kJbBS98srLNTNsnNYwRkMC1atA=
github.com/trinodb/trino-go-client v0.328.0/go.mod h1:e/nck9W6hy+9bbyZEpXKFlNsufn3lQGpUgDL1d5f1FI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU=
github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4=
Expand Down
1 change: 1 addition & 0 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,7 @@ func TestCreateDS(t *testing.T) {
workspace_id TEXT NOT NULL DEFAULT '',
uuid UUID NOT NULL,
user_id TEXT NOT NULL,
partition_id TEXT NOT NULL DEFAULT '',
parameters JSONB NOT NULL,
custom_val VARCHAR(64) NOT NULL,
event_payload JSONB NOT NULL,
Expand Down
74 changes: 67 additions & 7 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/partmap"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"

Expand Down Expand Up @@ -424,10 +425,28 @@ type JobT struct {
LastJobStatus JobStatusT `json:"LastJobStatus"`
Parameters json.RawMessage `json:"Parameters"`
WorkspaceId string `json:"WorkspaceId"`
PartitionID string `json:"PartitionId"`
}

// String returns a single-line, human-readable description of the job made of
// its JobID, UserID, CreatedAt, ExpireAt, CustomVal, Parameters, EventPayload
// and EventCount fields, in that order.
//
// The text is assembled with a strings.Builder and strconv conversions
// instead of a single fmt.Sprintf call.
func (job *JobT) String() string {
	var sb strings.Builder
	sb.WriteString("JobID=")
	sb.WriteString(strconv.FormatInt(job.JobID, 10))
	sb.WriteString(", UserID=")
	sb.WriteString(job.UserID)
	sb.WriteString(", CreatedAt=")
	sb.WriteString(job.CreatedAt.String())
	sb.WriteString(", ExpireAt=")
	sb.WriteString(job.ExpireAt.String())
	sb.WriteString(", CustomVal=")
	sb.WriteString(job.CustomVal)
	sb.WriteString(", Parameters=")
	sb.WriteString(string(job.Parameters))
	sb.WriteString(", EventPayload=")
	sb.WriteString(string(job.EventPayload))
	// No comma before EventCount: this matches the established output format.
	sb.WriteString(" EventCount=")
	sb.WriteString(strconv.Itoa(job.EventCount))
	return sb.String()
}

func (job *JobT) sanitizeJSON() error {
Expand Down Expand Up @@ -571,6 +590,9 @@ type Handle struct {
maxOpenConnections int
analyzeThreshold config.ValueLoader[int]
MaxDSSize config.ValueLoader[int]
numPartitions int // if zero or negative, no partitioning
partitionFunction func(job *JobT) string
dbTablesVersion int // version of the database tables schema (0 means latest)

migration struct {
maxMigrateOnce, maxMigrateDSProbe config.ValueLoader[int]
Expand Down Expand Up @@ -784,6 +806,35 @@ func WithJobMaxAge(jobMaxAge config.ValueLoader[time.Duration]) OptsFunc {
}
}

// WithNumPartitions returns an option that configures the number of jobsdb
// partitions on a Handle.
//
// When the option is applied it panics unless numPartitions is at least 1 and
// a power of two. If no partition function has been configured on the handle
// yet, a default one is installed. It yields partition IDs of the form
// "<WorkspaceId>-<index>", where index is obtained from
// partmap.Murmur3Partition32 applied to the job's UserID.
func WithNumPartitions(numPartitions int) OptsFunc {
	return func(jd *Handle) {
		// A power of two has exactly one bit set, so n&(n-1) is zero only for
		// powers of two (and for zero, which the first condition rejects).
		if numPartitions < 1 || numPartitions&(numPartitions-1) != 0 {
			panic(fmt.Errorf("invalid number of jobsdb partitions, needs to be power of two: %d", numPartitions))
		}
		jd.conf.numPartitions = numPartitions

		// Keep a partition function that was configured before this option ran.
		if jd.conf.partitionFunction != nil {
			return
		}

		// Default partition function using a 32-bit key space and Murmur3 hash.
		jd.conf.partitionFunction = func(job *JobT) string {
			var partitionIdx uint32
			// The count is read at call time; a non-positive count leaves the index at 0.
			if jd.conf.numPartitions > 0 {
				// NOTE(review): the second return value is discarded here — confirm
				// against the partmap API that it never needs handling.
				partitionIdx, _ = partmap.Murmur3Partition32(job.UserID, uint32(jd.conf.numPartitions))
			}
			return job.WorkspaceId + "-" + strconv.Itoa(int(partitionIdx))
		}
	}
}

// withDatabaseTablesVersion builds an option that stores dbVersion as the
// database tables version in the handle's configuration.
//
// Internal use only: it exists for verifying database table migrations.
func withDatabaseTablesVersion(dbVersion int) OptsFunc {
	setVersion := func(h *Handle) {
		h.conf.dbTablesVersion = dbVersion
	}
	return setVersion
}

// NewForRead builds a Handle for the given table prefix by delegating to
// newOwnerType with the Read owner type, forwarding all supplied options.
func NewForRead(tablePrefix string, opts ...OptsFunc) *Handle {
	handle := newOwnerType(Read, tablePrefix, opts...)
	return handle
}
Expand Down Expand Up @@ -1509,6 +1560,7 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT
workspace_id TEXT NOT NULL DEFAULT '',
uuid UUID NOT NULL,
user_id TEXT NOT NULL,
partition_id TEXT NOT NULL DEFAULT '',
parameters JSONB NOT NULL,
custom_val VARCHAR(64) NOT NULL,
event_payload `+string(columnType)+` NOT NULL,
Expand All @@ -1535,7 +1587,12 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT

func (jd *Handle) createDSIndicesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_ws" ON %[1]q (workspace_id)`, newDS.JobTable)); err != nil {
return fmt.Errorf("creating workspace index: %w", err)
return fmt.Errorf("creating workspace_id index: %w", err)
}
if jd.conf.numPartitions > 0 {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_partid" ON %[1]q (partition_id)`, newDS.JobTable)); err != nil {
return fmt.Errorf("creating partition_id index: %w", err)
}
}
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_cv" ON %[1]q (custom_val)`, newDS.JobTable)); err != nil {
return fmt.Errorf("creating custom_val index: %w", err)
Expand Down Expand Up @@ -2038,7 +2095,7 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
var stmt *sql.Stmt
var err error

stmt, err = tx.PrepareContext(ctx, pq.CopyIn(ds.JobTable, "uuid", "user_id", "custom_val", "parameters", "event_payload", "event_count", "workspace_id"))
stmt, err = tx.PrepareContext(ctx, pq.CopyIn(ds.JobTable, "uuid", "user_id", "custom_val", "parameters", "event_payload", "event_count", "workspace_id", "partition_id"))
if err != nil {
return err
}
Expand All @@ -2049,8 +2106,11 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
if job.EventCount > 1 {
eventCount = job.EventCount
}

if _, err = stmt.ExecContext(ctx, job.UUID, job.UserID, job.CustomVal, string(job.Parameters), string(job.EventPayload), eventCount, job.WorkspaceId); err != nil {
// Assign partition ID if not already assigned
if job.PartitionID == "" && jd.conf.numPartitions > 0 {
job.PartitionID = jd.conf.partitionFunction(job)
}
if _, err = stmt.ExecContext(ctx, job.UUID, job.UserID, job.CustomVal, string(job.Parameters), string(job.EventPayload), eventCount, job.WorkspaceId, job.PartitionID); err != nil {
return err
}
}
Expand Down Expand Up @@ -2198,7 +2258,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
var rows *sql.Rows
sqlStatement := fmt.Sprintf(`SELECT
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, jobs.event_count,
jobs.created_at, jobs.expire_at, jobs.workspace_id,
jobs.created_at, jobs.expire_at, jobs.workspace_id, jobs.partition_id,
octet_length(jobs.event_payload::text) as payload_size,
sum(jobs.event_count) over (order by jobs.job_id asc) as running_event_counts,
sum(octet_length(jobs.event_payload::text)) over (order by jobs.job_id) as running_payload_size,
Expand Down Expand Up @@ -2265,7 +2325,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
var jsErrorResponse []byte
var jsParameters []byte
err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal,
&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &payloadSize, &runningEventCount, &runningPayloadSize,
&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PartitionID, &payloadSize, &runningEventCount, &runningPayloadSize,
&jsState, &jsAttemptNum,
&jsExecTime, &jsRetryTime,
&jsErrorCode, &jsErrorResponse, &jsParameters)
Expand Down
Loading
Loading