Skip to content

Commit ecd3fc0

Browse files
Merge branch 'feat/redis-current-store'
2 parents 2549a95 + 70fa565 commit ecd3fc0

File tree

8 files changed

+163
-49
lines changed

8 files changed

+163
-49
lines changed

README.md

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,30 @@ This is Jay's Timeseries API server. It requires a Timescale DB instance.
44

55
## Getting Started
66

7-
Create a `.env` file in this directory that contains necessary environment variables. For example:
7+
Create a `.env` file in this directory that contains necessary environment variables. Defaults:
88

99
```
10-
USERNAME=<username>
11-
PASSWORD=<password>
12-
API_KEY=<apikey>
13-
14-
HOST=<host>
15-
PORT=<port>
16-
JWT_SECRET=<password>
17-
18-
DATABASE_HOST=<host>
19-
DATABASE_PORT=<port>
20-
DATABASE_USERNAME=<username>
21-
DATABASE_PASSWORD=<password>
22-
DATABASE_NAME=<name>
10+
USERNAME=
11+
PASSWORD=
12+
API_KEY=
13+
14+
HOST=localhost
15+
PORT=80
16+
JWT_SECRET=
17+
18+
DATABASE_HOST=localhost
19+
DATABASE_PORT=5432
20+
DATABASE_USERNAME=postgres
21+
DATABASE_PASSWORD=postgres
22+
DATABASE_NAME=postgres
23+
24+
CURRENT_STORE_TYPE=memory # Options: redis, memory
25+
REDIS_ADDRESS=localhost:6379
26+
REDIS_PASSWORD=
27+
REDIS_DATABASE=0
28+
29+
MQTT_ADDRESS=
30+
MQTT_USERNAME=
31+
MQTT_PASSWORD=
2332
```
2433

currentController.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@ func (h currentController) getCurrent(w http.ResponseWriter, request *http.Reque
2929
return
3030
}
3131

32-
httpResult := h.store.getCurrent(pointId)
32+
httpResult, err := h.store.getCurrent(pointId)
33+
if err != nil {
34+
log.Printf("Cannot retrieve current value")
35+
w.WriteHeader(http.StatusInternalServerError)
36+
return
37+
}
3338

3439
httpJson, err := json.Marshal(httpResult)
3540
if err != nil {
@@ -61,7 +66,12 @@ func (h currentController) postCurrent(writer http.ResponseWriter, request *http
6166
return
6267
}
6368

64-
h.store.setCurrent(pointId, currentItem)
69+
err = h.store.setCurrent(pointId, currentItem)
70+
if err != nil {
71+
log.Printf("Cannot save current value")
72+
writer.WriteHeader(http.StatusInternalServerError)
73+
return
74+
}
6575

6676
writer.WriteHeader(http.StatusOK)
6777
}

currentStore.go

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
package main
22

33
import (
4+
"context"
5+
"encoding/json"
6+
"log"
47
"sync"
58
"time"
69

710
"github.com/google/uuid"
11+
"github.com/redis/go-redis/v9"
812
)
913

1014
// currentStore is able to store point current values
1115
type currentStore interface {
12-
getCurrent(uuid.UUID) current
13-
setCurrent(uuid.UUID, currentInput)
16+
getCurrent(uuid.UUID) (current, error)
17+
setCurrent(uuid.UUID, currentInput) error
1418
}
1519

1620
type currentInput struct {
@@ -36,18 +40,67 @@ func newInMemoryCurrentStore() inMemoryCurrentStore {
3640
}
3741
}
3842

39-
func (s inMemoryCurrentStore) getCurrent(id uuid.UUID) current {
43+
func (s inMemoryCurrentStore) getCurrent(id uuid.UUID) (current, error) {
4044
s.mux.Lock()
4145
defer s.mux.Unlock()
42-
return s.cache[id]
46+
return s.cache[id], nil
4347
}
4448

45-
func (s inMemoryCurrentStore) setCurrent(id uuid.UUID, input currentInput) {
49+
func (s inMemoryCurrentStore) setCurrent(id uuid.UUID, input currentInput) error {
4650
timestamp := time.Now()
4751
s.mux.Lock()
4852
defer s.mux.Unlock()
4953
s.cache[id] = current{
5054
Ts: &timestamp,
5155
Value: input.Value,
5256
}
57+
return nil
58+
}
59+
60+
// redisCurrentStore stores point current values in a Redis database.
61+
type redisCurrentStore struct {
62+
db *redis.Client
63+
keyPrefix string
64+
ctx context.Context
65+
}
66+
67+
func newRedisCurrentStore(db *redis.Client) redisCurrentStore {
68+
return redisCurrentStore{
69+
db: db,
70+
keyPrefix: "timeseries-api:",
71+
ctx: context.Background(),
72+
}
73+
}
74+
75+
func (s redisCurrentStore) getCurrent(id uuid.UUID) (current, error) {
76+
currentJson, err := s.db.Get(s.ctx, s.keyPrefix+id.String()).Bytes()
77+
if err != nil {
78+
log.Printf("Cannot retrieve current: %s", err)
79+
return current{}, err
80+
}
81+
var result current
82+
err = json.Unmarshal(currentJson, &result)
83+
if err != nil {
84+
log.Printf("Cannot decode current JSON: %s", err)
85+
return current{}, err
86+
}
87+
return result, nil
88+
}
89+
90+
func (s redisCurrentStore) setCurrent(id uuid.UUID, input currentInput) error {
91+
timestamp := time.Now()
92+
currentJson, err := json.Marshal(current{
93+
Ts: &timestamp,
94+
Value: input.Value,
95+
})
96+
if err != nil {
97+
log.Printf("Cannot encode current JSON: %s", err)
98+
return err
99+
}
100+
err = s.db.Set(s.ctx, s.keyPrefix+id.String(), currentJson, 0).Err()
101+
if err != nil {
102+
log.Printf("Cannot store current: %s", err)
103+
return err
104+
}
105+
return nil
53106
}

docker-compose.yaml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
services:
22
timeseries-api:
3+
profiles:
4+
- local
35
build:
46
context: .
57
dockerfile: Dockerfile
68
env_file:
79
- .env
810
environment:
9-
- DATABASE_HOST=db
11+
- DATABASE_HOST=postgres
1012
ports:
1113
- '8080:80'
1214
- '2112:2112' # Metrics
1315
volumes:
1416
- .env:/project/.env
1517
depends_on:
16-
- db
17-
db:
18+
- postgres
19+
postgres:
1820
image: timescale/timescaledb:latest-pg16
1921
restart: unless-stopped
2022
volumes:
@@ -26,6 +28,11 @@ services:
2628
POSTGRES_PASSWORD: postgres
2729
ports:
2830
- '${DATABASE_PORT:-5432}:5432'
31+
redis:
32+
image: redis:latest
33+
restart: unless-stopped
34+
ports:
35+
- '${REDIS_PORT:-6379}:6379'
2936

3037
volumes:
3138
db_data:

go.mod

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/google/uuid v1.6.0
88
github.com/joho/godotenv v1.5.1
99
github.com/prometheus/client_golang v1.20.5
10+
github.com/redis/go-redis/v9 v9.7.0
1011
github.com/stretchr/testify v1.9.0
1112
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0
1213
go.opentelemetry.io/otel v1.32.0
@@ -19,6 +20,7 @@ require (
1920
)
2021

2122
require (
23+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2224
github.com/felixge/httpsnoop v1.0.4 // indirect
2325
github.com/go-logr/logr v1.4.2 // indirect
2426
github.com/go-logr/stdr v1.2.2 // indirect
@@ -49,12 +51,12 @@ require (
4951
github.com/prometheus/common v0.60.1 // indirect
5052
github.com/prometheus/procfs v0.15.1 // indirect
5153
go.opentelemetry.io/otel/exporters/prometheus v0.54.0
52-
golang.org/x/crypto v0.27.0 // indirect
53-
golang.org/x/net v0.29.0 // indirect
54-
golang.org/x/sync v0.8.0 // indirect
55-
golang.org/x/sys v0.27.0 // indirect
56-
golang.org/x/text v0.18.0 // indirect
57-
google.golang.org/protobuf v1.35.1 // indirect
54+
golang.org/x/crypto v0.31.0 // indirect
55+
golang.org/x/net v0.33.0 // indirect
56+
golang.org/x/sync v0.10.0 // indirect
57+
golang.org/x/sys v0.28.0 // indirect
58+
golang.org/x/text v0.21.0 // indirect
59+
google.golang.org/protobuf v1.36.1 // indirect
5860
gopkg.in/yaml.v3 v3.0.1 // indirect
5961
gorm.io/driver/mysql v1.5.6 // indirect
6062
)

go.sum

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
22
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
33
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
44
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
5+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
6+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
7+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
8+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
59
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
610
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
711
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
812
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
913
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
14+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
15+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
1016
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
1117
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
1218
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
@@ -69,6 +75,8 @@ github.com/prometheus/common v0.60.1 h1:FUas6GcOw66yB/73KC+BOZoFJmbo/1pojoILArPA
6975
github.com/prometheus/common v0.60.1/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw=
7076
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
7177
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
78+
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
79+
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
7280
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
7381
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
7482
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -90,18 +98,18 @@ go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiy
9098
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
9199
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
92100
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
93-
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
94-
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
95-
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
96-
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
97-
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
98-
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
99-
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
100-
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
101-
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
102-
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
103-
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
104-
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
101+
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
102+
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
103+
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
104+
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
105+
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
106+
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
107+
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
108+
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
109+
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
110+
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
111+
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
112+
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
105113
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
106114
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
107115
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

ingester_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,12 @@ func (suite *IngesterTestSuite) TestIngester() {
6565
assert.Equal(suite.T(), suite.ingester.topics["test"][rec1.ID], true)
6666
assert.Equal(suite.T(), suite.ingester.topics["test"][rec2.ID], true)
6767
assert.Equal(suite.T(), suite.ingester.topics["test2"][rec3.ID], true)
68-
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec1.ID).Value, 0.0)
69-
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec2.ID).Value, 0.0)
70-
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec3.ID).Value, 0.0)
68+
actualRec1, _ := suite.currentStore.getCurrent(rec1.ID)
69+
assert.Equal(suite.T(), *actualRec1.Value, 0.0)
70+
actualRec2, _ := suite.currentStore.getCurrent(rec2.ID)
71+
assert.Equal(suite.T(), *actualRec2.Value, 0.0)
72+
actualRec3, _ := suite.currentStore.getCurrent(rec3.ID)
73+
assert.Equal(suite.T(), *actualRec3.Value, 0.0)
7174

7275
// Check that removing some records removes subscriptions and emit no longer sets their current value
7376
suite.ingester.refreshSubscriptions(
@@ -79,7 +82,10 @@ func (suite *IngesterTestSuite) TestIngester() {
7982
assert.Equal(suite.T(), suite.ingester.topics["test"][rec1.ID], true)
8083
assert.Equal(suite.T(), suite.ingester.topics["test"][rec2.ID], false)
8184
assert.Equal(suite.T(), suite.ingester.topics["test2"][rec3.ID], false)
82-
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec1.ID).Value, 1.0)
83-
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec2.ID).Value, 0.0)
84-
assert.Equal(suite.T(), *suite.currentStore.getCurrent(rec3.ID).Value, 0.0)
85+
actualRec1, _ = suite.currentStore.getCurrent(rec1.ID)
86+
assert.Equal(suite.T(), *actualRec1.Value, 1.0)
87+
actualRec2, _ = suite.currentStore.getCurrent(rec2.ID)
88+
assert.Equal(suite.T(), *actualRec2.Value, 0.0)
89+
actualRec3, _ = suite.currentStore.getCurrent(rec3.ID)
90+
assert.Equal(suite.T(), *actualRec3.Value, 0.0)
8591
}

main.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
mqtt "github.com/eclipse/paho.mqtt.golang"
1212
"github.com/joho/godotenv"
1313
"github.com/prometheus/client_golang/prometheus/promhttp"
14+
"github.com/redis/go-redis/v9"
1415
"go.opentelemetry.io/otel"
1516
"go.opentelemetry.io/otel/exporters/prometheus"
1617
"go.opentelemetry.io/otel/sdk/metric"
@@ -70,7 +71,25 @@ func main() {
7071
}
7172
historyStore := newGormHistoryStore(db)
7273
recStore := newGormRecStore(db)
73-
currentStore := newInMemoryCurrentStore()
74+
75+
var currentStore currentStore
76+
currentStoreType := envOrDefault("CURRENT_STORE_TYPE", "memory")
77+
if currentStoreType == "redis" {
78+
redisDatabase, err := strconv.Atoi(envOrDefault("REDIS_DATABASE", "0"))
79+
if err != nil {
80+
log.Fatal(err)
81+
}
82+
redisClient := redis.NewClient(&redis.Options{
83+
Addr: envOrDefault("REDIS_ADDRESS", "localhost:6379"),
84+
Password: envOrDefault("REDIS_PASSWORD", ""),
85+
DB: redisDatabase,
86+
})
87+
currentStore = newRedisCurrentStore(redisClient)
88+
} else if currentStoreType == "memory" {
89+
currentStore = newInMemoryCurrentStore()
90+
} else {
91+
log.Fatalf("Unknown current store type: %s", currentStoreType)
92+
}
7493

7594
serverConfig := ServerConfig{
7695
authenticator: authenticator,

0 commit comments

Comments
 (0)