Skip to content

Commit ab90a2c

Browse files
committed
refactor(adaptive_throttler, resource_monitor): enhance configuration documentation and improve code clarity
1 parent 60b20a4 commit ab90a2c

File tree

2 files changed

+125
-80
lines changed

2 files changed

+125
-80
lines changed

flow/adaptive_throttler.go

Lines changed: 110 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,24 @@ const (
2020

2121
// AdaptiveThrottlerConfig configures the adaptive throttler behavior
2222
type AdaptiveThrottlerConfig struct {
23-
// Resource thresholds (0-100 percentage)
23+
// Resource monitoring configuration
24+
//
25+
// These settings control how the throttler monitors system resources
26+
// and determines when to throttle throughput.
27+
28+
// MaxMemoryPercent is the maximum memory usage threshold (0-100 percentage).
29+
// When memory usage exceeds this threshold, throughput will be reduced.
2430
MaxMemoryPercent float64
25-
MaxCPUPercent float64
2631

27-
// Throughput bounds (elements per second)
28-
MinThroughput int
29-
MaxThroughput int
32+
// MaxCPUPercent is the maximum CPU usage threshold (0-100 percentage).
33+
// When CPU usage exceeds this threshold, throughput will be reduced.
34+
MaxCPUPercent float64
3035

31-
// How often to sample resources
36+
// SampleInterval is how often to sample system resources.
37+
// More frequent sampling provides faster response but increases CPU overhead.
3238
SampleInterval time.Duration
3339

34-
// Buffer size to hold incoming elements
35-
BufferSize int
36-
MaxBufferSize int
37-
38-
// Adaptation parameters (How aggressively to adapt. 0.1 = slow, 0.5 = fast).
39-
//
40-
// Allowed values: 0.0 to 1.0
41-
AdaptationFactor float64
42-
43-
// Rate transition smoothing.
44-
//
45-
// If true, the throughput rate will be smoothed over time to avoid abrupt changes.
46-
SmoothTransitions bool
47-
48-
// CPU usage sampling mode.
40+
// CPUUsageMode controls how CPU usage is measured.
4941
//
5042
// CPUUsageModeHeuristic: Estimates CPU usage using a simple heuristic (goroutine count),
5143
// suitable for platforms where accurate process CPU measurement is not supported.
@@ -54,40 +46,101 @@ type AdaptiveThrottlerConfig struct {
5446
// (when supported), providing more accurate CPU usage readings.
5547
CPUUsageMode CPUUsageMode
5648

57-
// Hysteresis buffer to prevent rapid state changes (percentage points).
58-
// Requires this much additional headroom before increasing rate.
59-
// Default: 5.0
60-
HysteresisBuffer float64
61-
62-
// Maximum rate change factor per adaptation cycle (0.0-1.0).
63-
// Limits how much the rate can change in a single step to prevent instability.
64-
// Default: 0.3 (max 30% change per cycle)
65-
MaxRateChangeFactor float64
66-
6749
// MemoryReader is a user-provided custom function that returns memory usage percentage.
6850
// This can be particularly useful for containerized deployments or other environments
6951
// where standard system memory readings may not accurately reflect container-specific
7052
// usage.
7153
// If nil, system memory will be read via mem.VirtualMemory().
7254
// Must return memory used percentage (0-100).
7355
MemoryReader func() (float64, error)
56+
57+
// Throughput bounds (in elements per second)
58+
//
59+
// These settings define the minimum and maximum throughput rates.
60+
61+
// MinThroughput is the minimum throughput in elements per second.
62+
// The throttler will never reduce throughput below this value.
63+
MinThroughput int
64+
65+
// MaxThroughput is the maximum throughput in elements per second.
66+
// The throttler will never increase throughput above this value.
67+
MaxThroughput int
68+
69+
// Buffer configuration
70+
//
71+
// These settings control the internal buffering of elements.
72+
73+
// BufferSize is the initial buffer size in number of elements.
74+
// This buffer holds incoming elements when throughput is throttled.
75+
BufferSize int
76+
77+
// MaxBufferSize is the maximum buffer size in number of elements.
78+
// This prevents unbounded memory allocation during sustained throttling.
79+
MaxBufferSize int
80+
81+
// Adaptation behavior
82+
//
83+
// These settings control how aggressively and smoothly the throttler
84+
// adapts to changing resource conditions.
85+
86+
// AdaptationFactor controls how aggressively the throttler adapts (0.0-1.0).
87+
// Lower values (e.g., 0.1) result in slower, more conservative adaptation.
88+
// Higher values (e.g., 0.5) result in faster, more aggressive adaptation.
89+
AdaptationFactor float64
90+
91+
// SmoothTransitions enables rate transition smoothing.
92+
// If true, the throughput rate will be smoothed over time to avoid abrupt changes.
93+
// This helps prevent oscillations and provides more stable behavior.
94+
SmoothTransitions bool
95+
96+
// HysteresisBuffer prevents rapid state changes (in percentage points).
97+
// Requires this much additional headroom before increasing rate.
98+
// This prevents oscillations around resource thresholds.
99+
HysteresisBuffer float64
100+
101+
// MaxRateChangeFactor limits the maximum rate change per adaptation cycle (0.0-1.0).
102+
// Limits how much the rate can change in a single step to prevent instability.
103+
MaxRateChangeFactor float64
74104
}
75105

76-
// DefaultAdaptiveThrottlerConfig returns sensible defaults for most use cases
106+
// DefaultAdaptiveThrottlerConfig returns sensible defaults for most use cases.
107+
//
108+
// Default configuration parameters:
109+
//
110+
// Resource Monitoring:
111+
// - MaxMemoryPercent: 80.0% - Conservative memory threshold to prevent OOM
112+
// - MaxCPUPercent: 70.0% - Conservative CPU threshold to maintain responsiveness
113+
// - SampleInterval: 200ms - Balanced sampling frequency to minimize overhead
114+
// - CPUUsageMode: CPUUsageModeMeasured - Uses native process CPU measurement
115+
// - MemoryReader: nil - Uses system memory via mem.VirtualMemory()
116+
//
117+
// Throughput Bounds:
118+
// - MinThroughput: 10 elements/second - Ensures minimum processing rate
119+
// - MaxThroughput: 500 elements/second - Conservative maximum for stability
120+
//
121+
// Buffer Configuration:
122+
// - BufferSize: 500 elements - Matches max throughput for 1 second buffer at max rate
123+
// - MaxBufferSize: 10,000 elements - Prevents unbounded memory allocation
124+
//
125+
// Adaptation Behavior:
126+
// - AdaptationFactor: 0.15 - Conservative adaptation speed (15% adjustment per cycle)
127+
// - SmoothTransitions: true - Enables rate smoothing to avoid abrupt changes
128+
// - HysteresisBuffer: 5.0% - Prevents oscillations around resource thresholds
129+
// - MaxRateChangeFactor: 0.3 - Limits rate changes to 30% per cycle for stability
77130
func DefaultAdaptiveThrottlerConfig() *AdaptiveThrottlerConfig {
78131
return &AdaptiveThrottlerConfig{
79-
MaxMemoryPercent: 80.0, // Conservative memory threshold
80-
MaxCPUPercent: 70.0, // Conservative CPU threshold
81-
MinThroughput: 10, // Reasonable minimum throughput
82-
MaxThroughput: 500, // More conservative maximum
83-
SampleInterval: 200 * time.Millisecond, // Less frequent sampling
84-
BufferSize: 500, // Match max throughput for 1 second buffer at max rate
85-
MaxBufferSize: 10000, // Reasonable maximum to prevent unbounded memory allocation
86-
AdaptationFactor: 0.15, // Slightly more conservative adaptation
87-
SmoothTransitions: true, // Keep smooth transitions enabled by default
88-
CPUUsageMode: CPUUsageModeMeasured, // Use actual process CPU usage natively
89-
HysteresisBuffer: 5.0, // Prevent oscillations around threshold
90-
MaxRateChangeFactor: 0.3, // More conservative rate changes
132+
MaxMemoryPercent: 80.0,
133+
MaxCPUPercent: 70.0,
134+
MinThroughput: 10,
135+
MaxThroughput: 500,
136+
SampleInterval: 200 * time.Millisecond,
137+
BufferSize: 500,
138+
MaxBufferSize: 10000,
139+
AdaptationFactor: 0.15,
140+
SmoothTransitions: true,
141+
CPUUsageMode: CPUUsageModeMeasured,
142+
HysteresisBuffer: 5.0,
143+
MaxRateChangeFactor: 0.3,
91144
}
92145
}
93146

@@ -115,24 +168,24 @@ type AdaptiveThrottler struct {
115168
config AdaptiveThrottlerConfig
116169
monitor resourceMonitor
117170

118-
// Current rate (elements per second)
171+
// Current throughput rate in elements per second
119172
currentRate atomic.Int64
120173

121-
// Rate control
122-
period time.Duration // Calculated from currentRate
123-
maxElements atomic.Int64 // Elements per period
124-
counter atomic.Int64
174+
// Rate control: period-based quota enforcement
175+
period time.Duration // Time period for quota calculation (typically 1 second)
176+
maxElements atomic.Int64 // Maximum elements allowed per period
177+
counter atomic.Int64 // Current element count in the period
125178

126-
// Channels
127-
in chan any
128-
out chan any
129-
quotaSignal chan struct{}
130-
done chan struct{}
179+
// Communication channels
180+
in chan any // Input channel for incoming elements
181+
out chan any // Output channel for throttled elements
182+
quotaSignal chan struct{} // Signal channel to notify when quota resets
183+
done chan struct{} // Shutdown signal channel
131184

132-
// Rate adaptation
133-
lastAdaptation time.Time
185+
// Rate adaptation tracking
186+
lastAdaptation time.Time // Timestamp of last rate adaptation
134187

135-
stopOnce sync.Once
188+
stopOnce sync.Once // Ensures cleanup happens only once
136189
}
137190

138191
var _ streams.Flow = (*AdaptiveThrottler)(nil)

flow/resource_monitor.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,23 @@ type ResourceStats struct {
3131

3232
// ResourceMonitor monitors system resources and provides current statistics
3333
type ResourceMonitor struct {
34-
sampleInterval time.Duration
35-
memoryThreshold float64
36-
cpuThreshold float64
37-
cpuMode CPUUsageMode
34+
// Configuration thresholds
35+
sampleInterval time.Duration // How often to sample system resources
36+
memoryThreshold float64 // Memory usage threshold (0-100 percentage)
37+
cpuThreshold float64 // CPU usage threshold (0-100 percentage)
38+
cpuMode CPUUsageMode // CPU usage measurement mode
3839

39-
// Current stats (atomic for thread-safe reads)
40+
// Current resource statistics
4041
stats atomic.Pointer[ResourceStats]
4142

42-
// CPU sampling
43-
sampler sysmonitor.ProcessCPUSampler
43+
// Resource sampling components
44+
sampler sysmonitor.ProcessCPUSampler // CPU usage sampler
45+
memStats runtime.MemStats // Reusable buffer for memory statistics
46+
memoryReader func() (float64, error) // Custom memory reader for containerized deployments
4447

45-
// Reusable buffer for memory stats
46-
memStats runtime.MemStats
47-
48-
// Memory reader for containerized deployments
49-
memoryReader func() (float64, error)
50-
51-
mu sync.RWMutex
52-
done chan struct{}
48+
// Synchronization and lifecycle
49+
closeOnce sync.Once // Ensures cleanup happens only once
50+
done chan struct{} // Shutdown signal channel
5351
}
5452

5553
// NewResourceMonitor creates a new resource monitor.
@@ -255,15 +253,9 @@ func (rm *ResourceMonitor) monitor() {
255253

256254
// Close stops the resource monitor
257255
func (rm *ResourceMonitor) Close() {
258-
rm.mu.Lock()
259-
defer rm.mu.Unlock()
260-
261-
select {
262-
case <-rm.done:
263-
return
264-
default:
256+
rm.closeOnce.Do(func() {
265257
close(rm.done)
266-
}
258+
})
267259
}
268260

269261
// clampPercent clamps a percentage value between 0 and 100.

0 commit comments

Comments
 (0)