
Commit 26843ca

fix: worker job buffer increased memory requirements causing excessive cpu for garbage collection (#6500)
🔒 Scanned for secrets using gitleaks 8.28.0

# Description

With the recent worker buffer modifications, each worker's channel capacity increased from **1K** to **10K**. Each channel slot stores a struct value occupying **56 bytes**, even while the channel is empty.

```go
type workerJob struct {
	job         *jobsdb.JobT               // 8 bytes (pointer)
	parameters  *routerutils.JobParameters // 8 bytes (pointer)
	assignedAt  time.Time                  // 24 bytes (uint64 + int64 + pointer)
	drainReason string                     // 16 bytes (2 words: pointer + length)
}
```

To optimize memory usage, the channels now carry struct pointers instead of struct values. This reduces the memory footprint of an empty channel slot from **56 bytes** to **8 bytes**.

## Additional changes

- If `noOfJobsPerChannel` > `maxNoOfJobsPerChannel`, then `maxNoOfJobsPerChannel` automatically increases to match `noOfJobsPerChannel`. This eliminates the need to manually adjust `maxNoOfJobsPerChannel` when increasing the channel size.
- Introduced `experimentalBufferSizeMinimum` (default: **500**) to prevent the experimental calculator from going below this threshold, provided worker throughput exceeds 1 job/sec. Extremely low buffer sizes have been observed to negatively impact overall throughput.

## Linear Ticket

resolves PIPE-2554

## Security

- [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 68bbab9 commit 26843ca
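
Editor's note: the per-slot figures in the description can be reproduced with `unsafe.Sizeof`. The following is a minimal, hypothetical sketch assuming a 64-bit platform, with placeholder types standing in for `jobsdb.JobT` and `routerutils.JobParameters`:

```go
package main

import (
	"fmt"
	"time"
	"unsafe"
)

type jobT struct{}          // placeholder for jobsdb.JobT
type jobParameters struct{} // placeholder for routerutils.JobParameters

type workerJob struct {
	job         *jobT
	parameters  *jobParameters
	assignedAt  time.Time
	drainReason string
}

func main() {
	// A buffered channel allocates capacity * element-size bytes up front,
	// even while it sits empty.
	fmt.Println(unsafe.Sizeof(workerJob{}))       // 56: bytes per slot of chan workerJob
	fmt.Println(unsafe.Sizeof((*workerJob)(nil))) // 8: bytes per slot of chan *workerJob
}
```

At the new 10K capacity, that is roughly 560 KB versus 80 KB of always-resident channel buffer per worker, before any job is even enqueued.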

9 files changed (+68, -53 lines)

router/handle_lifecycle.go

Lines changed: 9 additions & 0 deletions
```diff
@@ -108,6 +108,14 @@ func (rt *Handle) Setup(
 	rt.noOfWorkers = getRouterConfigInt("noOfWorkers", destType, 64)
 	rt.maxNoOfJobsPerChannel = getRouterConfigInt("maxNoOfJobsPerChannel", destType, 10000)
 	rt.noOfJobsPerChannel = getRouterConfigInt("noOfJobsPerChannel", destType, 1000)
+	if rt.noOfJobsPerChannel > rt.maxNoOfJobsPerChannel { // if noOfJobsPerChannel is more than max, set it as the new max
+		rt.logger.Warnn("noOfJobsPerChannel is more than maxNoOfJobsPerChannel, setting maxNoOfJobsPerChannel to noOfJobsPerChannel value",
+			obskit.DestinationType(rt.destType),
+			logger.NewIntField("maxNoOfJobsPerChannel", int64(rt.maxNoOfJobsPerChannel)),
+			logger.NewIntField("noOfJobsPerChannel", int64(rt.noOfJobsPerChannel)),
+		)
+		rt.maxNoOfJobsPerChannel = rt.noOfJobsPerChannel
+	}
 	// Explicitly control destination types for which we want to support batching
 	// Avoiding stale configurations still having KAFKA batching enabled to cause issues with later versions of rudder-server
 	batchingSupportedDestinations := config.GetStringSliceVar([]string{"AM"}, "Router.batchingSupportedDestinations")
@@ -336,6 +344,7 @@ func (rt *Handle) setupReloadableVars() {
 	rt.reloadableConfig.oauthV2ExpirationTimeDiff = config.GetReloadableDurationVar(5, time.Minute, getRouterConfigKeys("oauth.expirationTimeDiff", rt.destType)...)
 	rt.reloadableConfig.enableExperimentalBufferSizeCalculator = config.GetReloadableBoolVar(false, getRouterConfigKeys("enableExperimentalBufferSizeCalculator", rt.destType)...)
 	rt.reloadableConfig.experimentalBufferSizeScalingFactor = config.GetReloadableFloat64Var(2.0, getRouterConfigKeys("experimentalBufferSizeScalingFactor", rt.destType)...)
+	rt.reloadableConfig.experimentalBufferSizeMinimum = config.GetReloadableIntVar(500, 1, getRouterConfigKeys("experimentalBufferSizeMinimum", rt.destType)...)
 	rt.diagnosisTickerTime = config.GetDurationVar(60, time.Second, "Diagnostics.routerTimePeriod", "Diagnostics.routerTimePeriodInS")
 	rt.netClientTimeout = config.GetDurationVar(10, time.Second,
 		"Router."+rt.destType+".httpTimeout",
```

router/partition_worker.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -55,6 +55,7 @@ func newPartitionWorker(ctx context.Context, rt *Handle, partition string) *part
 			workLoopThroughput,
 			rt.reloadableConfig.experimentalBufferSizeScalingFactor,
 			rt.noOfJobsPerChannel,
+			rt.reloadableConfig.experimentalBufferSizeMinimum,
 		),
 		&workerBufferStats{
 			onceEvery: kitsync.NewOnceEvery(5 * time.Second),
```

router/types.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -73,4 +73,5 @@ type reloadableConfig struct {
 	oauthV2ExpirationTimeDiff              config.ValueLoader[time.Duration]
 	enableExperimentalBufferSizeCalculator config.ValueLoader[bool]    // whether to use the experimental worker buffer size calculator or not
 	experimentalBufferSizeScalingFactor    config.ValueLoader[float64] // scaling factor to scale up the buffer size in the experimental calculator
+	experimentalBufferSizeMinimum          config.ValueLoader[int]     // minimum buffer size in the experimental calculator
 }
```

router/worker_batch_loop.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -13,7 +13,7 @@ type workerBatchLoop struct {
 	ctx                      context.Context                   // context for managing the lifecycle of the loop
 	jobsBatchTimeout         config.ValueLoader[time.Duration] // timeout for processing jobs in a batch
 	noOfJobsToBatchInAWorker config.ValueLoader[int]           // maximum number of jobs to batch in a worker before processing
-	inputCh                  <-chan workerJob                  // channel to receive jobs for processing
+	inputCh                  <-chan *workerJob                 // channel to receive jobs for processing
 	enableBatching           bool                              // whether to enable batching of jobs
 	batchTransform           func(routerJobs []types.RouterJobT) []types.DestinationJobT // function to transform router jobs into destination jobs in batch mode
 	transform                func(routerJobs []types.RouterJobT) []types.DestinationJobT // function to transform router jobs into destination jobs in non-batch mode
@@ -69,7 +69,7 @@ func (wl *workerBatchLoop) runLoop() {
 				doProcessRouterJobs()
 			}
 			start := time.Now()
-			if routerJob := wl.acceptWorkerJob(workerJob); routerJob != nil {
+			if routerJob := wl.acceptWorkerJob(*workerJob); routerJob != nil {
 				routerJobs = append(routerJobs, *routerJob)
 				if wl.noOfJobsToBatchInAWorker.Load() <= len(routerJobs) {
 					doProcessRouterJobs() // process the batch if it reaches the limit
```

router/worker_batch_loop_test.go

Lines changed: 13 additions & 13 deletions
```diff
@@ -21,7 +21,7 @@ import (
 func TestWorkerBatchLoop(t *testing.T) {
 	t.Run("no batching and no transform at router, acceptWorkerJob returning always nil", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 5
 
 		var processedJobs []types.DestinationJobT
@@ -75,7 +75,7 @@
 			params := &routerutils.JobParameters{
 				TransformAt: "processor", // Not "router"
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait a bit to ensure jobs are processed and some timeouts are triggered too
@@ -104,7 +104,7 @@
 
 	t.Run("no batching and transform at router, then switch to no router transform", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 5
 
 		var processedJobs []types.DestinationJobT
@@ -184,7 +184,7 @@
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Send a job with transform_at != "router" - this should trigger batch processing
@@ -195,7 +195,7 @@
 		params := &routerutils.JobParameters{
 			TransformAt: "processor", // Not "router"
 		}
-		inputCh <- workerJob{job: job, parameters: params}
+		inputCh <- &workerJob{job: job, parameters: params}
 
 		// Wait a bit to ensure processing
 		time.Sleep(200 * time.Millisecond)
@@ -226,7 +226,7 @@
 
 	t.Run("with batching enabled", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 3
 
 		var processedJobs []types.DestinationJobT
@@ -292,7 +292,7 @@
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait for batch processing
@@ -306,7 +306,7 @@
 		params := &routerutils.JobParameters{
 			TransformAt: "router",
 		}
-		inputCh <- workerJob{job: job, parameters: params}
+		inputCh <- &workerJob{job: job, parameters: params}
 
 		// Wait for timeout to trigger processing of remaining jobs
 		time.Sleep(200*time.Millisecond + 50*time.Millisecond)
@@ -338,7 +338,7 @@
 
 	t.Run("jobsBatchTimeout resets even when no jobs during timeout", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 5
 
 		var timeoutFireCount int
@@ -400,7 +400,7 @@
 		params := &routerutils.JobParameters{
 			TransformAt: "router",
 		}
-		inputCh <- workerJob{job: job, parameters: params}
+		inputCh <- &workerJob{job: job, parameters: params}
 
 		// Wait longer than timeout to ensure timeout fires
 		time.Sleep(60 * time.Millisecond)
@@ -433,7 +433,7 @@
 
 	t.Run("batch processing when batch size is reached", func(t *testing.T) {
 		// Setup
-		inputCh := make(chan workerJob, 10)
+		inputCh := make(chan *workerJob, 10)
 		batchSize := 2 // Small batch size for testing
 
 		var processedBatches [][]types.DestinationJobT
@@ -493,7 +493,7 @@
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait for processing
@@ -508,7 +508,7 @@
 			params := &routerutils.JobParameters{
 				TransformAt: "router",
 			}
-			inputCh <- workerJob{job: job, parameters: params}
+			inputCh <- &workerJob{job: job, parameters: params}
 		}
 
 		// Wait for processing
```

router/worker_buffer.go

Lines changed: 5 additions & 5 deletions
```diff
@@ -13,7 +13,7 @@ import (
 type workerBuffer struct {
 	maxCapacity    int
 	targetCapacity func() int
-	jobs           chan workerJob
+	jobs           chan *workerJob
 
 	stats *workerBufferStats
 	mu    sync.RWMutex
@@ -36,7 +36,7 @@ func newWorkerBuffer(maxCapacity int, targetCapacity func() int, stats *workerBu
 	wb := &workerBuffer{
 		maxCapacity:    maxCapacity,
 		targetCapacity: targetCapacity,
-		jobs:           make(chan workerJob, maxCapacity),
+		jobs:           make(chan *workerJob, maxCapacity),
 		stats:          stats,
 	}
 	return wb
@@ -50,13 +50,13 @@ func newSimpleWorkerBuffer(capacity int) *workerBuffer {
 	wb := &workerBuffer{
 		maxCapacity:    capacity,
 		targetCapacity: func() int { return capacity },
-		jobs:           make(chan workerJob, capacity),
+		jobs:           make(chan *workerJob, capacity),
 		stats:          nil,
 	}
 	return wb
 }
 
-func (wb *workerBuffer) Jobs() <-chan workerJob {
+func (wb *workerBuffer) Jobs() <-chan *workerJob {
 	return wb.jobs
 }
 
@@ -127,7 +127,7 @@ func (rs *reservedSlot) Use(wj workerJob) {
 	rs.wb.mu.Lock()
 	defer rs.wb.mu.Unlock()
 	rs.wb.reservations--
-	rs.wb.jobs <- wj
+	rs.wb.jobs <- &wj
 }
 
 // Release releases the reserved slot from the worker's buffer
```
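
Editor's note: one subtlety in the `Use` hunk above is that the job still arrives by value, so `&wj` takes the address of the call's own copy, which escapes to the heap. A hypothetical sketch of the pattern (`enqueue` and the reduced `workerJob` are illustrative, not from the commit):

```go
package main

type workerJob struct{ drainReason string } // reduced placeholder

// enqueue mirrors reservedSlot.Use: wj arrives by value, so &wj addresses the
// call's own copy. The copy escapes to the heap (one allocation per enqueued
// job), and no two sends can ever alias the same variable, so the pattern is
// race-free. The saving is in the channel's backing array, which now holds
// 8-byte pointers per slot instead of 56-byte structs.
func enqueue(jobs chan<- *workerJob, wj workerJob) {
	jobs <- &wj
}

func main() {
	jobs := make(chan *workerJob, 1)
	enqueue(jobs, workerJob{drainReason: "example"})
	<-jobs
}
```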

router/worker_buffer_calculator.go

Lines changed: 6 additions & 3 deletions
```diff
@@ -44,12 +44,13 @@ func newExperimentalBufferSizeCalculator(
 	noOfJobsToBatchInAWorker config.ValueLoader[int], // number of jobs that a worker can batch together
 	workLoopThroughput metric.SimpleMovingAverage, // sliding average of work loop throughput
 	scalingFactor config.ValueLoader[float64], // scaling factor to scale up the buffer size
+	minBufferSize config.ValueLoader[int], // minimum buffer size
 ) bufferSizeCalculator {
 	return func() int {
-		const minBufferSize = 1
+		const one = 1
 		m1 := workLoopThroughput.Load() // at least the average throughput of the work loop
 		if m1 < 1 { // if there is no throughput yet, the throughput is less than 1 per second, set buffer to minBufferSize
-			return minBufferSize
+			return one
 		}
 		m2 := float64(jobQueryBatchSize.Load() / noOfWorkers) // at least the average number of jobs per worker during pickup
 		m3 := float64(noOfJobsToBatchInAWorker.Load())        // at least equal to the number of jobs to batch in a worker
@@ -59,7 +60,7 @@ func newExperimentalBufferSizeCalculator(
 		// calculate the maximum of the three metrics to determine the buffer size
 		math.Max(
 			math.Max(math.Max(m1, m2), m3)*scalingFactor.Load(), // scale up to provide some buffer
-			minBufferSize, // ensure buffer size is at least minBufferSize
+			math.Max(one, float64(minBufferSize.Load())), // ensure buffer size is at least one or the configured minimum
 		)))
 	}
 }
@@ -74,13 +75,15 @@ func newBufferSizeCalculatorSwitcher(
 	workLoopThroughput metric.SimpleMovingAverage, // sliding average of work loop throughput
 	scalingFactor config.ValueLoader[float64], // scaling factor to scale up the buffer size
 	noOfJobsPerChannel int, // number of jobs per channel
+	minBufferSize config.ValueLoader[int], // minimum buffer size
 ) bufferSizeCalculator {
 	new := newExperimentalBufferSizeCalculator(
 		jobQueryBatchSize,
 		noOfWorkers,
 		noOfJobsToBatchInAWorker,
 		workLoopThroughput,
 		scalingFactor,
+		minBufferSize,
 	)
 	legacy := newStandardBufferSizeCalculator(
 		noOfJobsToBatchInAWorker,
```
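
Editor's note: taken together, the experimental calculator now behaves roughly like the following pure function. This is a simplified sketch with the config loaders replaced by plain values; the surrounding `int(math.Ceil(...))` wrapper is assumed from the visible context, as the diff does not show it:

```go
package main

import (
	"fmt"
	"math"
)

// experimentalBufferSize approximates the calculation above: below 1 job/sec
// the buffer collapses to 1; otherwise the largest of the three metrics is
// scaled up and floored at the configured minimum (default 500).
func experimentalBufferSize(throughput float64, jobQueryBatchSize, noOfWorkers, batchSize int, scalingFactor float64, minBufferSize int) int {
	if throughput < 1 {
		return 1
	}
	m2 := float64(jobQueryBatchSize / noOfWorkers)
	m3 := float64(batchSize)
	return int(math.Ceil(math.Max(
		math.Max(math.Max(throughput, m2), m3)*scalingFactor,
		math.Max(1, float64(minBufferSize)),
	)))
}

func main() {
	// 50 jobs/sec throughput, a 2000-job pickup across 64 workers (m2 = 31),
	// batch size 100, scaling factor 2.0: the raw max is 100*2 = 200, which
	// the new minimum lifts to 500, the regime this change is meant to protect.
	fmt.Println(experimentalBufferSize(50, 2000, 64, 100, 2.0, 500)) // 500
}
```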
