Skip to content

Commit 5b2e519

Browse files
authored
feat(router): dynamic sizing of worker job buffers (#6463)
🔒 Scanned for secrets using gitleaks 8.28.0 # Description This update introduces **dynamic buffer sizing** for job queues within router worker channels. Previously, the buffer capacity was fixed using the `noOfJobsPerChannel` configuration. With this change, the effective buffer capacity can now grow or shrink (up to `maxNoOfJobsPerChannel`, default: 10,000) based on the following calculation strategies: ### **1. Standard** (default) Buffer capacity is set to: `max(noOfJobsPerChannel, noOfJobsToBatchInAWorker)` > `noOfJobsToBatchInAWorker` remains a reloadable value. ### **2. Experimental** (enabled through `enableExperimentalBufferSizeCalculator`) The buffer capacity is determined using the following metrics: 1. **Work loop throughput** — average number of jobs processed per second by the worker's loop (`workLoopThroughput`). 2. **Query batch size per worker** — calculated as `jobQueryBatchSize / noOfWorkers` — note that `jobQueryBatchSize` can change dynamically if pickup throttling is enabled. 3. **Configured batch size** — `noOfJobsToBatchInAWorker`. The **maximum** of the three values above is selected and then **multiplied by a scaling factor of 2.0** to compute the dynamic buffer size. **Special Case:** If `workLoopThroughput < 1`, the buffer size is forced to `1` to intentionally start small and apply backpressure in scenarios of low processing throughput. --- <img width="1742" height="753" alt="router 101-Router throughput drawio (2)" src="https://github.com/user-attachments/assets/499cb02a-96e0-416c-a09f-d84dfea087fc" /> ## Additional Changes - During router shutdown, workers no longer wait to drain their job buffers. They exit early instead. Any in-progress jobs are marked as failed by the router on the next startup. - New worker statistics are now recorded: 1. buffer capacity: `router_worker_buffer_capacity` 2. buffer size: `router_worker_buffer_size` 3. average work-loop throughput: `router_worker_work_loop_throughput`
## Linear Ticket resolves PIPE-2500 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 8effa78 commit 5b2e519

16 files changed

+1667
-105
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ require (
8888
github.com/rudderlabs/bing-ads-go-sdk v0.2.3
8989
github.com/rudderlabs/compose-test v0.1.3
9090
github.com/rudderlabs/keydb v1.2.0
91-
github.com/rudderlabs/rudder-go-kit v0.63.5
91+
github.com/rudderlabs/rudder-go-kit v0.64.0
9292
github.com/rudderlabs/rudder-observability-kit v0.0.5
9393
github.com/rudderlabs/rudder-schemas v0.7.0
9494
github.com/rudderlabs/rudder-transformer/go v0.0.0-20250707171833-9cd525669b1b

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,8 +1201,8 @@ github.com/rudderlabs/keydb v1.2.0 h1:LMwWezUh3C+xheNirHNMhUlcb09ZH0Alo6bDDwwMaN
12011201
github.com/rudderlabs/keydb v1.2.0/go.mod h1:ZYouneft71uF85OTUGS5cI9DoVdsfKDItgUTGNli9Mk=
12021202
github.com/rudderlabs/parquet-go v0.0.3 h1:/zgRj929pGKHsthc0kw8stVEcFu1JUcpxDRlhxjSLic=
12031203
github.com/rudderlabs/parquet-go v0.0.3/go.mod h1:WmwBOdvwpXl2aZGRk3NxxgzC/DaWGfax3jrCRhKhtSo=
1204-
github.com/rudderlabs/rudder-go-kit v0.63.5 h1:SpeaZY89Q7okKP/cN2O09I+7m/Z9XhRwMAX+LvPcVtA=
1205-
github.com/rudderlabs/rudder-go-kit v0.63.5/go.mod h1:58UhsuOVycglmqOhfBHMppAowoX4ANrdUeugS8qG0EM=
1204+
github.com/rudderlabs/rudder-go-kit v0.64.0 h1:DuRDsgcn4bpEbh4/gPJ44mqgvjrjqzIJFfD4r8cm/o8=
1205+
github.com/rudderlabs/rudder-go-kit v0.64.0/go.mod h1:58UhsuOVycglmqOhfBHMppAowoX4ANrdUeugS8qG0EM=
12061206
github.com/rudderlabs/rudder-observability-kit v0.0.5 h1:s/+zsqdmpYG2LuWitFqQ2aIYFf67B7akJ3yxX4/KtXc=
12071207
github.com/rudderlabs/rudder-observability-kit v0.0.5/go.mod h1:rL0zi374TMMx6YHzFxYyPItjl90iOaKxy1fdFCcq2DQ=
12081208
github.com/rudderlabs/rudder-schemas v0.7.0 h1:hKShHYpbIldE1Q591vodI6iaAZ/IUOyC1DqUUJZysNU=

router/handle.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ type Handle struct {
7373
eventOrderHalfEnabledStateDuration config.ValueLoader[time.Duration]
7474
deliveryThrottlerTimeout config.ValueLoader[time.Duration]
7575
drainConcurrencyLimit config.ValueLoader[int]
76-
workerInputBufferSize int
76+
maxNoOfJobsPerChannel int // maximum capacity of each worker channel (hard capacity limit of the underlying go channel)
77+
noOfJobsPerChannel int // requested capacity of each worker channel (important when job buffering is being calculated using the standard method)
7778
saveDestinationResponse bool
7879
saveDestinationResponseOverride config.ValueLoader[bool]
7980
reportJobsdbPayload config.ValueLoader[bool]
@@ -151,7 +152,7 @@ func (rt *Handle) activePartitions(ctx context.Context) []string {
151152

152153
// pickup picks up jobs from the jobsDB for the provided partition and returns the number of jobs picked up and whether the limits were reached or not
153154
// picked up jobs are distributed to the workers
154-
func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worker, pickupBatchSizeGauge stats.Gauge) (pickupCount int, limitsReached bool) {
155+
func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worker, pickupBatchSizeGauge Gauge[int]) (pickupCount int, limitsReached bool) {
155156
// pickup limiter with dynamic priority
156157
start := time.Now()
157158
var discardedCount int
@@ -210,7 +211,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
210211
}
211212

212213
type reservedJob struct {
213-
slot *workerSlot
214+
slot *reservedSlot
214215
job *jobsdb.JobT
215216
drainReason string
216217
parameters routerutils.JobParameters
@@ -599,7 +600,7 @@ func (rt *Handle) getQueryParams(partition string, pickUpCount int) jobsdb.GetQu
599600
}
600601

601602
type workerJobSlot struct {
602-
slot *workerSlot
603+
slot *reservedSlot
603604
drainReason string
604605
}
605606

router/handle_lifecycle.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ func (rt *Handle) Setup(
106106
}
107107
rt.guaranteeUserEventOrder = getRouterConfigBool("guaranteeUserEventOrder", rt.destType, true)
108108
rt.noOfWorkers = getRouterConfigInt("noOfWorkers", destType, 64)
109-
rt.workerInputBufferSize = getRouterConfigInt("noOfJobsPerChannel", destType, 1000)
109+
rt.maxNoOfJobsPerChannel = getRouterConfigInt("maxNoOfJobsPerChannel", destType, 10000)
110+
rt.noOfJobsPerChannel = getRouterConfigInt("noOfJobsPerChannel", destType, 1000)
110111
// Explicitly control destination types for which we want to support batching
111112
// Avoiding stale configurations still having KAFKA batching enabled to cause issues with later versions of rudder-server
112113
batchingSupportedDestinations := config.GetStringSliceVar([]string{"AM"}, "Router.batchingSupportedDestinations")
@@ -333,6 +334,8 @@ func (rt *Handle) setupReloadableVars() {
333334
rt.reloadableConfig.failingJobsPenaltySleep = config.GetReloadableDurationVar(2000, time.Millisecond, getRouterConfigKeys("failingJobsPenaltySleep", rt.destType)...)
334335
rt.reloadableConfig.failingJobsPenaltyThreshold = config.GetReloadableFloat64Var(0.6, getRouterConfigKeys("failingJobsPenaltyThreshold", rt.destType)...)
335336
rt.reloadableConfig.oauthV2ExpirationTimeDiff = config.GetReloadableDurationVar(5, time.Minute, getRouterConfigKeys("oauth.expirationTimeDiff", rt.destType)...)
337+
rt.reloadableConfig.enableExperimentalBufferSizeCalculator = config.GetReloadableBoolVar(false, getRouterConfigKeys("enableExperimentalBufferSizeCalculator", rt.destType)...)
338+
rt.reloadableConfig.experimentalBufferSizeScalingFactor = config.GetReloadableFloat64Var(2.0, getRouterConfigKeys("experimentalBufferSizeScalingFactor", rt.destType)...)
336339
rt.diagnosisTickerTime = config.GetDurationVar(60, time.Second, "Diagnostics.routerTimePeriod", "Diagnostics.routerTimePeriodInS")
337340
rt.netClientTimeout = config.GetDurationVar(10, time.Second,
338341
"Router."+rt.destType+".httpTimeout",

router/partition_worker.go

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99

1010
"github.com/rudderlabs/rudder-go-kit/logger"
1111
"github.com/rudderlabs/rudder-go-kit/stats"
12+
"github.com/rudderlabs/rudder-go-kit/stats/metric"
13+
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
1214
"github.com/rudderlabs/rudder-server/utils/cache"
1315
"github.com/rudderlabs/rudder-server/utils/crash"
1416
"github.com/rudderlabs/rudder-server/utils/misc"
@@ -18,33 +20,63 @@ import (
1820
// A partition worker uses multiple workers internally to process the jobs that are being picked up asynchronously.
1921
func newPartitionWorker(ctx context.Context, rt *Handle, partition string) *partitionWorker {
2022
pw := &partitionWorker{
21-
logger: rt.logger.Child("p-" + partition),
22-
rt: rt,
23-
partition: partition,
24-
ctx: ctx,
23+
logger: rt.logger.Child("p-" + partition),
24+
rt: rt,
25+
partition: partition,
26+
ctx: ctx,
27+
pickupBatchSizeGauge: newGaugeWithLastValue[int](stats.Default.NewTaggedStat("router_pickup_batch_size_gauge", stats.GaugeType, stats.Tags{"destType": rt.destType, "partition": partition})),
2528
}
2629
pw.g, _ = errgroup.WithContext(context.Background())
2730
pw.workers = make([]*worker, rt.noOfWorkers)
31+
deliveryTimeStat := stats.Default.NewTaggedStat("router_delivery_time", stats.TimerType, stats.Tags{"destType": rt.destType})
32+
routerDeliveryLatencyStat := stats.Default.NewTaggedStat("router_delivery_latency", stats.TimerType, stats.Tags{"destType": rt.destType})
33+
routerProxyStat := stats.Default.NewTaggedStat("router_proxy_latency", stats.TimerType, stats.Tags{"destType": rt.destType})
34+
35+
bufferCapacityStat := stats.Default.NewTaggedStat("router_worker_buffer_capacity", stats.HistogramType, stats.Tags{"destType": rt.destType, "partition": partition})
36+
bufferSizeStat := stats.Default.NewTaggedStat("router_worker_buffer_size", stats.HistogramType, stats.Tags{"destType": rt.destType, "partition": partition})
37+
2838
for i := 0; i < rt.noOfWorkers; i++ {
2939
ctx, cancelFunc := context.WithCancel(context.Background())
40+
workLoopThroughput := metric.NewSimpleMovingAverage(20)
41+
workLoopThroughputStat := stats.Default.NewTaggedStat("router_worker_work_loop_throughput", stats.HistogramType, stats.Tags{"destType": rt.destType, "partition": partition})
3042
worker := &worker{
31-
logger: pw.logger.Child("w-" + strconv.Itoa(i)),
32-
partition: partition,
33-
id: i,
34-
ctx: ctx,
35-
cancelFunc: cancelFunc,
36-
inputCh: make(chan workerJob, rt.workerInputBufferSize),
43+
logger: pw.logger.Child("w-" + strconv.Itoa(i)),
44+
partition: partition,
45+
id: i,
46+
ctx: ctx,
47+
cancelFunc: cancelFunc,
48+
workerBuffer: newWorkerBuffer(
49+
rt.maxNoOfJobsPerChannel,
50+
newBufferSizeCalculatorSwitcher(
51+
rt.reloadableConfig.enableExperimentalBufferSizeCalculator,
52+
pw.pickupBatchSizeGauge,
53+
rt.noOfWorkers,
54+
rt.reloadableConfig.noOfJobsToBatchInAWorker,
55+
workLoopThroughput,
56+
rt.reloadableConfig.experimentalBufferSizeScalingFactor,
57+
rt.noOfJobsPerChannel,
58+
),
59+
&workerBufferStats{
60+
onceEvery: kitsync.NewOnceEvery(5 * time.Second),
61+
currentCapacity: bufferCapacityStat,
62+
currentSize: bufferSizeStat,
63+
}),
3764
barrier: rt.barrier,
3865
rt: rt,
39-
deliveryTimeStat: stats.Default.NewTaggedStat("router_delivery_time", stats.TimerType, stats.Tags{"destType": rt.destType}),
40-
routerDeliveryLatencyStat: stats.Default.NewTaggedStat("router_delivery_latency", stats.TimerType, stats.Tags{"destType": rt.destType}),
41-
routerProxyStat: stats.Default.NewTaggedStat("router_proxy_latency", stats.TimerType, stats.Tags{"destType": rt.destType}),
66+
deliveryTimeStat: deliveryTimeStat,
67+
routerDeliveryLatencyStat: routerDeliveryLatencyStat,
68+
routerProxyStat: routerProxyStat,
4269
deliveryLatencyStatsCache: cache.NewStatsCache(func(labels deliveryMetricLabels) stats.Measurement {
4370
return stats.Default.NewTaggedStat("transformer_outgoing_request_latency", stats.TimerType, labels.ToStatTags())
4471
}),
4572
deliveryCountStatsCache: cache.NewStatsCache(func(labels deliveryMetricLabels) stats.Measurement {
4673
return stats.Default.NewTaggedStat("transformer_outgoing_request_count", stats.CountType, labels.ToStatTags())
4774
}),
75+
workLoopThroughput: newSmaHistogram(
76+
workLoopThroughput,
77+
workLoopThroughputStat,
78+
kitsync.NewOnceEvery(10*time.Second),
79+
),
4880
}
4981
pw.workers[i] = worker
5082

@@ -66,9 +98,10 @@ type partitionWorker struct {
6698
partition string
6799

68100
// state
69-
ctx context.Context
70-
g *errgroup.Group // group against which all the workers are spawned
71-
workers []*worker // workers that are responsible for processing the jobs
101+
ctx context.Context
102+
g *errgroup.Group // group against which all the workers are spawned
103+
pickupBatchSizeGauge GaugeWithLastValue[int] // gauge to track the pickup batch size used in the last pickup iteration
104+
workers []*worker // workers that are responsible for processing the jobs
72105

73106
pickupCount int // number of jobs picked up by the workers in the last iteration
74107
limitsReached bool // whether the limits were reached in the last iteration
@@ -77,8 +110,7 @@ type partitionWorker struct {
77110
// Work picks up jobs for the partitioned worker and returns whether it worked or not
78111
func (pw *partitionWorker) Work() bool {
79112
start := time.Now()
80-
var pickupBatchSizeGauge stats.Gauge = stats.Default.NewTaggedStat("router_pickup_batch_size_gauge", stats.GaugeType, stats.Tags{"destType": pw.rt.destType, "partition": pw.partition})
81-
pw.pickupCount, pw.limitsReached = pw.rt.pickup(pw.ctx, pw.partition, pw.workers, pickupBatchSizeGauge)
113+
pw.pickupCount, pw.limitsReached = pw.rt.pickup(pw.ctx, pw.partition, pw.workers, pw.pickupBatchSizeGauge)
82114
// the following stats are used to track the total time taken for the pickup process and the number of jobs picked up
83115
stats.Default.NewTaggedStat("router_generator_loop", stats.TimerType, stats.Tags{"destType": pw.rt.destType}).Since(start)
84116
stats.Default.NewTaggedStat("router_generator_events", stats.CountType, stats.Tags{"destType": pw.rt.destType, "partition": pw.partition}).Count(pw.pickupCount)
@@ -101,7 +133,7 @@ func (pw *partitionWorker) SleepDurations() (min, max time.Duration) {
101133
func (pw *partitionWorker) Stop() {
102134
for _, worker := range pw.workers {
103135
worker.cancelFunc()
104-
close(worker.inputCh)
136+
worker.workerBuffer.Close()
105137
}
106138
_ = pw.g.Wait()
107139
}

router/router_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/rudderlabs/rudder-go-kit/config"
2222
"github.com/rudderlabs/rudder-go-kit/logger"
23+
"github.com/rudderlabs/rudder-go-kit/stats/metric"
2324
"github.com/rudderlabs/rudder-server/admin"
2425
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
2526
"github.com/rudderlabs/rudder-server/enterprise/reporting"
@@ -262,7 +263,8 @@ func TestBackoff(t *testing.T) {
262263
logger: logger.NOP,
263264
backgroundCtx: context.Background(),
264265
noOfWorkers: 1,
265-
workerInputBufferSize: 3,
266+
maxNoOfJobsPerChannel: 3,
267+
noOfJobsPerChannel: 3,
266268
barrier: barrier,
267269
reloadableConfig: &reloadableConfig{
268270
maxFailedCountForJob: config.SingleValueLoader(3),
@@ -278,13 +280,14 @@ func TestBackoff(t *testing.T) {
278280
},
279281
}
280282
workers := []*worker{{
281-
logger: logger.NOP,
282-
inputCh: make(chan workerJob, 3),
283-
barrier: barrier,
283+
logger: logger.NOP,
284+
workerBuffer: newSimpleWorkerBuffer(3),
285+
barrier: barrier,
286+
workLoopThroughput: metric.NewSimpleMovingAverage(1),
284287
}}
285288
t.Run("eventorder disabled", func(t *testing.T) {
286289
r.guaranteeUserEventOrder = false
287-
workers[0].inputReservations = 0
290+
workers[0].workerBuffer = newSimpleWorkerBuffer(3)
288291

289292
slot, err := r.findWorkerSlot(context.Background(), workers, backoffJob, parameters, map[eventorder.BarrierKey]struct{}{})
290293
require.Nil(t, slot)
@@ -317,7 +320,7 @@ func TestBackoff(t *testing.T) {
317320

318321
t.Run("eventorder enabled", func(t *testing.T) {
319322
r.guaranteeUserEventOrder = true
320-
workers[0].inputReservations = 0
323+
workers[0].workerBuffer = newSimpleWorkerBuffer(3)
321324

322325
slot, err := r.findWorkerSlot(context.Background(), workers, backoffJob, parameters, map[eventorder.BarrierKey]struct{}{})
323326
require.Nil(t, slot)
@@ -353,7 +356,7 @@ func TestBackoff(t *testing.T) {
353356
t.Run("eventorder enabled with drain job", func(t *testing.T) {
354357
r.drainer = &drainer{drain: true, reason: "drain job due to some reason"}
355358
r.guaranteeUserEventOrder = true
356-
workers[0].inputReservations = 0
359+
workers[0].workerBuffer = newSimpleWorkerBuffer(3)
357360

358361
slot, err := r.findWorkerSlot(context.Background(), workers, backoffJob, parameters, map[eventorder.BarrierKey]struct{}{})
359362
require.NotNil(t, slot)
@@ -422,7 +425,7 @@ func TestBackoff(t *testing.T) {
422425

423426
t.Run("job not blocked after event ordering is disabled(destinationID level)", func(t *testing.T) {
424427
r.guaranteeUserEventOrder = true
425-
workers[0].inputReservations = 0
428+
workers[0].workerBuffer = newSimpleWorkerBuffer(3)
426429
job := &jobsdb.JobT{
427430
JobID: 1,
428431
Parameters: []byte(`{"destination_id": "destination"}`),
@@ -458,7 +461,7 @@ func TestBackoff(t *testing.T) {
458461

459462
t.Run("job not blocked after event ordering is disabled(workspaceID level)", func(t *testing.T) {
460463
r.guaranteeUserEventOrder = true
461-
workers[0].inputReservations = 0
464+
workers[0].workerBuffer = newSimpleWorkerBuffer(3)
462465
job := &jobsdb.JobT{
463466
JobID: 1,
464467
Parameters: []byte(`{"destination_id": "destination"}`),

router/types.go

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,28 +47,30 @@ type JobResponse struct {
4747
}
4848

4949
type reloadableConfig struct {
50-
jobQueryBatchSize config.ValueLoader[int]
51-
maxJobQueryBatchSize config.ValueLoader[int] // absolute max limit on job query batch size when adapting based on throttling limits
52-
updateStatusBatchSize config.ValueLoader[int]
53-
readSleep config.ValueLoader[time.Duration]
54-
maxStatusUpdateWait config.ValueLoader[time.Duration]
55-
minRetryBackoff config.ValueLoader[time.Duration]
56-
maxRetryBackoff config.ValueLoader[time.Duration]
57-
jobsBatchTimeout config.ValueLoader[time.Duration]
58-
failingJobsPenaltyThreshold config.ValueLoader[float64]
59-
failingJobsPenaltySleep config.ValueLoader[time.Duration]
60-
noOfJobsToBatchInAWorker config.ValueLoader[int]
61-
jobsDBCommandTimeout config.ValueLoader[time.Duration]
62-
jobdDBMaxRetries config.ValueLoader[int]
63-
maxFailedCountForJob config.ValueLoader[int]
64-
maxFailedCountForSourcesJob config.ValueLoader[int]
65-
payloadLimit config.ValueLoader[int64]
66-
retryTimeWindow config.ValueLoader[time.Duration]
67-
sourcesRetryTimeWindow config.ValueLoader[time.Duration]
68-
pickupFlushInterval config.ValueLoader[time.Duration]
69-
maxDSQuerySize config.ValueLoader[int]
70-
transformerProxy config.ValueLoader[bool]
71-
skipRtAbortAlertForTransformation config.ValueLoader[bool] // represents if event delivery(via transformerProxy) should be alerted via router-aborted-count alert def
72-
skipRtAbortAlertForDelivery config.ValueLoader[bool] // represents if transformation(router or batch) should be alerted via router-aborted-count alert def
73-
oauthV2ExpirationTimeDiff config.ValueLoader[time.Duration]
50+
jobQueryBatchSize config.ValueLoader[int]
51+
maxJobQueryBatchSize config.ValueLoader[int] // absolute max limit on job query batch size when adapting based on throttling limits
52+
updateStatusBatchSize config.ValueLoader[int]
53+
readSleep config.ValueLoader[time.Duration]
54+
maxStatusUpdateWait config.ValueLoader[time.Duration]
55+
minRetryBackoff config.ValueLoader[time.Duration]
56+
maxRetryBackoff config.ValueLoader[time.Duration]
57+
jobsBatchTimeout config.ValueLoader[time.Duration]
58+
failingJobsPenaltyThreshold config.ValueLoader[float64]
59+
failingJobsPenaltySleep config.ValueLoader[time.Duration]
60+
noOfJobsToBatchInAWorker config.ValueLoader[int]
61+
jobsDBCommandTimeout config.ValueLoader[time.Duration]
62+
jobdDBMaxRetries config.ValueLoader[int]
63+
maxFailedCountForJob config.ValueLoader[int]
64+
maxFailedCountForSourcesJob config.ValueLoader[int]
65+
payloadLimit config.ValueLoader[int64]
66+
retryTimeWindow config.ValueLoader[time.Duration]
67+
sourcesRetryTimeWindow config.ValueLoader[time.Duration]
68+
pickupFlushInterval config.ValueLoader[time.Duration]
69+
maxDSQuerySize config.ValueLoader[int]
70+
transformerProxy config.ValueLoader[bool]
71+
skipRtAbortAlertForTransformation config.ValueLoader[bool] // represents if event delivery(via transformerProxy) should be alerted via router-aborted-count alert def
72+
skipRtAbortAlertForDelivery config.ValueLoader[bool] // represents if transformation(router or batch) should be alerted via router-aborted-count alert def
73+
oauthV2ExpirationTimeDiff config.ValueLoader[time.Duration]
74+
enableExperimentalBufferSizeCalculator config.ValueLoader[bool] // whether to use the experimental worker buffer size calculator or not
75+
experimentalBufferSizeScalingFactor config.ValueLoader[float64] // scaling factor to scale up the buffer size in the experimental calculator
7476
}

0 commit comments

Comments
 (0)