Skip to content

Commit d0a33aa

Browse files
Merge branch 'experiment/mqttIngester'
2 parents ace4f8e + 25a0206 commit d0a33aa

File tree

8 files changed

+351
-12
lines changed

8 files changed

+351
-12
lines changed

.go-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.23.0
1+
1.23.4

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Build image
2-
FROM golang:1.23.0 AS build
2+
FROM golang:1.23.4 AS build
33
WORKDIR /project
44

55
COPY go.mod go.sum ./

go.mod

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ require (
88
github.com/joho/godotenv v1.5.1
99
github.com/prometheus/client_golang v1.20.5
1010
github.com/stretchr/testify v1.9.0
11+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0
12+
go.opentelemetry.io/otel v1.32.0
13+
go.opentelemetry.io/otel/sdk v1.32.0
14+
go.opentelemetry.io/otel/sdk/metric v1.32.0
1115
gorm.io/datatypes v1.2.1
1216
gorm.io/driver/postgres v1.5.9
1317
gorm.io/driver/sqlite v1.5.6
@@ -18,11 +22,7 @@ require (
1822
github.com/felixge/httpsnoop v1.0.4 // indirect
1923
github.com/go-logr/logr v1.4.2 // indirect
2024
github.com/go-logr/stdr v1.2.2 // indirect
21-
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 // indirect
22-
go.opentelemetry.io/otel v1.32.0 // indirect
2325
go.opentelemetry.io/otel/metric v1.32.0 // indirect
24-
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
25-
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
2626
go.opentelemetry.io/otel/trace v1.32.0 // indirect
2727
)
2828

@@ -31,7 +31,9 @@ require (
3131
github.com/beorn7/perks v1.0.1 // indirect
3232
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3333
github.com/davecgh/go-spew v1.1.1 // indirect
34+
github.com/eclipse/paho.mqtt.golang v1.5.0
3435
github.com/go-sql-driver/mysql v1.8.1 // indirect
36+
github.com/gorilla/websocket v1.5.3 // indirect
3537
github.com/jackc/pgpassfile v1.0.0 // indirect
3638
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
3739
github.com/jackc/pgx/v5 v5.6.0 // indirect
@@ -46,8 +48,8 @@ require (
4648
github.com/prometheus/common v0.60.1 // indirect
4749
github.com/prometheus/procfs v0.15.1 // indirect
4850
go.opentelemetry.io/otel/exporters/prometheus v0.54.0
49-
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0
50-
golang.org/x/crypto v0.26.0 // indirect
51+
golang.org/x/crypto v0.27.0 // indirect
52+
golang.org/x/net v0.29.0 // indirect
5153
golang.org/x/sync v0.8.0 // indirect
5254
golang.org/x/sys v0.27.0 // indirect
5355
golang.org/x/text v0.18.0 // indirect

go.sum

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
77
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
88
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
99
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
10+
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
11+
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
1012
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
1113
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
1214
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@@ -27,6 +29,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
2729
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
2830
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
2931
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
32+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
33+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
3034
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
3135
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
3236
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -78,8 +82,6 @@ go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
7882
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
7983
go.opentelemetry.io/otel/exporters/prometheus v0.54.0 h1:rFwzp68QMgtzu9PgP3jm9XaMICI6TsofWWPcBDKwlsU=
8084
go.opentelemetry.io/otel/exporters/prometheus v0.54.0/go.mod h1:QyjcV9qDP6VeK5qPyKETvNjmaaEc7+gqjh4SS0ZYzDU=
81-
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0 h1:SZmDnHcgp3zwlPBS2JX2urGYe/jBKEIT6ZedHRUyCz8=
82-
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0/go.mod h1:fdWW0HtZJ7+jNpTKUR0GpMEDP69nR8YBJQxNiVCE3jk=
8385
go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M=
8486
go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8=
8587
go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4=
@@ -88,8 +90,10 @@ go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiy
8890
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
8991
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
9092
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
91-
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
92-
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
93+
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
94+
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
95+
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
96+
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
9397
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
9498
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
9599
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=

ingester.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"sync"
6+
7+
"github.com/google/uuid"
8+
)
9+
10+
type ingester struct {
11+
currentStore currentStore
12+
valueEmitter valueEmitter
13+
14+
// Mutable
15+
// Stores a list of subject names that have been subscribed to.
16+
topics map[string]map[uuid.UUID]bool
17+
mux *sync.RWMutex
18+
}
19+
20+
func newIngester(
21+
currentStore currentStore,
22+
valueEmitter valueEmitter,
23+
) ingester {
24+
return ingester{
25+
currentStore: currentStore,
26+
valueEmitter: valueEmitter,
27+
28+
topics: map[string]map[uuid.UUID]bool{},
29+
mux: &sync.RWMutex{},
30+
}
31+
}
32+
33+
func (i *ingester) refreshSubscriptions(recs []rec) {
34+
// Use read-write lock to avoid modifying topics while onMessage is processing.
35+
// TODO: Consider breaking out a separate data structure type for topics.
36+
i.mux.Lock()
37+
defer i.mux.Unlock()
38+
39+
recIDs := map[uuid.UUID]bool{}
40+
for _, record := range recs {
41+
recIDs[record.ID] = true
42+
}
43+
44+
toSubscribe := []topicAndId{}
45+
for _, record := range recs {
46+
topic, ok := record.Tags["mqttSubject"].(string)
47+
if !ok {
48+
log.Printf("Error asserting type for mqttSubject")
49+
continue
50+
}
51+
52+
_, present := i.topics[topic]
53+
if !present {
54+
toSubscribe = append(toSubscribe, topicAndId{topic: topic, recID: record.ID})
55+
continue
56+
}
57+
_, present = i.topics[topic][record.ID]
58+
if !present {
59+
toSubscribe = append(toSubscribe, topicAndId{topic: topic, recID: record.ID})
60+
continue
61+
}
62+
}
63+
64+
toUnsubscribe := []topicAndId{}
65+
for topic, subscribedRecIDs := range i.topics {
66+
for subscribedRecID, _ := range subscribedRecIDs {
67+
_, present := recIDs[subscribedRecID]
68+
if !present {
69+
toUnsubscribe = append(toUnsubscribe, topicAndId{topic: topic, recID: subscribedRecID})
70+
}
71+
}
72+
}
73+
74+
for _, topicAndId := range toSubscribe {
75+
i.subscribe(topicAndId.topic, topicAndId.recID)
76+
}
77+
for _, topicAndId := range toUnsubscribe {
78+
i.unsubscribe(topicAndId.topic, topicAndId.recID)
79+
}
80+
}
81+
82+
// Helper methods
83+
84+
// Subscribe to a topic, and associate the rec with the topic
85+
func (i *ingester) subscribe(topic string, recID uuid.UUID) {
86+
i.valueEmitter.subscribe(
87+
topic,
88+
func(source string, value float64) {
89+
// Ensure that we are not modifying topics while onMessage is processing.
90+
i.mux.RLock()
91+
defer i.mux.RUnlock()
92+
93+
recIDs := i.topics[source]
94+
for recID, _ := range recIDs {
95+
i.currentStore.setCurrent(recID, currentInput{Value: &value})
96+
}
97+
},
98+
)
99+
100+
_, present := i.topics[topic]
101+
if present {
102+
i.topics[topic][recID] = true
103+
} else {
104+
i.topics[topic] = map[uuid.UUID]bool{recID: true}
105+
}
106+
}
107+
108+
func (i *ingester) unsubscribe(topic string, recID uuid.UUID) {
109+
if i.topics[topic] == nil {
110+
return
111+
} else {
112+
delete(i.topics[topic], recID)
113+
}
114+
115+
if len(i.topics[topic]) == 0 {
116+
i.valueEmitter.unsubscribe(topic)
117+
}
118+
}
119+
120+
type topicAndId struct {
121+
topic string
122+
recID uuid.UUID
123+
}

ingester_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
6+
"github.com/google/uuid"
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/suite"
9+
)
10+
11+
type IngesterTestSuite struct {
12+
suite.Suite
13+
valueEmitter *mockValueEmitter
14+
ingester *ingester
15+
currentStore currentStore
16+
}
17+
18+
func TestIngesterTestSuite(t *testing.T) {
19+
suite.Run(t, new(IngesterTestSuite))
20+
}
21+
22+
func (suite *IngesterTestSuite) SetupTest() {
23+
currentStore := newInMemoryCurrentStore()
24+
valueEmitter := mockValueEmitter{}
25+
ingester := newIngester(
26+
currentStore,
27+
&valueEmitter,
28+
)
29+
30+
suite.valueEmitter = &valueEmitter
31+
suite.ingester = &ingester
32+
suite.currentStore = currentStore
33+
}
34+
35+
func (suite *IngesterTestSuite) TestIngester() {
36+
// Setup some records
37+
rec1 := rec{
38+
ID: uuid.New(),
39+
Tags: map[string]interface{}{
40+
"mqttSubject": "test",
41+
},
42+
}
43+
rec2 := rec{
44+
ID: uuid.New(),
45+
Tags: map[string]interface{}{
46+
"mqttSubject": "test",
47+
},
48+
}
49+
rec3 := rec{
50+
ID: uuid.New(),
51+
Tags: map[string]interface{}{
52+
"mqttSubject": "test2",
53+
},
54+
}
55+
56+
// Check that refresh adds all subscriptions and emit sets the current value
57+
suite.ingester.refreshSubscriptions(
58+
[]rec{
59+
rec1,
60+
rec2,
61+
rec3,
62+
},
63+
)
64+
suite.valueEmitter.emit(0.0)
65+
assert.Equal(suite.T(), suite.ingester.topics["test"][rec1.ID], true)
66+
assert.Equal(suite.T(), suite.ingester.topics["test"][rec2.ID], true)
67+
assert.Equal(suite.T(), suite.ingester.topics["test2"][rec3.ID], true)
68+
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec1.ID).Value, 0.0)
69+
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec2.ID).Value, 0.0)
70+
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec3.ID).Value, 0.0)
71+
72+
// Check that removing some records removes subscriptions and emit no longer sets their current value
73+
suite.ingester.refreshSubscriptions(
74+
[]rec{
75+
rec1,
76+
},
77+
)
78+
suite.valueEmitter.emit(1.0)
79+
assert.Equal(suite.T(), suite.ingester.topics["test"][rec1.ID], true)
80+
assert.Equal(suite.T(), suite.ingester.topics["test"][rec2.ID], false)
81+
assert.Equal(suite.T(), suite.ingester.topics["test2"][rec3.ID], false)
82+
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec1.ID).Value, 1.0)
83+
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec2.ID).Value, 0.0)
84+
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec3.ID).Value, 0.0)
85+
}

main.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"net/http"
77
"os"
88
"strconv"
9+
"time"
910

11+
mqtt "github.com/eclipse/paho.mqtt.golang"
1012
"github.com/joho/godotenv"
1113
"github.com/prometheus/client_golang/prometheus/promhttp"
1214
"go.opentelemetry.io/otel"
@@ -76,6 +78,45 @@ func main() {
7678
recStore: recStore,
7779
currentStore: currentStore,
7880
}
81+
82+
// Start MQTT
83+
mqttAddress := os.Getenv("MQTT_ADDRESS")
84+
mqttUsername := os.Getenv("MQTT_USERNAME")
85+
mqttPassword := os.Getenv("MQTT_PASSWORD")
86+
mqttConnectionTimeout := time.Duration(5 * time.Second)
87+
88+
options := mqtt.NewClientOptions()
89+
options.AddBroker(mqttAddress)
90+
options.SetUsername(mqttUsername)
91+
options.SetPassword(mqttPassword)
92+
mqttClient := mqtt.NewClient(options)
93+
connectToken := mqttClient.Connect()
94+
if !connectToken.WaitTimeout(mqttConnectionTimeout) {
95+
log.Fatalf("MQTT timeout connecting to %s", mqttAddress)
96+
}
97+
if connectToken.Error() != nil {
98+
log.Fatal(connectToken.Error())
99+
}
100+
log.Printf("MQTT connected to %s", mqttAddress)
101+
valueEmitter := newMQTTValueEmitter(mqttClient)
102+
103+
// Setup ingester
104+
ingester := newIngester(
105+
currentStore,
106+
&valueEmitter,
107+
)
108+
recs, err := recStore.readRecs("mqttSubject")
109+
if err != nil {
110+
log.Fatalf("error getting mqttSubject points: %s", err)
111+
}
112+
ingester.refreshSubscriptions(recs)
113+
114+
defer func() {
115+
ingester.refreshSubscriptions([]rec{})
116+
mqttClient.Disconnect(1)
117+
log.Printf("Disconnected from %s", mqttAddress)
118+
}()
119+
79120
server, err := NewServer(serverConfig)
80121
if err != nil {
81122
log.Fatal(err)

0 commit comments

Comments
 (0)