Skip to content

Commit acb1b59

Browse files
fix: Adds read-write lock around ingester
1 parent 2c41f8c commit acb1b59

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

ingester.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"log"
5+
"sync"
56

67
"github.com/google/uuid"
78
)
@@ -13,6 +14,7 @@ type ingester struct {
1314
// Mutable
1415
// Stores a list of subject names that have been subscribed to.
1516
topics map[string]map[uuid.UUID]bool
17+
mux *sync.RWMutex
1618
}
1719

1820
func newIngester(
@@ -24,11 +26,16 @@ func newIngester(
2426
valueEmitter: valueEmitter,
2527

2628
topics: map[string]map[uuid.UUID]bool{},
29+
mux: &sync.RWMutex{},
2730
}
2831
}
2932

3033
func (i *ingester) refreshSubscriptions(recs []rec) {
31-
// TODO: Block around this. Data races will occur on `topics` if also running the onMessage
34+
// Use read-write lock to avoid modifying topics while onMessage is processing.
35+
// TODO: Consider breaking out a separate data structure type for topics.
36+
i.mux.Lock()
37+
defer i.mux.Unlock()
38+
3239
recIDs := map[uuid.UUID]bool{}
3340
for _, record := range recs {
3441
recIDs[record.ID] = true
@@ -79,6 +86,10 @@ func (i *ingester) subscribe(topic string, recID uuid.UUID) {
7986
i.valueEmitter.subscribe(
8087
topic,
8188
func(source string, value float64) {
89+
// Ensure that we are not modifying topics while onMessage is processing.
90+
i.mux.RLock()
91+
defer i.mux.RUnlock()
92+
8293
recIDs := i.topics[source]
8394
for recID, _ := range recIDs {
8495
i.currentStore.setCurrent(recID, currentInput{Value: &value})

0 commit comments

Comments
 (0)