Skip to content

Commit 6cb409e

Browse files
committed
fixup! feat(jobsdb): introduce new partition_id column
🔒 Scanned for secrets using gitleaks 8.28.0
1 parent ca67c97 commit 6cb409e

File tree

5 files changed

+125
-1
lines changed

5 files changed

+125
-1
lines changed

jobsdb/jobsdb.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,7 @@ type Handle struct {
594594
MaxDSSize config.ValueLoader[int]
595595
numPartitions int // if zero or negative, no partitioning
596596
partitionFunction func(job *JobT) string
597+
dbTablesVersion int // version of the database tables schema (0 means latest)
597598

598599
migration struct {
599600
maxMigrateOnce, maxMigrateDSProbe config.ValueLoader[int]
@@ -826,6 +827,13 @@ func WithNumPartitions(numPartitions int) OptsFunc {
826827
}
827828
}
828829

830+
// withDatabaseTablesVersion sets the database tables version to use (internal use only for verifying database table migrations)
831+
func withDatabaseTablesVersion(dbVersion int) OptsFunc {
832+
return func(jd *Handle) {
833+
jd.conf.dbTablesVersion = dbVersion
834+
}
835+
}
836+
829837
func NewForRead(tablePrefix string, opts ...OptsFunc) *Handle {
830838
return newOwnerType(Read, tablePrefix, opts...)
831839
}

jobsdb/jobsdb_tablemigrate_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package jobsdb
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/google/uuid"
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/rudderlabs/rudder-go-kit/config"
11+
)
12+
13+
func TestJobsDBTableMigrations(t *testing.T) {
14+
t.Run("13_partition_id_column", func(t *testing.T) {
15+
config.Reset()
16+
postgres := startPostgres(t)
17+
18+
// Start JobsDB with DB version 11 (without partition_id column) to create initial tables
19+
jd := NewForWrite("test", WithDBHandle(postgres.DB), withDatabaseTablesVersion(12))
20+
require.NoError(t, jd.Start(), "it should be able to start JobsDB")
21+
jd.Close()
22+
23+
// Add some data to ensure migration works fine with existing data
24+
_, err := postgres.DB.Exec(`
25+
INSERT INTO test_jobs_1
26+
(uuid, workspace_id, user_id, custom_val, parameters, event_payload, event_count)
27+
VALUES
28+
(gen_random_uuid(), 'worskpace-1', 'user-1', 'gw', '{"source_id": "src1"}', '{}', 1)`)
29+
require.NoError(t, err, "it should be able to insert initial data")
30+
31+
// Start JobsDB again to trigger migration to add partition_id column
32+
jd = NewForReadWrite("test", WithDBHandle(postgres.DB))
33+
require.NoError(t, jd.Start(), "it should be able to start JobsDB with latest table versions")
34+
defer jd.Close()
35+
36+
// Verify that partition_id column exists, but is empty for existing rows
37+
unprocessed, err := jd.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 1})
38+
require.NoError(t, err, "it should be able to get unprocessed jobs")
39+
require.Len(t, unprocessed.Jobs, 1)
40+
require.Empty(t, unprocessed.Jobs[0].PartitionID)
41+
42+
// Store a new job and verify partition_id is set
43+
require.NoError(t, jd.Store(context.Background(), []*JobT{{
44+
UUID: uuid.New(),
45+
WorkspaceId: "workspace-1",
46+
UserID: "user-1",
47+
PartitionID: "partition-1",
48+
CustomVal: "gw",
49+
Parameters: []byte(`{"source_id": "src2"}`),
50+
EventPayload: []byte(`{}`),
51+
EventCount: 1,
52+
}}))
53+
54+
unprocessed, err = jd.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 2})
55+
require.NoError(t, err, "it should be able to get unprocessed jobs")
56+
require.Len(t, unprocessed.Jobs, 2)
57+
require.Equal(t, "partition-1", unprocessed.Jobs[1].PartitionID)
58+
})
59+
}

jobsdb/jobsdb_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1774,6 +1774,55 @@ func TestUpdateJobStatus(t *testing.T) {
17741774
})
17751775
}
17761776

1777+
func TestPartitionedJobsDB(t *testing.T) {
1778+
t.Run("without partitioning", func(t *testing.T) {
1779+
postgres := startPostgres(t)
1780+
db := NewForReadWrite("partitioning", WithDBHandle(postgres.DB))
1781+
require.NoError(t, db.Start(), "it should be able to start the jobs db handle")
1782+
defer db.TearDown()
1783+
err := db.Store(context.Background(), []*JobT{
1784+
{
1785+
UUID: uuid.New(),
1786+
WorkspaceId: "workspace-1",
1787+
UserID: "user-1",
1788+
CustomVal: "custom-val-1",
1789+
Parameters: []byte(`{}`),
1790+
EventPayload: []byte(`{}`),
1791+
EventCount: 1,
1792+
},
1793+
})
1794+
require.NoError(t, err, "it should be able to store job")
1795+
unprocessed, err := db.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 1})
1796+
require.NoError(t, err, "it should be able to get unprocessed jobs")
1797+
require.Equal(t, 1, len(unprocessed.Jobs), "it should return the stored job")
1798+
require.Empty(t, unprocessed.Jobs[0].PartitionID, "it should have empty partition ID")
1799+
})
1800+
1801+
t.Run("with partitioning", func(t *testing.T) {
1802+
postgres := startPostgres(t)
1803+
db := NewForReadWrite("partitioning", WithNumPartitions(64), WithDBHandle(postgres.DB))
1804+
require.NoError(t, db.Start(), "it should be able to start the jobs db handle")
1805+
defer db.TearDown()
1806+
err := db.Store(context.Background(), []*JobT{
1807+
{
1808+
UUID: uuid.New(),
1809+
WorkspaceId: "workspace-1",
1810+
UserID: "user-1",
1811+
CustomVal: "custom-val-1",
1812+
Parameters: []byte(`{}`),
1813+
EventPayload: []byte(`{}`),
1814+
EventCount: 1,
1815+
},
1816+
})
1817+
require.NoError(t, err, "it should be able to store job")
1818+
unprocessed, err := db.GetUnprocessed(context.Background(), GetQueryParams{JobsLimit: 1})
1819+
require.NoError(t, err, "it should be able to get unprocessed jobs")
1820+
require.Equal(t, 1, len(unprocessed.Jobs), "it should return the stored job")
1821+
require.NotEmpty(t, unprocessed.Jobs[0].PartitionID, "it should have non-empty partition ID")
1822+
require.Equal(t, "workspace-1-62", unprocessed.Jobs[0].PartitionID, "it should have correct partition ID including the workspace ID and partition number")
1823+
})
1824+
}
1825+
17771826
type testingT interface {
17781827
Errorf(format string, args ...interface{})
17791828
FailNow()

jobsdb/setup.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func (jd *Handle) setupDatabaseTables(templateData map[string]interface{}) {
2828
Handle: jd.dbHandle,
2929
MigrationsTable: jd.SchemaMigrationTable(),
3030
ShouldForceSetLowerVersion: jd.config.GetBool("SQLMigrator.forceSetLowerVersion", true),
31+
Version: uint(jd.conf.dbTablesVersion),
3132
}
3233
// execute any necessary migrations
3334
if err := m.MigrateFromTemplates("jobsdb", templateData); err != nil {

services/sql-migrator/migrator.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ type Migrator struct {
3535

3636
// Indicates if all migrations should be run ignoring the current version in MigrationsTable
3737
RunAlways bool
38+
39+
// Migration target version override. 0 means latest.
40+
Version uint
3841
}
3942

4043
var pkgLogger logger.Logger
@@ -161,7 +164,11 @@ func (m *Migrator) MigrateFromTemplates(templatesDir string, context interface{}
161164
}
162165
}
163166

164-
err = migration.Up()
167+
if m.Version != 0 {
168+
err = migration.Migrate(m.Version)
169+
} else {
170+
err = migration.Up()
171+
}
165172
if err != nil && err != migrate.ErrNoChange { // migrate library reports that no change was required, using ErrNoChange
166173
return fmt.Errorf("run migration from template directory %q, %w", templatesDir, err)
167174
}

0 commit comments

Comments
 (0)