Skip to content

Commit a54c104

Browse files
ref(cmd/tumlive): First cleanup round of application bootstrap
1 parent 8767329 commit a54c104

File tree

2 files changed

+115
-145
lines changed

2 files changed

+115
-145
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ COPY --from=node /app/web/node_modules ./web/node_modules
2626

2727
# bundle version into binary if specified in build-args, dev otherwise.
2828
ARG version=dev
29-
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "-w -extldflags '-static' -X main.VersionTag=${version}" -o /go/bin/tumlive cmd/tumlive/tumlive.go
29+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "-w -extldflags '-static' -X main.VersionTag=${version}" -o /go/bin/tumlive cmd/tumlive/main.go
3030

3131
FROM alpine:3.22
3232
RUN apk add --no-cache tzdata openssl
Lines changed: 114 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,24 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
5-
log "log/slog"
6+
"log/slog"
67
"net"
7-
"net/http"
8-
_ "net/http/pprof"
98
"os"
109
"os/signal"
1110
"syscall"
1211
"time"
1312

14-
"github.com/soheilhy/cmux"
15-
"google.golang.org/grpc"
16-
"google.golang.org/grpc/credentials/insecure"
17-
1813
"github.com/dgraph-io/ristretto/v2"
1914
"github.com/getsentry/sentry-go"
20-
sentrygin "github.com/getsentry/sentry-go/gin"
2115
"github.com/gin-contrib/gzip"
2216
"github.com/gin-gonic/gin"
2317
slogGorm "github.com/orandin/slog-gorm"
24-
"github.com/pkg/profile"
18+
"github.com/soheilhy/cmux"
19+
"golang.org/x/sync/errgroup"
20+
"google.golang.org/grpc"
21+
"google.golang.org/grpc/credentials/insecure"
2522
"gorm.io/driver/mysql"
2623
"gorm.io/gorm"
2724

@@ -36,123 +33,26 @@ import (
3633
"github.com/TUM-Dev/gocast/web"
3734
)
3835

39-
var VersionTag = "development"
40-
41-
type initializer func()
42-
43-
var logger = log.New(log.NewJSONHandler(os.Stdout, &log.HandlerOptions{
44-
Level: log.LevelDebug,
45-
})).With("service", "main")
46-
47-
var initializers = []initializer{
48-
tools.LoadConfig,
49-
tools.InitBranding,
50-
}
51-
52-
func initAll(initializers []initializer) {
53-
for _, init := range initializers {
54-
init()
55-
}
56-
}
57-
58-
// GinServer launches the gin server
59-
func GinServer(manager *runner_manager.Manager) (err error) {
60-
router := gin.New()
61-
router.Use(gin.Recovery())
62-
gin.SetMode(gin.ReleaseMode)
63-
// capture performance with sentry
64-
router.Use(sentrygin.New(sentrygin.Options{Repanic: true}))
65-
if VersionTag != "development" {
66-
tools.CookieSecure = true
67-
}
68-
69-
router.Use(gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
70-
if param.StatusCode >= 400 {
71-
return fmt.Sprintf("{\"service\": \"GIN\", \"time\": %s, \"status\": %d, \"client\": \"%s\", \"path\": \"%s\", \"agent\": %s}\n",
72-
param.TimeStamp.Format(time.DateTime),
73-
param.StatusCode,
74-
param.ClientIP,
75-
param.Path,
76-
param.Request.UserAgent(),
77-
)
78-
}
79-
return ""
80-
}))
81-
82-
router.Use(tools.InitContext(dao.NewDaoWrapper()))
83-
84-
l, err := net.Listen("tcp", ":8081")
36+
func main() {
37+
ctx := context.Background()
38+
err := run(ctx)
8539
if err != nil {
86-
logger.Error("can't listen on port 8081", "err", err)
40+
slog.With("err", err).ErrorContext(ctx, "shutting down")
41+
os.Exit(1)
8742
}
88-
89-
m := cmux.New(l)
90-
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
91-
92-
api2Client := apiv2.New(dao.DB)
93-
go func() {
94-
if err := api2Client.Run(grpcl); err != nil {
95-
logger.Error("can't launch grpc server", "err", err)
96-
}
97-
}()
98-
99-
liveUpdates := router.Group("/api/pub-sub")
100-
api.ConfigRealtimeRouter(liveUpdates)
101-
102-
// event streams don't work with gzip, configure group without
103-
chat := router.Group("/api/chat")
104-
api.ConfigChatRouter(chat)
105-
106-
router.Use(gzip.Gzip(gzip.DefaultCompression))
107-
router.Any("/api/v2/*any", api2Client.Proxy())
108-
api.ConfigGinRouter(router, manager)
109-
web.ConfigGinRouter(router)
110-
go func() {
111-
err = router.RunListener(m.Match(cmux.Any()))
112-
// err = router.RunTLS(":443", tools.Cfg.Saml.Cert, tools.Cfg.Saml.Privkey)
113-
if err != nil {
114-
sentry.CaptureException(err)
115-
logger.Error("Error starting tumlive", "err", err)
116-
}
117-
}()
118-
119-
return m.Serve()
12043
}
12144

122-
var osSignal chan os.Signal
45+
func run(ctx context.Context) error {
46+
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
47+
defer cancel()
12348

124-
func main() {
49+
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
50+
Level: slog.LevelDebug,
51+
})).With("service", "main")
12552
initAll(initializers)
12653
defer api.RealtimeInstance.CloseAll()
12754

128-
defer profile.Start(profile.MemProfile).Stop()
129-
go func() {
130-
_ = http.ListenAndServe(":8082", nil) // debug endpoint
131-
}()
132-
13355
web.VersionTag = VersionTag
134-
osSignal = make(chan os.Signal, 1)
135-
136-
env := "production"
137-
if VersionTag == "development" {
138-
env = "development"
139-
}
140-
if os.Getenv("SentryDSN") != "" {
141-
err := sentry.Init(sentry.ClientOptions{
142-
Dsn: os.Getenv("SentryDSN"),
143-
Release: VersionTag,
144-
TracesSampleRate: 0.15,
145-
Debug: true,
146-
AttachStacktrace: true,
147-
Environment: env,
148-
})
149-
if err != nil {
150-
logger.Error("sentry.Init", "err", err)
151-
}
152-
// Flush buffered events before the program terminates.
153-
defer sentry.Flush(2 * time.Second)
154-
defer sentry.Recover()
155-
}
15656

15757
gormJSONLogger := slogGorm.New()
15858

@@ -168,16 +68,13 @@ func main() {
16868
Logger: gormJSONLogger,
16969
})
17070
if err != nil {
171-
sentry.CaptureException(err)
172-
sentry.Flush(time.Second * 5)
17371
logger.Error("Error opening database", "err", err)
17472
}
17573
dao.DB = db
17674

17775
err = dao.Migrator.RunBefore(db)
17876
if err != nil {
179-
logger.Error("Error running before db", "err", err)
180-
return
77+
return fmt.Errorf("before migration: %s", err)
18178
}
18279

18380
err = db.AutoMigrate(
@@ -217,14 +114,11 @@ func main() {
217114
&model.Runner{},
218115
)
219116
if err != nil {
220-
sentry.CaptureException(err)
221-
sentry.Flush(time.Second * 5)
222-
logger.Error("can't migrate database", "err", err)
117+
return fmt.Errorf("migration: %w", err)
223118
}
224119
err = dao.Migrator.RunAfter(db)
225120
if err != nil {
226-
logger.Error("Error running after db", "err", err)
227-
return
121+
return fmt.Errorf("after migrate: %s", err)
228122
}
229123

230124
cache, _ := ristretto.NewCache[string, any](&ristretto.Config[string, any]{
@@ -243,16 +137,16 @@ func main() {
243137
api.RunVoiceServiceReceiver(tools.Cfg.VoiceService.AuthToken)
244138
c, err := grpc.NewClient(fmt.Sprintf("%s:%s", tools.Cfg.VoiceService.Host, tools.Cfg.VoiceService.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
245139
if err != nil {
246-
log.Error("failed to connect to voice service", "err", err)
140+
logger.Error("failed to connect to voice service", "err", err)
247141
} else {
248142
opts = append(opts, runner_manager.WithSubtitleClient(pb.NewSubtitleGeneratorClient(c), tools.Cfg.VoiceService.AuthToken))
249143
}
250144
}
251145
m := runner_manager.New(dao.NewDaoWrapper(), opts...)
252-
log.Info("running runner manager")
146+
logger.Info("running runner manager")
253147
err = m.Run()
254148
if err != nil {
255-
log.Error("Failed to start runner manager", "err", err)
149+
logger.Error("Failed to start runner manager", "err", err)
256150
}
257151

258152
api.ServeWorkerGRPC(subtitleClient, tools.Cfg.VoiceService.AuthToken)
@@ -263,19 +157,101 @@ func main() {
263157
mailer := tools.NewMailer(dao.NewDaoWrapper(), tools.Cfg.Mail.MaxMailsPerMinute)
264158
go mailer.Run()
265159

266-
initCron(m)
160+
initCron(logger, m)
161+
return serveHttp(ctx, m)
162+
}
163+
164+
var VersionTag = "development"
165+
166+
type initializer func()
167+
168+
var initializers = []initializer{
169+
tools.LoadConfig,
170+
tools.InitBranding,
171+
}
172+
173+
func initAll(initializers []initializer) {
174+
for _, init := range initializers {
175+
init()
176+
}
177+
}
178+
179+
// serveHttp launches all http servers
180+
func serveHttp(ctx context.Context, manager *runner_manager.Manager) (err error) {
181+
router := gin.New()
182+
router.Use(gin.Recovery())
183+
gin.SetMode(gin.ReleaseMode)
184+
if VersionTag != "development" {
185+
tools.CookieSecure = true
186+
}
187+
188+
router.Use(gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
189+
if param.StatusCode >= 400 {
190+
return fmt.Sprintf("{\"service\": \"GIN\", \"time\": %s, \"status\": %d, \"client\": \"%s\", \"path\": \"%s\", \"agent\": %s}\n",
191+
param.TimeStamp.Format(time.DateTime),
192+
param.StatusCode,
193+
param.ClientIP,
194+
param.Path,
195+
param.Request.UserAgent(),
196+
)
197+
}
198+
return ""
199+
}))
200+
201+
router.Use(tools.InitContext(dao.NewDaoWrapper()))
202+
203+
l, err := net.Listen("tcp", ":8081")
204+
if err != nil {
205+
return err
206+
}
207+
208+
m := cmux.New(l)
209+
267210
go func() {
268-
err = GinServer(m)
269-
if err != nil {
270-
sentry.CaptureException(err)
271-
sentry.Flush(time.Second * 5)
272-
logger.Error("can't launch gin server", "err", err)
211+
select {
212+
case <-ctx.Done():
213+
m.Close()
273214
}
274215
}()
275-
keepAlive()
216+
217+
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
218+
httpl := m.Match(cmux.Any())
219+
220+
api2Client := apiv2.New(dao.DB)
221+
222+
g, _ := errgroup.WithContext(ctx)
223+
224+
g.Go(func() error {
225+
return api2Client.Run(grpcl)
226+
})
227+
228+
liveUpdates := router.Group("/api/pub-sub")
229+
api.ConfigRealtimeRouter(liveUpdates)
230+
231+
// event streams don't work with gzip, configure group without
232+
chat := router.Group("/api/chat")
233+
api.ConfigChatRouter(chat)
234+
235+
router.Use(gzip.Gzip(gzip.DefaultCompression))
236+
router.Any("/api/v2/*any", api2Client.Proxy())
237+
api.ConfigGinRouter(router, manager)
238+
web.ConfigGinRouter(router)
239+
g.Go(func() error {
240+
return router.RunListener(httpl)
241+
})
242+
243+
g.Go(func() error {
244+
return m.Serve()
245+
})
246+
247+
if err = g.Wait(); err != nil && ctx.Err() != nil {
248+
// webserver gracefully shut down
249+
return nil
250+
}
251+
return err
276252
}
277253

278-
func initCron(m *runner_manager.Manager) {
254+
func initCron(logger *slog.Logger, m *runner_manager.Manager) {
279255
daoWrapper := dao.NewDaoWrapper()
280256
tools.InitCronService()
281257
// Fetch students every 12 hours
@@ -289,7 +265,7 @@ func initCron(m *runner_manager.Manager) {
289265
_ = tools.Cron.AddFunc("triggerDueStreamsRunner", func() {
290266
err := m.TriggerDueStreams()
291267
if err != nil {
292-
log.With("err", err).Error("Can't run streams with runner")
268+
logger.With("err", err).Error("Can't run streams with runner")
293269
}
294270
}, "0-59 * * * *")
295271
// update courses available
@@ -300,9 +276,3 @@ func initCron(m *runner_manager.Manager) {
300276
_ = tools.Cron.AddFunc("fetchLivePreviews", api.FetchLivePreviews(daoWrapper), "*/1 * * * *")
301277
tools.Cron.Run()
302278
}
303-
304-
func keepAlive() {
305-
signal.Notify(osSignal, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1)
306-
s := <-osSignal
307-
logger.Info("Exiting on signal" + s.String())
308-
}

0 commit comments

Comments
 (0)