Skip to content

Commit bbed616

Browse files
committed
Fix review
1 parent c67bf64 commit bbed616

File tree

7 files changed

+192
-176
lines changed

7 files changed

+192
-176
lines changed

cmd/icinga-kubernetes/main.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ func main() {
7171

7272
g, ctx := errgroup.WithContext(ctx)
7373

74-
forwardUpsertPodsChannel := make(chan database.Entity)
75-
forwardDeletePodsChannel := make(chan any)
76-
defer close(forwardUpsertPodsChannel)
77-
defer close(forwardDeletePodsChannel)
78-
7974
g.Go(func() error {
8075
return sync.NewSync(
8176
db, schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"),
@@ -101,7 +96,7 @@ func main() {
10196
)
10297
})
10398

104-
logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs"))
99+
logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs"))
105100

106101
g.Go(func() error {
107102
return logSync.Run(ctx, podUpserts, podDeletes)

pkg/schema/log.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package schema
22

3-
type Log struct {
3+
import "github.com/icinga/icinga-go-library/types"
4+
5+
type ContainerLog struct {
46
kmetaWithoutNamespace
5-
Id []byte
6-
ReferenceId []byte
7-
ContainerName string
8-
Time string
9-
Log string
7+
ContainerId types.Binary
8+
PodId types.Binary
9+
Time string
10+
Log string
1011
}

pkg/sync/channel-mux.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
)
7+
8+
// ChannelMux is a multiplexer for channels of variable types.
9+
// It fans all input channels to all output channels.
10+
type ChannelMux[T any] interface {
11+
12+
// AddInChannel adds given input channel to the list of input channels.
13+
AddInChannel(<-chan T)
14+
15+
// NewOutChannel returns and adds new output channel to the pods of created addedOutChannels.
16+
NewOutChannel() <-chan T
17+
18+
// AddOutChannel adds given output channel to the list of added addedOutChannels.
19+
AddOutChannel(chan<- T)
20+
21+
// Run combines output channel lists and starts multiplexing.
22+
Run(context.Context) error
23+
}
24+
25+
type channelMux[T any] struct {
26+
inChannels []<-chan T
27+
createdOutChannels []chan<- T
28+
addedOutChannels []chan<- T
29+
started atomic.Bool
30+
}
31+
32+
// NewChannelMux creates new ChannelMux initialized with at least one input channel
33+
func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] {
34+
return &channelMux[T]{
35+
inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...),
36+
}
37+
}
38+
39+
func (mux *channelMux[T]) AddInChannel(channel <-chan T) {
40+
if mux.started.Load() {
41+
panic("channelMux already started")
42+
}
43+
44+
mux.inChannels = append(mux.inChannels, channel)
45+
}
46+
47+
func (mux *channelMux[T]) NewOutChannel() <-chan T {
48+
if mux.started.Load() {
49+
panic("channelMux already started")
50+
}
51+
52+
channel := make(chan T)
53+
mux.createdOutChannels = append(mux.createdOutChannels, channel)
54+
55+
return channel
56+
}
57+
58+
func (mux *channelMux[T]) AddOutChannel(channel chan<- T) {
59+
if mux.started.Load() {
60+
panic("channelMux already started")
61+
}
62+
63+
mux.addedOutChannels = append(mux.addedOutChannels, channel)
64+
}
65+
66+
func (mux *channelMux[T]) Run(ctx context.Context) error {
67+
mux.started.Store(true)
68+
69+
defer func() {
70+
for _, channelToClose := range mux.createdOutChannels {
71+
close(channelToClose)
72+
}
73+
}()
74+
75+
outChannels := append(mux.addedOutChannels, mux.createdOutChannels...)
76+
77+
for {
78+
for _, inChannel := range mux.inChannels {
79+
select {
80+
case spread, more := <-inChannel:
81+
if !more {
82+
return nil
83+
}
84+
85+
for _, outChannel := range outChannels {
86+
select {
87+
case outChannel <- spread:
88+
case <-ctx.Done():
89+
return ctx.Err()
90+
}
91+
}
92+
case <-ctx.Done():
93+
return ctx.Err()
94+
}
95+
}
96+
}
97+
}

0 commit comments

Comments
 (0)