
Commit fd5cd32

harp-intel and Copilot authored
refactor metrics event processing (#456)
* change event processing from timer based to event interval based
* remove nil code
* clean up error log
* Add extractInterval function and corresponding tests for JSON parsing
* const for initial batch size estimate
* swap slice approach
* reduce copies
* fix: correct buffer source in runLocalCommandWithInputWithTimeoutAsync

Signed-off-by: Harper, Jason M <[email protected]>
Co-authored-by: Copilot <[email protected]>
1 parent 1ec66dd commit fd5cd32

File tree

8 files changed: +218 -69 lines changed
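
The first item in the commit message, switching from timer-based to event-interval-based processing, is the heart of the change: every perf event line written to stderr carries an "interval" timestamp, so a frame is complete when that value changes rather than when output has been quiet for 100 ms. The following condensed, standalone Go sketch (not the repository code, which streams lines over channels into processPerfOutput) illustrates the batching rule:

package main

import (
	"encoding/json"
	"fmt"
)

// extractInterval stands in for the byte-scanning helper added in
// cmd/metrics/event_frame.go; this sketch simply unmarshals the line
// and returns -1 when no interval field is present.
func extractInterval(line []byte) float64 {
	var v struct {
		Interval *float64 `json:"interval"`
	}
	if err := json.Unmarshal(line, &v); err != nil || v.Interval == nil {
		return -1
	}
	return *v.Interval
}

func main() {
	// stand-in for lines arriving on the perf stderr channel
	lines := [][]byte{
		[]byte(`{"interval" : 5.005, "event" : "instructions"}`),
		[]byte(`{"interval" : 5.005, "event" : "cycles"}`),
		[]byte(`{"interval" : 10.010, "event" : "instructions"}`),
	}
	currentInterval := -1.0
	var batch [][]byte
	flush := func() {
		if len(batch) == 0 {
			return
		}
		// in the real code the batch is sent on intervalBatchChannel to processPerfOutput
		fmt.Printf("processing %d events for interval %.3f\n", len(batch), currentInterval)
		batch = nil
	}
	for _, line := range lines {
		interval := extractInterval(line)
		if interval < 0 {
			continue // not a perf event line
		}
		if interval != currentInterval {
			flush() // a new interval value marks the end of the previous frame
			currentInterval = interval
		}
		batch = append(batch, line)
	}
	flush() // final frame after perf exits
}

The real implementation in cmd/metrics/metrics.go below additionally sends each completed batch on a buffered intervalBatchChannel and drains that channel at shutdown so the final interval is not lost when perf exits.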

cmd/metrics/event_frame.go
Lines changed: 40 additions & 4 deletions

@@ -6,6 +6,7 @@ package metrics
 // Linux perf event output, i.e., from 'perf stat' parsing and processing helper functions
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"log/slog"
@@ -233,9 +234,6 @@ func coalesceEvents(allEvents []Event, scope string, granularity string, metadat
 		}
 	} else {
 		numCPUs = metadata.SocketCount * metadata.CoresPerSocket * metadata.ThreadsPerCore
-		if err != nil {
-			return nil, fmt.Errorf("failed to parse cpu range: %w", err)
-		}
 		cpuMap = make(map[int]int, numCPUs)
 		for i := 0; i < numCPUs; i++ {
 			cpuMap[i] = i
@@ -404,7 +402,7 @@ func collapseUncoreGroups(inGroups []EventGroup, firstIdx int, count int) (outGr
 func parseEventJSON(rawEvent []byte) (Event, error) {
 	var event Event
 	if err := json.Unmarshal(rawEvent, &event); err != nil {
-		err = fmt.Errorf("unrecognized event format: \"%s\"", rawEvent)
+		err = fmt.Errorf("unrecognized event format")
 		return event, err
 	}
 	if !strings.Contains(event.CounterValue, "not counted") && !strings.Contains(event.CounterValue, "not supported") {
@@ -416,3 +414,41 @@
 	}
 	return event, nil
 }
+
+// extractInterval parses the interval value from a JSON perf event line
+// Returns the interval as a float64, or -1 if parsing fails
+func extractInterval(line []byte) float64 {
+	// Look for the interval field in the JSON: "interval" : 5.005073756
+	intervalPattern := []byte(`"interval" : `)
+	intervalStart := bytes.Index(line, intervalPattern)
+	if intervalStart == -1 {
+		return -1
+	}
+
+	// Move to the start of the number
+	intervalStart += len(intervalPattern)
+	if intervalStart >= len(line) {
+		return -1
+	}
+
+	// Find the end of the number (comma, space, or closing brace)
+	intervalEnd := intervalStart
+	for intervalEnd < len(line) {
+		ch := line[intervalEnd]
+		if ch == ',' || ch == ' ' || ch == '}' {
+			break
+		}
+		intervalEnd++
+	}
+	if intervalEnd == intervalStart {
+		return -1
+	}
+
+	// Parse the number directly from bytes
+	interval, err := strconv.ParseFloat(string(line[intervalStart:intervalEnd]), 64)
+	if err != nil {
+		return -1
+	}
+
+	return interval
+}

cmd/metrics/event_frame_test.go
Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+package metrics
+
+// Copyright (C) 2021-2025 Intel Corporation
+// SPDX-License-Identifier: BSD-3-Clause
+
+import "testing"
+
+func TestExtractInterval(t *testing.T) {
+	tests := []struct {
+		name string
+		line []byte
+		want float64
+	}{
+		{
+			name: "ValidJSON",
+			line: []byte(`{"interval" : 5.005073756, "cpu": "0"}`),
+			want: 5.005073756,
+		},
+		{
+			name: "ValidJSONWithSpaces",
+			line: []byte(`{ "interval" : 42.12345 }`),
+			want: 42.12345,
+		},
+		{
+			name: "MissingInterval",
+			line: []byte(`{"cpu": "0"}`),
+			want: -1,
+		},
+		{
+			name: "EmptyLine",
+			line: []byte(``),
+			want: -1,
+		},
+		{
+			name: "InvalidNumber",
+			line: []byte(`{"interval" : not_a_number, "cpu": "0"}`),
+			want: -1,
+		},
+		{
+			name: "IntervalAtEnd",
+			line: []byte(`{"interval" : 123.456}`),
+			want: 123.456,
+		},
+		{
+			name: "IntervalWithTrailingSpace",
+			line: []byte(`{"interval" : 77.88 , "cpu": "0"}`),
+			want: 77.88,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := extractInterval(tt.line)
+			if got != tt.want {
+				t.Errorf("extractInterval() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
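
extractInterval scans the raw bytes for the "interval" field instead of unmarshalling the whole line, presumably to keep the per-line cost low given that every line is fully parsed again later by parseEventJSON. A micro-benchmark along these lines, a hypothetical addition that is not part of this commit but would live in the same metrics package, makes the comparison concrete:

package metrics

import (
	"encoding/json"
	"testing"
)

// a representative perf event line; field layout matches the tests above
var benchLine = []byte(`{"interval" : 5.005073756, "counter-value" : "1234567.0", "event" : "instructions"}`)

// BenchmarkExtractInterval measures the byte-scanning helper added in this commit.
func BenchmarkExtractInterval(b *testing.B) {
	for i := 0; i < b.N; i++ {
		if v := extractInterval(benchLine); v < 0 {
			b.Fatal("unexpected parse failure")
		}
	}
}

// BenchmarkIntervalViaUnmarshal pulls the same field through a full JSON decode for comparison.
func BenchmarkIntervalViaUnmarshal(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var m map[string]any
		if err := json.Unmarshal(benchLine, &m); err != nil {
			b.Fatal(err)
		}
		if _, ok := m["interval"].(float64); !ok {
			b.Fatal("interval missing")
		}
	}
}

Running something like go test -bench Interval ./cmd/metrics would compare the two approaches.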

cmd/metrics/metrics.go
Lines changed: 106 additions & 56 deletions

@@ -1383,8 +1383,8 @@ func collectOnTarget(targetContext *targetContext, localTempDir string, localOut
 func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec.Cmd, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, metadata Metadata, localTempDir string, outputDir string, frameChannel chan []MetricFrame, errorChannel chan error) {
 	// start perf
 	perfCommand := strings.Join(cmd.Args, " ")
-	stdoutChannel := make(chan string)
-	stderrChannel := make(chan string)
+	stdoutChannel := make(chan []byte)
+	stderrChannel := make(chan []byte)
 	exitcodeChannel := make(chan int)
 	scriptErrorChannel := make(chan error)
 	cmdChannel := make(chan *exec.Cmd)
@@ -1421,14 +1421,9 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
 		}
 	}
 	// Start a goroutine to wait for and then process perf output
-	// Use a timer to determine when we received an entire frame of events from perf
-	// The timer will expire when no lines (events) have been received from perf for more than 100ms. This
-	// works because perf writes the events to stderr in a burst every collection interval, e.g., 5 seconds.
-	// When the timer expires, this code assumes that perf is done writing events to stderr.
-	const perfEventWaitTime = time.Duration(100 * time.Millisecond) // 100ms is somewhat arbitrary, but is long enough for perf to print a frame of events
-	perfOutputTimer := time.NewTimer(time.Duration(2 * flagPerfPrintInterval * 1000)) // #nosec G115
+	const perfEventWaitTime = time.Duration(100 * time.Millisecond) // fallback timer for final interval
 	perfProcessingContext, cancelPerfProcessing := context.WithCancel(context.Background())
-	outputLines := make([][]byte, 0)
+	intervalBatchChannel := make(chan [][]byte, 10) // channel to send interval batches (all events for a particular interval) for processing
 	donePerfProcessingChannel := make(chan struct{}) // channel to wait for processPerfOutput to finish
 	go processPerfOutput(
 		perfProcessingContext,
@@ -1440,32 +1435,65 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
 		processes,
 		cgroupTimeout,
 		startPerfTimestamp,
-		perfOutputTimer,
-		&outputLines,
+		intervalBatchChannel,
 		frameChannel,
 		donePerfProcessingChannel,
 	)
-	// receive perf output
+
+	// receive perf output and batch by interval
+	var currentInterval float64 = -1
+	const initialBatchSize = 1000 // initial size of the batch
+	currentBatch := make([][]byte, 0, initialBatchSize)
 	done := false
 	for !done {
 		select {
-		case line := <-stderrChannel: // perf output comes in on this channel, one line at a time
-			perfOutputTimer.Stop()
-			perfOutputTimer.Reset(perfEventWaitTime)
-			// accumulate the lines, they will be processed in the goroutine when the timer expires
-			outputLines = append(outputLines, []byte(line))
+		case lineBytes := <-stderrChannel: // perf output comes in on this channel, one line at a time
+			interval := extractInterval(lineBytes)
+			if interval < 0 {
+				// If the interval is negative, it means the line is not a valid perf event line, skip it
+				slog.Warn("skipping invalid perf event line", slog.String("line", string(lineBytes)))
+				continue
+			}
+			// Handle interval change or first line
+			if interval != currentInterval {
+				// If we have accumulated lines from a previous interval, send them for processing
+				if len(currentBatch) > 0 {
+					// Send the batch and create a new one to avoid race conditions
+					batchToSend := currentBatch
+					currentBatch = make([][]byte, 0, len(currentBatch)) // batch sizes are typically the same
+					select {
+					case intervalBatchChannel <- batchToSend:
+					case <-perfProcessingContext.Done():
+						done = true
+						continue
+					}
+				}
+				// Start a new batch for the new interval
+				currentInterval = interval
+			}
+			// Add the line to the current batch
+			currentBatch = append(currentBatch, lineBytes)
 		case exitCode := <-exitcodeChannel: // when perf exits, the exit code comes to this channel
 			slog.Debug("perf exited", slog.Int("exit code", exitCode))
-			time.Sleep(perfEventWaitTime) // wait for timer to expire so that last events can be processed
-			done = true // exit the loop
+			// Send the final batch if we have accumulated lines
+			if len(currentBatch) > 0 {
+				batchToSend := currentBatch
+				currentBatch = nil // clear reference since we're done
+				select {
+				case intervalBatchChannel <- batchToSend:
+				case <-time.After(perfEventWaitTime): // timeout to avoid blocking
+				}
+			}
+			done = true // exit the loop
 		case err := <-scriptErrorChannel: // if there is an error running perf, it comes here
 			if err != nil {
 				slog.Error("error from perf", slog.String("error", err.Error()))
 			}
 			done = true // exit the loop
 		}
 	}
-	perfOutputTimer.Stop()
+	// Close the interval batch channel to signal no more batches
+	close(intervalBatchChannel)
 	// cancel the context to stop processPerfOutput
 	cancelPerfProcessing()
 	// wait for processPerfOutput to finish
@@ -1489,8 +1517,7 @@ func processPerfOutput(
 	processes []Process,
 	cgroupTimeout int,
 	startPerfTimestamp time.Time,
-	perfOutputTimer *time.Timer,
-	outputLines *[][]byte,
+	intervalBatchChannel chan [][]byte,
 	frameChannel chan []MetricFrame,
 	doneChannel chan struct{},
 ) {
@@ -1501,44 +1528,47 @@ func processPerfOutput(
 	const maxConsecutiveProcessEventErrors = 2
 	for !contextCancelled {
 		select {
-		case <-perfOutputTimer.C: // waits for timer to expire the process the events in outputLines
-		case <-ctx.Done(): // context cancellation
-			contextCancelled = true // exit the loop after one more pass
-		}
-		if contextCancelled {
-			break
-		}
-		if len(*outputLines) != 0 {
-			// write the events to a file
-			if flagWriteEventsToFile {
-				if err := writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.jsonl", *outputLines); err != nil {
-					slog.Error("failed to write events to file", slog.String("error", err.Error()))
-				}
+		case batchLines, ok := <-intervalBatchChannel:
+			if !ok {
+				// Channel closed, no more batches
+				contextCancelled = true
+				break
 			}
-			// process the events
-			var metricFrames []MetricFrame
-			var err error
-			metricFrames, frameTimestamp, err = ProcessEvents(*outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata)
-			if err != nil {
-				slog.Error(err.Error())
-				numConsecutiveProcessEventErrors++
-				if numConsecutiveProcessEventErrors > maxConsecutiveProcessEventErrors {
-					slog.Error("too many consecutive errors processing events, killing perf", slog.Int("max errors", maxConsecutiveProcessEventErrors))
-					// signaling self with SIGUSR1 will signal child processes to exit, which will cancel the context and let this function exit
-					err := util.SignalSelf(syscall.SIGUSR1)
-					if err != nil {
-						slog.Error("failed to signal self", slog.String("error", err.Error()))
+			// Process the interval batch
+			if len(batchLines) > 0 {
+				// write the events to a file
+				if flagWriteEventsToFile {
+					if err := writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.jsonl", batchLines); err != nil {
+						slog.Error("failed to write events to file", slog.String("error", err.Error()))
 					}
 				}
-				*outputLines = [][]byte{} // empty it
-			} else {
-				// send the metrics frames out to be printed
-				frameChannel <- metricFrames
-				// empty the outputLines
-				*outputLines = [][]byte{}
-				// reset the error count
-				numConsecutiveProcessEventErrors = 0
+				// process the events
+				var metricFrames []MetricFrame
+				var err error
+				metricFrames, frameTimestamp, err = ProcessEvents(batchLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata)
+				if err != nil {
+					slog.Error(err.Error())
+					numConsecutiveProcessEventErrors++
+					if numConsecutiveProcessEventErrors > maxConsecutiveProcessEventErrors {
+						slog.Error("too many consecutive errors processing events, killing perf", slog.Int("max errors", maxConsecutiveProcessEventErrors))
+						// signaling self with SIGUSR1 will signal child processes to exit, which will cancel the context and let this function exit
+						err := util.SignalSelf(syscall.SIGUSR1)
+						if err != nil {
+							slog.Error("failed to signal self", slog.String("error", err.Error()))
+						}
+					}
+				} else {
+					// send the metrics frames out to be printed
+					frameChannel <- metricFrames
+					// reset the error count
+					numConsecutiveProcessEventErrors = 0
+				}
 			}
+		case <-ctx.Done(): // context cancellation
+			contextCancelled = true
+		}
+		if contextCancelled {
+			break
 		}
 		// for cgroup scope, terminate perf if refresh timeout is reached
 		if flagScope == scopeCgroup && cgroupTimeout != 0 {
@@ -1552,4 +1582,24 @@ func processPerfOutput(
 			}
 		}
 	}
+	// Drain any remaining batches from the channel
+	for {
+		select {
+		case batchLines, ok := <-intervalBatchChannel:
+			if !ok {
+				return // Channel closed and drained
+			}
+			// Process final batches if they exist
+			if len(batchLines) > 0 {
+				if metricFrames, _, err := ProcessEvents(batchLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata); err == nil {
+					select {
+					case frameChannel <- metricFrames:
+					default: // Don't block if frameChannel is full
+					}
+				}
+			}
+		default:
+			return // No more batches
+		}
+	}
 }

internal/script/script.go
Lines changed: 1 addition & 1 deletion

@@ -164,7 +164,7 @@ func RunScripts(myTarget target.Target, scripts []ScriptDefinition, ignoreScript
 }
 
 // RunScriptStream runs a script on the specified target and streams the output to the specified channels.
-func RunScriptStream(myTarget target.Target, script ScriptDefinition, localTempDir string, stdoutChannel chan string, stderrChannel chan string, exitcodeChannel chan int, errorChannel chan error, cmdChannel chan *exec.Cmd) {
+func RunScriptStream(myTarget target.Target, script ScriptDefinition, localTempDir string, stdoutChannel chan []byte, stderrChannel chan []byte, exitcodeChannel chan int, errorChannel chan error, cmdChannel chan *exec.Cmd) {
 	targetArchitecture, err := myTarget.GetArchitecture()
 	if err != nil {
 		err = fmt.Errorf("error getting target architecture: %v", err)

internal/target/helpers.go
Lines changed: 9 additions & 5 deletions

@@ -128,7 +128,7 @@ func runLocalCommandWithInputWithTimeout(cmd *exec.Cmd, input string, timeout in
 //
 // Returns:
 // - err: An error if the command fails to start or if there are issues with pipes.
-func runLocalCommandWithInputWithTimeoutAsync(cmd *exec.Cmd, stdoutChannel chan string, stderrChannel chan string, exitcodeChannel chan int, input string, timeout int) (err error) {
+func runLocalCommandWithInputWithTimeoutAsync(cmd *exec.Cmd, stdoutChannel chan []byte, stderrChannel chan []byte, exitcodeChannel chan int, input string, timeout int) (err error) {
 	logInput := ""
 	if input != "" {
 		logInput = "******"
@@ -163,14 +163,18 @@ func runLocalCommandWithInputWithTimeoutAsync(cmd *exec.Cmd, stdoutChannel chan
 	}
 	go func() {
 		for stdoutScanner.Scan() {
-			text := stdoutScanner.Text()
-			stdoutChannel <- text
+			internalBuf := stdoutScanner.Bytes()
+			sendBuf := make([]byte, len(internalBuf))
+			copy(sendBuf, internalBuf)
+			stdoutChannel <- sendBuf
 		}
 	}()
 	go func() {
 		for stderrScanner.Scan() {
-			text := stderrScanner.Text()
-			stderrChannel <- text
+			internalBuf := stderrScanner.Bytes()
+			sendBuf := make([]byte, len(internalBuf))
+			copy(sendBuf, internalBuf)
+			stderrChannel <- sendBuf
 		}
 	}()
 	err = cmd.Wait()
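
The change from Scanner.Text() to Scanner.Bytes() avoids allocating a string per line, but Bytes() returns a slice that aliases the scanner's internal buffer, and that buffer may be overwritten by the next call to Scan; hence the explicit copy before the slice is sent to another goroutine. A minimal standalone sketch of the pattern (not repository code):

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	lines := make(chan []byte, 4)
	scanner := bufio.NewScanner(strings.NewReader("first line\nsecond line\n"))
	for scanner.Scan() {
		internalBuf := scanner.Bytes()            // aliases the scanner's internal buffer
		sendBuf := make([]byte, len(internalBuf)) // copy before handing off to another goroutine
		copy(sendBuf, internalBuf)
		lines <- sendBuf
	}
	close(lines)
	for l := range lines {
		fmt.Println(string(l)) // both lines arrive intact
	}
}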
