Skip to content

Commit 50062ff

Browse files
authored
[cmd/opampsupervisor] Use bufio.Reader instead of bufio.Scanner to avoid buffer limits (#44310)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description When passthrough logging is enabled, the supervisor reads the collector's output and writes it to its own logger instead of a dedicated log file. An error can occur when a single collector log line is too big (>64 KiB, the default maximum token size of bufio.Scanner), which causes the scanner to fail. Instead, we use a bufio.Reader that splits the output on newlines, which avoids issues with log lines being too large. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #44127 <!--Describe what testing was performed and which tests were added.--> #### Testing Hard to trigger with a unit test. It can be verified using the hostmetrics receiver. I was able to trigger the bug when the hostmetrics receiver tried reading process metrics on my machine; this single log line was larger than 64 KiB. On the previous version I saw the error `bufio.Scanner: token too long`, but on this branch the log is printed with no issue. <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent d7bca31 commit 50062ff

File tree

5 files changed

+204
-34
lines changed

5 files changed

+204
-34
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: cmd/opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix supervisor passthrough logs overflow by using bufio.Reader instead of bufio.Scanner
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [44127]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/opampsupervisor/e2e_test.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,6 +1369,40 @@ func createHealthCheckCollectorConf(t *testing.T, nopPipeline bool) (cfg *bytes.
13691369
return &confmapBuf, h[:], 13133
13701370
}
13711371

1372+
func createHostMetricsCollectorConf(t *testing.T) (*bytes.Buffer, []byte) {
1373+
wd, err := os.Getwd()
1374+
require.NoError(t, err)
1375+
1376+
// Create the output file that the collector's file exporter writes to
1377+
// The testing package automatically removes the temp directory and its contents when the test completes.
1378+
tempDir := t.TempDir()
1379+
outputFile, err := os.CreateTemp(tempDir, "output_*.json")
1380+
require.NoError(t, err)
1381+
t.Cleanup(func() { outputFile.Close() })
1382+
1383+
colCfgTpl, err := os.ReadFile(path.Join(wd, "testdata", "collector", "hostmetrics_pipeline.yaml"))
1384+
require.NoError(t, err)
1385+
1386+
templ, err := template.New("").Parse(string(colCfgTpl))
1387+
require.NoError(t, err)
1388+
1389+
var confmapBuf bytes.Buffer
1390+
err = templ.Execute(
1391+
&confmapBuf,
1392+
map[string]string{
1393+
"outputLogFile": outputFile.Name(),
1394+
},
1395+
)
1396+
require.NoError(t, err)
1397+
1398+
h := sha256.New()
1399+
if _, err := io.Copy(h, bytes.NewBuffer(confmapBuf.Bytes())); err != nil {
1400+
log.Fatal(err)
1401+
}
1402+
1403+
return &confmapBuf, h.Sum(nil)
1404+
}
1405+
13721406
// Wait for the Supervisor to connect to or disconnect from the OpAMP server
13731407
func waitForSupervisorConnection(connection chan bool, connected bool) {
13741408
select {
@@ -1978,7 +2012,7 @@ func TestSupervisorLogging(t *testing.T) {
19782012
storageDir := t.TempDir()
19792013
remoteCfgFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat")
19802014

1981-
collectorCfg, hash, _, _ := createSimplePipelineCollectorConf(t)
2015+
collectorCfg, hash := createHostMetricsCollectorConf(t)
19822016
remoteCfgProto := &protobufs.AgentRemoteConfig{
19832017
Config: &protobufs.AgentConfigMap{
19842018
ConfigMap: map[string]*protobufs.AgentConfigFile{
@@ -1998,15 +2032,16 @@ func TestSupervisorLogging(t *testing.T) {
19982032
},
19992033
})
20002034
defer server.shutdown()
2035+
server.start()
20012036

2037+
// manually create supervisor and logger for this test
20022038
supervisorLogFilePath := filepath.Join(storageDir, "supervisor_log.log")
20032039
cfgFile := getSupervisorConfig(t, "logging", map[string]string{
20042040
"url": server.addr,
20052041
"storage_dir": storageDir,
20062042
"log_level": "0",
20072043
"log_file": supervisorLogFilePath,
20082044
})
2009-
20102045
cfg, err := config.Load(cfgFile.Name())
20112046
require.NoError(t, err)
20122047
logger, err := telemetry.NewLogger(cfg.Telemetry.Logs)
@@ -2016,23 +2051,27 @@ func TestSupervisorLogging(t *testing.T) {
20162051
require.NoError(t, err)
20172052
require.Nil(t, s.Start(t.Context()))
20182053

2019-
// Start the server and wait for the supervisor to connect
2020-
server.start()
20212054
waitForSupervisorConnection(server.supervisorConnected, true)
20222055
require.True(t, connected.Load(), "Supervisor failed to connect")
2023-
2056+
// give the collector some time to write to the log file
2057+
time.Sleep(5 * time.Second)
20242058
s.Shutdown()
20252059

20262060
// Read from log file checking for Info level logs
20272061
logFile, err := os.Open(supervisorLogFilePath)
20282062
require.NoError(t, err)
20292063

2030-
scanner := bufio.NewScanner(logFile)
2064+
reader := bufio.NewReader(logFile)
20312065
seenCollectorLog := false
2032-
for scanner.Scan() {
2033-
line := scanner.Bytes()
2066+
for {
2067+
line, err := reader.ReadString('\n')
2068+
if err != nil {
2069+
break
2070+
}
2071+
line = strings.TrimRight(line, "\r\n")
2072+
20342073
var log logEntry
2035-
err := json.Unmarshal(line, &log)
2074+
err = json.Unmarshal([]byte(line), &log)
20362075
require.NoError(t, err)
20372076

20382077
level, err := zapcore.ParseLevel(log.Level)

cmd/opampsupervisor/supervisor/commander/commander.go

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ import (
88
"context"
99
"errors"
1010
"fmt"
11+
"io"
1112
"os"
1213
"os/exec"
1314
"path/filepath"
1415
"slices"
16+
"strings"
1517
"sync/atomic"
1618
"syscall"
1719
"time"
@@ -157,24 +159,42 @@ func (c *Commander) startWithPassthroughLogging() error {
157159

158160
// capture agent output
159161
go func() {
160-
scanner := bufio.NewScanner(stdoutPipe)
161-
for scanner.Scan() {
162-
line := scanner.Text()
162+
reader := bufio.NewReader(stdoutPipe)
163+
for {
164+
line, err := reader.ReadString('\n')
165+
if err != nil {
166+
if err != io.EOF {
167+
c.logger.Error("Error reading agent stdout", zap.Error(err))
168+
}
169+
// Trim and log the last line if it exists
170+
if line != "" {
171+
line = strings.TrimRight(line, "\r\n")
172+
colLogger.Info(line)
173+
}
174+
break
175+
}
176+
line = strings.TrimRight(line, "\r\n")
163177
colLogger.Info(line)
164178
}
165-
if err := scanner.Err(); err != nil {
166-
c.logger.Error("Error reading agent stdout: %w", zap.Error(err))
167-
}
168179
}()
169180
go func() {
170-
scanner := bufio.NewScanner(stderrPipe)
171-
for scanner.Scan() {
172-
line := scanner.Text()
181+
reader := bufio.NewReader(stderrPipe)
182+
for {
183+
line, err := reader.ReadString('\n')
184+
if err != nil {
185+
if err != io.EOF {
186+
c.logger.Error("Error reading agent stderr", zap.Error(err))
187+
}
188+
// Trim and log the last line if it exists
189+
if line != "" {
190+
line = strings.TrimRight(line, "\r\n")
191+
colLogger.Error(line)
192+
}
193+
break
194+
}
195+
line = strings.TrimRight(line, "\r\n")
173196
colLogger.Error(line)
174197
}
175-
if err := scanner.Err(); err != nil {
176-
c.logger.Error("Error reading agent stderr: %w", zap.Error(err))
177-
}
178198
}()
179199

180200
c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid))
@@ -226,24 +246,50 @@ func (c *Commander) StartOneShot() ([]byte, []byte, error) {
226246
}
227247
// capture agent output
228248
go func() {
229-
scanner := bufio.NewScanner(stdoutPipe)
230-
for scanner.Scan() {
231-
stdout = append(stdout, scanner.Bytes()...)
249+
reader := bufio.NewReader(stdoutPipe)
250+
for {
251+
line, err := reader.ReadString('\n')
252+
if err != nil {
253+
if err != io.EOF {
254+
c.logger.Error("Error reading agent stdout", zap.Error(err))
255+
}
256+
// Trim and append the last line if it exists
257+
// Normalize line endings to \n
258+
if line != "" {
259+
line = strings.TrimRight(line, "\r\n")
260+
stdout = append(stdout, []byte(line)...)
261+
stdout = append(stdout, byte('\n'))
262+
}
263+
break
264+
}
265+
// Normalize line endings to \n
266+
line = strings.TrimRight(line, "\r\n")
267+
stdout = append(stdout, []byte(line)...)
232268
stdout = append(stdout, byte('\n'))
233269
}
234-
if err := scanner.Err(); err != nil {
235-
c.logger.Error("Error reading agent stdout: %w", zap.Error(err))
236-
}
237270
}()
238271
go func() {
239-
scanner := bufio.NewScanner(stderrPipe)
240-
for scanner.Scan() {
241-
stderr = append(stderr, scanner.Bytes()...)
272+
reader := bufio.NewReader(stderrPipe)
273+
for {
274+
line, err := reader.ReadString('\n')
275+
if err != nil {
276+
if err != io.EOF {
277+
c.logger.Error("Error reading agent stderr", zap.Error(err))
278+
}
279+
// Trim and append the last line if it exists
280+
// Normalize line endings to \n
281+
if line != "" {
282+
line = strings.TrimRight(line, "\r\n")
283+
stderr = append(stderr, []byte(line)...)
284+
stderr = append(stderr, byte('\n'))
285+
}
286+
break
287+
}
288+
// Normalize line endings to \n
289+
line = strings.TrimRight(line, "\r\n")
290+
stderr = append(stderr, []byte(line)...)
242291
stderr = append(stderr, byte('\n'))
243292
}
244-
if err := scanner.Err(); err != nil {
245-
c.logger.Error("Error reading agent stderr: %w", zap.Error(err))
246-
}
247293
}()
248294

249295
c.logger.Debug("Agent process started", zap.Int("pid", cmd.Process.Pid))
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
receivers:
2+
hostmetrics:
3+
collection_interval: 1s
4+
scrapers:
5+
process:
6+
metrics:
7+
process.threads:
8+
enabled: true
9+
process.signals_pending:
10+
enabled: true
11+
process.paging.faults:
12+
enabled: true
13+
process.memory.utilization:
14+
enabled: true
15+
process.memory.usage:
16+
enabled: true
17+
process.open_file_descriptors:
18+
enabled: true
19+
process.memory.virtual:
20+
enabled: true
21+
process.handles:
22+
enabled: true
23+
process.disk.operations:
24+
enabled: true
25+
process.context_switches:
26+
enabled: true
27+
process.disk.io:
28+
enabled: true
29+
process.cpu.utilization:
30+
enabled: true
31+
process.cpu.time:
32+
enabled: true
33+
mute_process_name_error: true
34+
mute_process_exe_error: true
35+
mute_process_io_error: true
36+
mute_process_user_error: true
37+
mute_process_cgroup_error: true
38+
39+
exporters:
40+
file:
41+
path: {{.outputLogFile}}
42+
43+
extensions:
44+
health_check:
45+
endpoint: "localhost:13133"
46+
47+
service:
48+
extensions: [health_check]
49+
pipelines:
50+
logs:
51+
receivers: [hostmetrics]
52+
exporters: [file]
53+
telemetry:
54+
logs:
55+
level: debug

cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,7 @@ agent:
2121
telemetry:
2222
logs:
2323
level: {{.log_level}} # info level logs
24-
output_paths: ['{{.log_file}}']
24+
# keep stdout so we can see logs in test output
25+
output_paths:
26+
- '{{.log_file}}'
27+
- 'stdout'

0 commit comments

Comments
 (0)