From 568a73b280795e7d8702a2a616c195c2702e2d39 Mon Sep 17 00:00:00 2001
From: Yonas Habteab
Date: Mon, 3 Jul 2023 10:35:55 +0200
Subject: [PATCH 1/8] Fix `types#MarshalJSON()` causing an unexpected JSON
 input error

The `MarshalJSON()` methods of our custom types currently return `nil`
if their value is invalid, which isn't valid JSON. To fix this, the
methods now return the literal `null` instead.
---
 pkg/types/binary.go     | 13 ++++---------
 pkg/types/bool.go       |  2 +-
 pkg/types/unix_milli.go |  2 +-
 3 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/pkg/types/binary.go b/pkg/types/binary.go
index 9c83ff59..d1ef6ee8 100644
--- a/pkg/types/binary.go
+++ b/pkg/types/binary.go
@@ -19,14 +19,9 @@ var nullBinary Binary
 
 // Equal returns whether the binaries are the same length and
 // contain the same bytes.
-// func (binary Binary) Equal(equaler contracts.Equaler) bool {
-// 	b, ok := equaler.(Binary)
-// 	if !ok {
-// 		panic("bad Binary type assertion")
-// 	}
-//
-// 	return bytes.Equal(binary, b)
-// }
+func (binary Binary) Equal(other Binary) bool {
+	return bytes.Equal(binary, other)
+}
 
 // Valid returns whether the Binary is valid.
 func (binary Binary) Valid() bool {
@@ -64,7 +59,7 @@ func (binary *Binary) UnmarshalText(text []byte) error {
 // Supports JSON null.
 func (binary Binary) MarshalJSON() ([]byte, error) {
 	if !binary.Valid() {
-		return nil, nil
+		return []byte("null"), nil
 	}
 
 	return MarshalJSON(binary.String())
diff --git a/pkg/types/bool.go b/pkg/types/bool.go
index 96b937fb..eb73694e 100644
--- a/pkg/types/bool.go
+++ b/pkg/types/bool.go
@@ -25,7 +25,7 @@ type Bool struct {
 // MarshalJSON implements the json.Marshaler interface.
 func (b Bool) MarshalJSON() ([]byte, error) {
 	if !b.Valid {
-		return nil, nil
+		return []byte("null"), nil
 	}
 
 	return MarshalJSON(b.Bool)
diff --git a/pkg/types/unix_milli.go b/pkg/types/unix_milli.go
index d68c12df..fff3a66c 100644
--- a/pkg/types/unix_milli.go
+++ b/pkg/types/unix_milli.go
@@ -21,7 +21,7 @@ func (t UnixMilli) Time() time.Time {
 // Marshals to milliseconds. Supports JSON null.
 func (t UnixMilli) MarshalJSON() ([]byte, error) {
 	if time.Time(t).IsZero() {
-		return nil, nil
+		return []byte("null"), nil
 	}
 
 	return []byte(strconv.FormatInt(time.Time(t).UnixMilli(), 10)), nil

From cf5e0b746620690efc1be87e2937c4e186cf95bc Mon Sep 17 00:00:00 2001
From: Yonas Habteab
Date: Mon, 3 Jul 2023 10:42:34 +0200
Subject: [PATCH 2/8] Cherry-pick type `Objectpacker` from Icinga DB

---
 pkg/types/objectpacker.go | 213 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 213 insertions(+)
 create mode 100644 pkg/types/objectpacker.go

diff --git a/pkg/types/objectpacker.go b/pkg/types/objectpacker.go
new file mode 100644
index 00000000..3f89bedf
--- /dev/null
+++ b/pkg/types/objectpacker.go
@@ -0,0 +1,213 @@
+package types
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"github.com/pkg/errors"
+	"io"
+	"reflect"
+	"sort"
+)
+
+// MustPackSlice calls PackAny using items and panics if there was an error.
+func MustPackSlice(items ...interface{}) []byte {
+	var buf bytes.Buffer
+
+	if err := PackAny(items, &buf); err != nil {
+		panic(err)
+	}
+
+	return buf.Bytes()
+}
+
+// PackAny packs any JSON-encodable value (ex. structs, also ignores interfaces like encoding.TextMarshaler)
+// to a BSON-similar format suitable for consistent hashing.
Spec: +// +// PackAny(nil) => 0x0 +// PackAny(false) => 0x1 +// PackAny(true) => 0x2 +// PackAny(float64(42)) => 0x3 ieee754_binary64_bigendian(42) +// PackAny("exämple") => 0x4 uint64_bigendian(len([]byte("exämple"))) []byte("exämple") +// PackAny([]uint8{0x42}) => 0x4 uint64_bigendian(len([]uint8{0x42})) []uint8{0x42} +// PackAny([1]uint8{0x42}) => 0x4 uint64_bigendian(len([1]uint8{0x42})) [1]uint8{0x42} +// PackAny([]T{x,y}) => 0x5 uint64_bigendian(len([]T{x,y})) PackAny(x) PackAny(y) +// PackAny(map[K]V{x:y}) => 0x6 uint64_bigendian(len(map[K]V{x:y})) len(map_key(x)) map_key(x) PackAny(y) +// PackAny((*T)(nil)) => 0x0 +// PackAny((*T)(0x42)) => PackAny(*(*T)(0x42)) +// PackAny(x) => panic() +// +// map_key([1]uint8{0x42}) => [1]uint8{0x42} +// map_key(x) => []byte(fmt.Sprint(x)) +func PackAny(in interface{}, out io.Writer) error { + return errors.Wrapf(packValue(reflect.ValueOf(in), out), "can't pack %#v", in) +} + +var tByte = reflect.TypeOf(byte(0)) +var tBytes = reflect.TypeOf([]uint8(nil)) + +// packValue does the actual job of packAny and just exists for recursion w/o unnecessary reflect.ValueOf calls. +func packValue(in reflect.Value, out io.Writer) error { + switch kind := in.Kind(); kind { + case reflect.Invalid: // nil + _, err := out.Write([]byte{0}) + return err + case reflect.Bool: + if in.Bool() { + _, err := out.Write([]byte{2}) + return err + } else { + _, err := out.Write([]byte{1}) + return err + } + case reflect.Float64: + if _, err := out.Write([]byte{3}); err != nil { + return err + } + + return binary.Write(out, binary.BigEndian, in.Float()) + case reflect.Array, reflect.Slice: + if typ := in.Type(); typ.Elem() == tByte { + if kind == reflect.Array { + if !in.CanAddr() { + vNewElem := reflect.New(typ).Elem() + vNewElem.Set(in) + in = vNewElem + } + + in = in.Slice(0, in.Len()) + } + + // Pack []byte as string, not array of numbers. + return packString(in.Convert(tBytes). // Support types.Binary + Interface().([]uint8), out) + } + + if _, err := out.Write([]byte{5}); err != nil { + return err + } + + l := in.Len() + if err := binary.Write(out, binary.BigEndian, uint64(l)); err != nil { + return err + } + + for i := 0; i < l; i++ { + if err := packValue(in.Index(i), out); err != nil { + return err + } + } + + // If there aren't any values to pack, ... + if l < 1 { + // ... create one and pack it - panics on disallowed type. + _ = packValue(reflect.Zero(in.Type().Elem()), io.Discard) + } + + return nil + case reflect.Interface: + return packValue(in.Elem(), out) + case reflect.Map: + type kv struct { + key []byte + value reflect.Value + } + + if _, err := out.Write([]byte{6}); err != nil { + return err + } + + l := in.Len() + if err := binary.Write(out, binary.BigEndian, uint64(l)); err != nil { + return err + } + + sorted := make([]kv, 0, l) + + { + iter := in.MapRange() + for iter.Next() { + var packedKey []byte + if key := iter.Key(); key.Kind() == reflect.Array { + if typ := key.Type(); typ.Elem() == tByte { + if !key.CanAddr() { + vNewElem := reflect.New(typ).Elem() + vNewElem.Set(key) + key = vNewElem + } + + packedKey = key.Slice(0, key.Len()).Interface().([]byte) + } else { + // Not just stringify the key (below), but also pack it (here) - panics on disallowed type. + _ = packValue(iter.Key(), io.Discard) + + packedKey = []byte(fmt.Sprint(key.Interface())) + } + } else { + // Not just stringify the key (below), but also pack it (here) - panics on disallowed type. 
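+					// The result written to io.Discard is thrown away on purpose: the call
+					// below only validates the key's type, while fmt.Sprint yields the bytes
+					// that are actually packed as the map key.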
+ _ = packValue(iter.Key(), io.Discard) + + packedKey = []byte(fmt.Sprint(key.Interface())) + } + + sorted = append(sorted, kv{packedKey, iter.Value()}) + } + } + + sort.Slice(sorted, func(i, j int) bool { return bytes.Compare(sorted[i].key, sorted[j].key) < 0 }) + + for _, kv := range sorted { + if err := binary.Write(out, binary.BigEndian, uint64(len(kv.key))); err != nil { + return err + } + + if _, err := out.Write(kv.key); err != nil { + return err + } + + if err := packValue(kv.value, out); err != nil { + return err + } + } + + // If there aren't any key-value pairs to pack, ... + if l < 1 { + typ := in.Type() + + // ... create one and pack it - panics on disallowed type. + _ = packValue(reflect.Zero(typ.Key()), io.Discard) + _ = packValue(reflect.Zero(typ.Elem()), io.Discard) + } + + return nil + case reflect.Ptr: + if in.IsNil() { + err := packValue(reflect.Value{}, out) + + // Create a fictive referenced value and pack it - panics on disallowed type. + _ = packValue(reflect.Zero(in.Type().Elem()), io.Discard) + + return err + } else { + return packValue(in.Elem(), out) + } + case reflect.String: + return packString([]byte(in.String()), out) + default: + panic("bad type: " + in.Kind().String()) + } +} + +// packString deduplicates string packing of multiple locations in packValue. +func packString(in []byte, out io.Writer) error { + if _, err := out.Write([]byte{4}); err != nil { + return err + } + + if err := binary.Write(out, binary.BigEndian, uint64(len(in))); err != nil { + return err + } + + _, err := out.Write(in) + return err +} From 43edbb1c0a161d72443ca3adbe2bb289faa51136 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Mon, 3 Jul 2023 10:45:45 +0200 Subject: [PATCH 3/8] Introduce `contracts` package --- pkg/contracts/contracts.go | 84 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 pkg/contracts/contracts.go diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go new file mode 100644 index 00000000..11110058 --- /dev/null +++ b/pkg/contracts/contracts.go @@ -0,0 +1,84 @@ +package contracts + +import ( + "github.com/icinga/icinga-kubernetes/pkg/types" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Entity is implemented by every Icinga k8s type. +// It just encapsulates other essential interfaces for an entity but doesn't have its own methods. +type Entity interface { + IDer + ParentIDer + Checksumer + FingerPrinter +} + +// FingerPrinter is implemented by every Icinga k8s type. +type FingerPrinter interface { + // Fingerprint returns the columns of this type, which are retrieved from the + // database during the initial config dump and are used for the config delta and for caching. + Fingerprint() FingerPrinter +} + +// Checksumer is implemented by every Icinga k8s type that maintains its own database table. +type Checksumer interface { + // Checksum computes and returns the sha1 value of this type. + Checksum() types.Binary +} + +// IDer is implemented by every Icinga k8s type that provides a unique identifier. +type IDer interface { + // ID returns the unique identifier of this entity as a binary. + ID() types.Binary + SetID(id types.Binary) +} + +// ParentIDer is implemented by every Icinga k8s type that provides a unique parent identifier. +// This is a no-op for all types by default. Currently, it's only implemented by all entities of +// a k8s entity sub resources. +type ParentIDer interface { + // ParentID returns the parent id of this entity. 
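+	// A nil return value denotes a top-level entity without a parent.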
+ ParentID() types.Binary +} + +type Resource interface { + kmetav1.Object + Entity + + Obtain(k8s kmetav1.Object) +} + +type Meta struct { + Id types.Binary `db:"id"` + PropertiesChecksum types.Binary `json:"-"` +} + +func (m *Meta) Checksum() types.Binary { + return m.PropertiesChecksum +} + +func (m *Meta) ID() types.Binary { + return m.Id +} + +func (m *Meta) SetID(id types.Binary) { + m.Id = id +} + +func (m *Meta) Fingerprint() FingerPrinter { + return m +} + +func (m *Meta) ParentID() types.Binary { + return nil +} + +// Assert interface compliance. +var ( + _ FingerPrinter = (*Meta)(nil) + _ Checksumer = (*Meta)(nil) + _ IDer = (*Meta)(nil) + _ ParentIDer = (*Meta)(nil) + _ Entity = (*Meta)(nil) +) From 90b72af5ce996997f290e3ce7c130c7d70a48fbe Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Mon, 3 Jul 2023 10:44:18 +0200 Subject: [PATCH 4/8] `DB#YieldAll()` allow to recursively select relations --- pkg/database/database.go | 72 +++++++++++++++++++++++++++------------ pkg/database/relations.go | 28 +++++++++++++++ pkg/sync/v1/sync.go | 4 +-- 3 files changed, 81 insertions(+), 23 deletions(-) diff --git a/pkg/database/database.go b/pkg/database/database.go index 8395d6f3..36f7430c 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -7,6 +7,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/icinga/icinga-kubernetes/pkg/backoff" "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/periodic" "github.com/icinga/icinga-kubernetes/pkg/retry" "github.com/icinga/icinga-kubernetes/pkg/strcase" @@ -27,6 +28,8 @@ import ( var registerDriversOnce sync.Once +type FactoryFunc func() (interface{}, bool, error) + // Database is a wrapper around sqlx.DB with bulk execution, // statement building, streaming and logging capabilities. type Database struct { @@ -589,7 +592,7 @@ func (db *Database) UpsertStreamed( // YieldAll executes the query with the supplied scope, // scans each resulting row into an entity returned by the factory function, // and streams them into a returned channel. -func (db *Database) YieldAll(ctx context.Context, factoryFunc func() (interface{}, error), query string, scope ...interface{}) (<-chan interface{}, <-chan error) { +func (db *Database) YieldAll(ctx context.Context, factoryFunc FactoryFunc, query string, scope ...interface{}) (<-chan interface{}, <-chan error) { g, ctx := errgroup.WithContext(ctx) entities := make(chan interface{}, 1) @@ -597,35 +600,62 @@ func (db *Database) YieldAll(ctx context.Context, factoryFunc func() (interface{ defer runtime.HandleCrash() defer close(entities) - var counter com.Counter - defer db.periodicLog(ctx, query, &counter).Stop() - - rows, err := db.query(ctx, query, scope...) - if err != nil { - return CantPerformQuery(err, query) - } + var run func(ctx context.Context, factory FactoryFunc, query string, scope ...interface{}) error - defer rows.Close() + run = func(ctx context.Context, factory FactoryFunc, query string, scope ...interface{}) error { + g, ctx := errgroup.WithContext(ctx) + var counter com.Counter + defer db.periodicLog(ctx, query, &counter).Stop() - for rows.Next() { - e, err := factoryFunc() + rows, err := db.query(ctx, query, scope...) 
if err != nil { - return errors.Wrap(err, "can't create entity") + return CantPerformQuery(err, query) } + defer func() { _ = rows.Close() }() - if err = rows.StructScan(e); err != nil { - return errors.Wrapf(err, "can't store query result into a %T: %s", e, query) - } + for rows.Next() { + e, selectRecursive, err := factory() + if err != nil { + return errors.Wrap(err, "can't create entity") + } - select { - case entities <- e: - counter.Inc() - case <-ctx.Done(): - return ctx.Err() + if err = rows.StructScan(e); err != nil { + return errors.Wrapf(err, "can't store query result into a %T: %s", e, query) + } + + select { + case entities <- e: + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + + if relations, ok := e.(HasRelations); ok && selectRecursive { + for _, relation := range relations.Relations() { + relation := relation + fingerprint, ok := relation.TypePointer().(contracts.FingerPrinter) + if !ok || !relation.CascadeSelect() { + continue + } + + g.Go(func() error { + query := db.BuildSelectStmt(relation.TypePointer(), fingerprint.Fingerprint()) + query += fmt.Sprintf(` WHERE %s=?`, db.quoter.QuoteIdentifier(relation.ForeignKey())) + + factory := func() (interface{}, bool, error) { + return relation.TypePointer().(contracts.Entity), true, nil + } + + return run(ctx, factory, query, e.(contracts.IDer).ID()) + }) + } + } } + + return g.Wait() } - return nil + return run(ctx, factoryFunc, query, scope...) }) return entities, com.WaitAsync(g) diff --git a/pkg/database/relations.go b/pkg/database/relations.go index 2656bb02..c9b00ba5 100644 --- a/pkg/database/relations.go +++ b/pkg/database/relations.go @@ -2,6 +2,7 @@ package database import ( "context" + "reflect" ) type Relation interface { @@ -9,8 +10,11 @@ type Relation interface { SetForeignKey(fk string) CascadeDelete() bool WithoutCascadeDelete() + CascadeSelect() bool + WithoutCascadeSelect() StreamInto(context.Context, chan interface{}) error TableName() string + TypePointer() any } type HasRelations interface { @@ -31,9 +35,16 @@ func WithoutCascadeDelete() RelationOption { } } +func WithoutCascadeSelect() RelationOption { + return func(r Relation) { + r.WithoutCascadeSelect() + } +} + type relation[T any] struct { foreignKey string withoutCascadeDelete bool + withoutCascadeSelect bool } func (r *relation[T]) ForeignKey() string { @@ -52,10 +63,27 @@ func (r *relation[T]) WithoutCascadeDelete() { r.withoutCascadeDelete = true } +func (r *relation[T]) CascadeSelect() bool { + return !r.withoutCascadeSelect +} + +func (r *relation[T]) WithoutCascadeSelect() { + r.withoutCascadeSelect = true +} + func (r *relation[T]) TableName() string { return TableName(*new(T)) } +func (r *relation[T]) TypePointer() any { + var typ T + if reflect.ValueOf(typ).Kind() == reflect.Ptr { + return reflect.New(reflect.TypeOf(typ).Elem()).Interface() + } + + return &typ +} + type hasMany[T any] struct { relation[T] entities []T diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index ff0080b4..72938338 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -52,8 +52,8 @@ func (s *Sync) Run(ctx context.Context, features ...sync.Feature) error { func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { g, ctx := errgroup.WithContext(ctx) - entities, errs := s.db.YieldAll(ctx, func() (interface{}, error) { - return s.factory(), nil + entities, errs := s.db.YieldAll(ctx, func() (interface{}, bool, error) { + return s.factory(), true, nil }, s.db.BuildSelectStmt(s.factory(), &schemav1.Meta{})) // Let errors from 
YieldAll() cancel the group. com.ErrgroupReceive(g, errs) From 19a0b4a2494e9d88de32469d8bb0785616701711 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 28 Jun 2023 15:12:16 +0200 Subject: [PATCH 5/8] `DB#BulkExec()` stream the query chunks to the `onSuccess` handler --- pkg/database/database.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/database/database.go b/pkg/database/database.go index 36f7430c..9b3ff94b 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -262,6 +262,12 @@ func (db *Database) BulkExec( counter.Add(uint64(len(b))) + if f.onSuccess != nil { + if err := f.onSuccess(ctx, b); err != nil { + return err + } + } + return nil }, IsRetryable, From e01e6258eaf45a09377c5f3cb68c6e9fb69fc25f Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 6 Jul 2023 15:53:02 +0200 Subject: [PATCH 6/8] Sink: Don't forward for deletion marked entities --- pkg/sync/sink.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/sync/sink.go b/pkg/sync/sink.go index 6721584e..c3b0f4b0 100644 --- a/pkg/sync/sink.go +++ b/pkg/sync/sink.go @@ -55,6 +55,13 @@ func (s *Sink) ErrorCh() <-chan error { } func (s *Sink) Upsert(ctx context.Context, item *Item) error { + if !item.Item.GetDeletionTimestamp().IsZero() { + // K8s might dispatch an update event for an object that is already marked for + // deletion due to its sub-resources being deleted/modified. However, neither event + // matters to us as their parent object is going to be deleted soon. + return nil + } + select { case s.upsert <- s.upsertFunc(item): return nil From 406657fedbda5eb8a7088cee70bc82ae7407a78a Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Mon, 3 Jul 2023 10:46:54 +0200 Subject: [PATCH 7/8] Implement `contracts.Entity` interface for `schemav1.Types` --- pkg/schema/v1/container.go | 61 +++++++ pkg/schema/v1/contracts.go | 90 +++++---- pkg/schema/v1/daemon_set.go | 64 ++++--- pkg/schema/v1/deployment.go | 64 ++++--- pkg/schema/v1/event.go | 9 +- pkg/schema/v1/label.go | 26 ++- pkg/schema/v1/namespace.go | 37 +++- pkg/schema/v1/node.go | 54 ++++-- pkg/schema/v1/persistent_volume.go | 45 +++-- pkg/schema/v1/pod.go | 282 ++++++++++++++--------------- pkg/schema/v1/pvc.go | 64 ++++--- pkg/schema/v1/replica_set.go | 87 +++++---- pkg/schema/v1/service.go | 8 +- pkg/schema/v1/stateful_set.go | 64 ++++--- pkg/schema/v1/utils.go | 13 ++ 15 files changed, 589 insertions(+), 379 deletions(-) create mode 100644 pkg/schema/v1/container.go diff --git a/pkg/schema/v1/container.go b/pkg/schema/v1/container.go new file mode 100644 index 00000000..2e41c2ff --- /dev/null +++ b/pkg/schema/v1/container.go @@ -0,0 +1,61 @@ +package v1 + +import ( + "database/sql" + "github.com/icinga/icinga-kubernetes/pkg/contracts" + "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/icinga/icinga-kubernetes/pkg/types" +) + +type Container struct { + PodMeta + Name string + Image string + CpuLimits int64 + CpuRequests int64 + MemoryLimits int64 + MemoryRequests int64 + State sql.NullString + StateDetails string + Ready types.Bool + Started types.Bool + RestartCount int32 + Devices []*ContainerDevice `db:"-"` + Mounts []*ContainerMount `db:"-"` +} + +func (c *Container) Relations() []database.Relation { + fk := database.WithForeignKey("container_id") + + return []database.Relation{ + database.HasMany(c.Devices, fk), + database.HasMany(c.Mounts, fk), + } +} + +type ContainerMeta struct { + contracts.Meta + ContainerId types.Binary +} + +func (cm *ContainerMeta) Fingerprint() contracts.FingerPrinter { + 
return cm +} + +func (cm *ContainerMeta) ParentID() types.Binary { + return cm.ContainerId +} + +type ContainerDevice struct { + ContainerMeta + Name string + Path string +} + +type ContainerMount struct { + ContainerMeta + VolumeName string + Path string + SubPath string + ReadOnly types.Bool +} diff --git a/pkg/schema/v1/contracts.go b/pkg/schema/v1/contracts.go index fbdc3655..0bf7d7fa 100644 --- a/pkg/schema/v1/contracts.go +++ b/pkg/schema/v1/contracts.go @@ -1,17 +1,14 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/types" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" ) -type Resource interface { - kmetav1.Object - Obtain(k8s kmetav1.Object) -} - -type Meta struct { +type ResourceMeta struct { + contracts.Meta Uid ktypes.UID Namespace string Name string @@ -19,7 +16,11 @@ type Meta struct { Created types.UnixMilli } -func (m *Meta) ObtainMeta(k8s kmetav1.Object) { +func (m *ResourceMeta) Fingerprint() contracts.FingerPrinter { + return m +} + +func (m *ResourceMeta) ObtainMeta(k8s kmetav1.Object) { m.Uid = k8s.GetUID() m.Namespace = k8s.GetNamespace() m.Name = k8s.GetName() @@ -27,38 +28,51 @@ func (m *Meta) ObtainMeta(k8s kmetav1.Object) { m.Created = types.UnixMilli(k8s.GetCreationTimestamp().Time) } -func (m *Meta) GetNamespace() string { return m.Namespace } -func (m *Meta) SetNamespace(string) { panic("Not expected to be called") } -func (m *Meta) GetName() string { return m.Name } -func (m *Meta) SetName(string) { panic("Not expected to be called") } -func (m *Meta) GetGenerateName() string { panic("Not expected to be called") } -func (m *Meta) SetGenerateName(string) { panic("Not expected to be called") } -func (m *Meta) GetUID() ktypes.UID { return m.Uid } -func (m *Meta) SetUID(ktypes.UID) { panic("Not expected to be called") } -func (m *Meta) GetResourceVersion() string { return m.ResourceVersion } -func (m *Meta) SetResourceVersion(string) { panic("Not expected to be called") } -func (m *Meta) GetGeneration() int64 { panic("Not expected to be called") } -func (m *Meta) SetGeneration(int64) { panic("Not expected to be called") } -func (m *Meta) GetSelfLink() string { panic("Not expected to be called") } -func (m *Meta) SetSelfLink(string) { panic("Not expected to be called") } -func (m *Meta) GetCreationTimestamp() kmetav1.Time { return kmetav1.NewTime(m.Created.Time()) } -func (m *Meta) SetCreationTimestamp(kmetav1.Time) { panic("Not expected to be called") } -func (m *Meta) GetDeletionTimestamp() *kmetav1.Time { panic("Not expected to be called") } -func (m *Meta) SetDeletionTimestamp(*kmetav1.Time) { panic("Not expected to be called") } -func (m *Meta) GetDeletionGracePeriodSeconds() *int64 { panic("Not expected to be called") } -func (m *Meta) SetDeletionGracePeriodSeconds(*int64) { panic("Not expected to be called") } -func (m *Meta) GetLabels() map[string]string { panic("Not expected to be called") } -func (m *Meta) SetLabels(map[string]string) { panic("Not expected to be called") } -func (m *Meta) GetAnnotations() map[string]string { panic("Not expected to be called") } -func (m *Meta) SetAnnotations(_ map[string]string) { panic("Not expected to be called") } -func (m *Meta) GetFinalizers() []string { panic("Not expected to be called") } -func (m *Meta) SetFinalizers([]string) { panic("Not expected to be called") } -func (m *Meta) GetOwnerReferences() []kmetav1.OwnerReference { panic("Not expected to be called") } -func (m *Meta) 
SetOwnerReferences([]kmetav1.OwnerReference) { panic("Not expected to be called") } -func (m *Meta) GetManagedFields() []kmetav1.ManagedFieldsEntry { panic("Not expected to be called") } -func (m *Meta) SetManagedFields([]kmetav1.ManagedFieldsEntry) { panic("Not expected to be called") } +func (m *ResourceMeta) GetNamespace() string { return m.Namespace } +func (m *ResourceMeta) SetNamespace(string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetName() string { return m.Name } +func (m *ResourceMeta) SetName(string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetGenerateName() string { panic("Not expected to be called") } +func (m *ResourceMeta) SetGenerateName(string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetUID() ktypes.UID { return m.Uid } +func (m *ResourceMeta) SetUID(ktypes.UID) { panic("Not expected to be called") } +func (m *ResourceMeta) GetResourceVersion() string { return m.ResourceVersion } +func (m *ResourceMeta) SetResourceVersion(string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetGeneration() int64 { panic("Not expected to be called") } +func (m *ResourceMeta) SetGeneration(int64) { panic("Not expected to be called") } +func (m *ResourceMeta) GetSelfLink() string { panic("Not expected to be called") } +func (m *ResourceMeta) SetSelfLink(string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetCreationTimestamp() kmetav1.Time { return kmetav1.NewTime(m.Created.Time()) } +func (m *ResourceMeta) SetCreationTimestamp(kmetav1.Time) { panic("Not expected to be called") } +func (m *ResourceMeta) GetDeletionTimestamp() *kmetav1.Time { panic("Not expected to be called") } +func (m *ResourceMeta) SetDeletionTimestamp(*kmetav1.Time) { panic("Not expected to be called") } +func (m *ResourceMeta) GetDeletionGracePeriodSeconds() *int64 { panic("Not expected to be called") } +func (m *ResourceMeta) SetDeletionGracePeriodSeconds(*int64) { panic("Not expected to be called") } +func (m *ResourceMeta) GetLabels() map[string]string { panic("Not expected to be called") } +func (m *ResourceMeta) SetLabels(map[string]string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetAnnotations() map[string]string { panic("Not expected to be called") } +func (m *ResourceMeta) SetAnnotations(_ map[string]string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetFinalizers() []string { panic("Not expected to be called") } +func (m *ResourceMeta) SetFinalizers([]string) { panic("Not expected to be called") } +func (m *ResourceMeta) GetOwnerReferences() []kmetav1.OwnerReference { + panic("Not expected to be called") +} +func (m *ResourceMeta) SetOwnerReferences([]kmetav1.OwnerReference) { + panic("Not expected to be called") +} +func (m *ResourceMeta) GetManagedFields() []kmetav1.ManagedFieldsEntry { + panic("Not expected to be called") +} +func (m *ResourceMeta) SetManagedFields([]kmetav1.ManagedFieldsEntry) { + panic("Not expected to be called") +} // Assert interface compliance. 
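+// Besides kmetav1.Object, ResourceMeta now also implements every contracts interface.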
var ( - _ kmetav1.Object = (*Meta)(nil) + _ kmetav1.Object = (*ResourceMeta)(nil) + _ contracts.FingerPrinter = (*ResourceMeta)(nil) + _ contracts.Checksumer = (*ResourceMeta)(nil) + _ contracts.IDer = (*ResourceMeta)(nil) + _ contracts.ParentIDer = (*ResourceMeta)(nil) + _ contracts.Entity = (*ResourceMeta)(nil) ) diff --git a/pkg/schema/v1/daemon_set.go b/pkg/schema/v1/daemon_set.go index e3194f7e..6dff2246 100644 --- a/pkg/schema/v1/daemon_set.go +++ b/pkg/schema/v1/daemon_set.go @@ -1,17 +1,16 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" kappsv1 "k8s.io/api/apps/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "strings" ) type DaemonSet struct { - Meta - Id types.Binary + ResourceMeta UpdateStrategy string MinReadySeconds int32 DesiredNumberScheduled int32 @@ -21,13 +20,25 @@ type DaemonSet struct { UpdateNumberScheduled int32 NumberAvailable int32 NumberUnavailable int32 - Conditions []DaemonSetCondition `db:"-"` - Labels []Label `db:"-"` - DaemonSetLabels []DaemonSetLabel `db:"-"` + Conditions []*DaemonSetCondition `json:"-" db:"-"` + Labels []*Label `json:"-" db:"-"` +} + +type DaemonSetMeta struct { + contracts.Meta + DaemonSetId types.Binary +} + +func (dm *DaemonSetMeta) Fingerprint() contracts.FingerPrinter { + return dm +} + +func (dm *DaemonSetMeta) ParentID() types.Binary { + return dm.DaemonSetId } type DaemonSetCondition struct { - DaemonSetId types.Binary + DaemonSetMeta Type string Status string LastTransition types.UnixMilli @@ -35,12 +46,7 @@ type DaemonSetCondition struct { Message string } -type DaemonSetLabel struct { - DaemonSetId types.Binary - LabelId types.Binary -} - -func NewDaemonSet() Resource { +func NewDaemonSet() contracts.Entity { return &DaemonSet{} } @@ -60,28 +66,31 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { d.NumberAvailable = daemonSet.Status.NumberAvailable d.NumberUnavailable = daemonSet.Status.NumberUnavailable + d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d)) + for _, condition := range daemonSet.Status.Conditions { - d.Conditions = append(d.Conditions, DaemonSetCondition{ - DaemonSetId: d.Id, + daemonCond := &DaemonSetCondition{ + DaemonSetMeta: DaemonSetMeta{ + DaemonSetId: d.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(d.Id, condition.Type))}, + }, Type: string(condition.Type), Status: string(condition.Status), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + daemonCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(daemonCond)) + + d.Conditions = append(d.Conditions, daemonCond) } for labelName, labelValue := range daemonSet.Labels { - labelId := types.Checksum(strings.ToLower(labelName + ":" + labelValue)) - d.Labels = append(d.Labels, Label{ - Id: labelId, - Name: labelName, - Value: labelValue, - }) - d.DaemonSetLabels = append(d.DaemonSetLabels, DaemonSetLabel{ - DaemonSetId: d.Id, - LabelId: labelId, - }) + label := NewLabel(labelName, labelValue) + label.DaemonSetId = d.Id + label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + + d.Labels = append(d.Labels, label) } } @@ -90,7 +99,6 @@ func (d *DaemonSet) Relations() []database.Relation { return []database.Relation{ database.HasMany(d.Conditions, fk), - database.HasMany(d.DaemonSetLabels, fk), - database.HasMany(d.Labels, database.WithoutCascadeDelete()), + 
database.HasMany(d.Labels, fk), } } diff --git a/pkg/schema/v1/deployment.go b/pkg/schema/v1/deployment.go index b6f1b63d..764d0e41 100644 --- a/pkg/schema/v1/deployment.go +++ b/pkg/schema/v1/deployment.go @@ -1,17 +1,16 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" kappsv1 "k8s.io/api/apps/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "strings" ) type Deployment struct { - Meta - Id types.Binary + ResourceMeta DesiredReplicas int32 Strategy string MinReadySeconds int32 @@ -22,13 +21,25 @@ type Deployment struct { ReadyReplicas int32 AvailableReplicas int32 UnavailableReplicas int32 - Conditions []DeploymentCondition `db:"-"` - Labels []Label `db:"-"` - DeploymentLabels []DeploymentLabel `db:"-"` + Conditions []*DeploymentCondition `json:"-" db:"-"` + Labels []*Label `json:"-" db:"-"` +} + +type DeploymentConditionMeta struct { + contracts.Meta + DeploymentId types.Binary +} + +func (dm *DeploymentConditionMeta) Fingerprint() contracts.FingerPrinter { + return dm +} + +func (dm *DeploymentConditionMeta) ParentID() types.Binary { + return dm.DeploymentId } type DeploymentCondition struct { - DeploymentId types.Binary + DeploymentConditionMeta Type string Status string LastUpdate types.UnixMilli @@ -37,12 +48,7 @@ type DeploymentCondition struct { Message string } -type DeploymentLabel struct { - DeploymentId types.Binary - LabelId types.Binary -} - -func NewDeployment() Resource { +func NewDeployment() contracts.Entity { return &Deployment{} } @@ -73,29 +79,32 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { d.ReadyReplicas = deployment.Status.ReadyReplicas d.UnavailableReplicas = deployment.Status.UnavailableReplicas + d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d)) + for _, condition := range deployment.Status.Conditions { - d.Conditions = append(d.Conditions, DeploymentCondition{ - DeploymentId: d.Id, + deploymentCond := &DeploymentCondition{ + DeploymentConditionMeta: DeploymentConditionMeta{ + DeploymentId: d.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(d.Id, condition.Type))}, + }, Type: strcase.Snake(string(condition.Type)), Status: string(condition.Status), LastUpdate: types.UnixMilli(condition.LastUpdateTime.Time), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + deploymentCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(deploymentCond)) + + d.Conditions = append(d.Conditions, deploymentCond) } for labelName, labelValue := range deployment.Labels { - labelId := types.Checksum(strings.ToLower(labelName + ":" + labelValue)) - d.Labels = append(d.Labels, Label{ - Id: labelId, - Name: labelName, - Value: labelValue, - }) - d.DeploymentLabels = append(d.DeploymentLabels, DeploymentLabel{ - DeploymentId: d.Id, - LabelId: labelId, - }) + label := NewLabel(labelName, labelValue) + label.DeploymentId = d.Id + label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + + d.Labels = append(d.Labels, label) } } @@ -104,7 +113,6 @@ func (d *Deployment) Relations() []database.Relation { return []database.Relation{ database.HasMany(d.Conditions, fk), - database.HasMany(d.DeploymentLabels, fk), - database.HasMany(d.Labels, database.WithoutCascadeDelete()), + database.HasMany(d.Labels, fk), } } diff --git a/pkg/schema/v1/event.go b/pkg/schema/v1/event.go index 8b6fbfca..f21a4aaf 
100644 --- a/pkg/schema/v1/event.go +++ b/pkg/schema/v1/event.go @@ -1,14 +1,14 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/types" keventsv1 "k8s.io/api/events/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type Event struct { - Meta - Id types.Binary + ResourceMeta ReportingController string ReportingInstance string Action string @@ -23,12 +23,15 @@ type Event struct { Count int32 } -func NewEvent() Resource { +func NewEvent() contracts.Entity { return &Event{} } func (e *Event) Obtain(k8s kmetav1.Object) { e.ObtainMeta(k8s) + defer func() { + e.PropertiesChecksum = types.Checksum(MustMarshalJSON(e)) + }() event := k8s.(*keventsv1.Event) diff --git a/pkg/schema/v1/label.go b/pkg/schema/v1/label.go index 401d7fc8..ec7b4a57 100644 --- a/pkg/schema/v1/label.go +++ b/pkg/schema/v1/label.go @@ -1,9 +1,27 @@ package v1 -import "github.com/icinga/icinga-kubernetes/pkg/types" +import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" + "github.com/icinga/icinga-kubernetes/pkg/types" + "strings" +) type Label struct { - Id types.Binary - Name string - Value string + contracts.Meta + PodId types.Binary + ReplicaSetId types.Binary + DeploymentId types.Binary + DaemonSetId types.Binary + StatefulSetId types.Binary + PvcId types.Binary + Name string + Value string +} + +func NewLabel(name string, value string) *Label { + return &Label{ + Meta: contracts.Meta{Id: types.Checksum(strings.ToLower(name + ":" + value))}, + Name: name, + Value: value, + } } diff --git a/pkg/schema/v1/namespace.go b/pkg/schema/v1/namespace.go index bc7fc756..77c3d94b 100644 --- a/pkg/schema/v1/namespace.go +++ b/pkg/schema/v1/namespace.go @@ -1,6 +1,7 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/types" kcorev1 "k8s.io/api/core/v1" @@ -9,14 +10,26 @@ import ( ) type Namespace struct { - Meta - Id types.Binary + ResourceMeta Phase string - Conditions []NamespaceCondition `db:"-"` + Conditions []*NamespaceCondition `json:"-" db:"-"` +} + +type NamespaceMeta struct { + contracts.Meta + NamespaceId types.Binary +} + +func (nm *NamespaceMeta) Fingerprint() contracts.FingerPrinter { + return nm +} + +func (nm *NamespaceMeta) ParentID() types.Binary { + return nm.NamespaceId } type NamespaceCondition struct { - NamespaceId types.Binary + NamespaceMeta Type string Status string LastTransition types.UnixMilli @@ -24,7 +37,7 @@ type NamespaceCondition struct { Message string } -func NewNamespace() Resource { +func NewNamespace() contracts.Entity { return &Namespace{} } @@ -36,15 +49,23 @@ func (n *Namespace) Obtain(k8s kmetav1.Object) { n.Id = types.Checksum(namespace.Name) n.Phase = strings.ToLower(string(namespace.Status.Phase)) + n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n)) + for _, condition := range namespace.Status.Conditions { - n.Conditions = append(n.Conditions, NamespaceCondition{ - NamespaceId: n.Id, + namespaceCond := &NamespaceCondition{ + NamespaceMeta: NamespaceMeta{ + NamespaceId: n.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(n.Id, condition.Type))}, + }, Type: string(condition.Type), Status: string(condition.Status), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + namespaceCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(namespaceCond)) + + n.Conditions = append(n.Conditions, 
namespaceCond) } } diff --git a/pkg/schema/v1/node.go b/pkg/schema/v1/node.go index 38a269b3..648d1f5d 100644 --- a/pkg/schema/v1/node.go +++ b/pkg/schema/v1/node.go @@ -1,6 +1,7 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/types" "github.com/pkg/errors" @@ -11,8 +12,7 @@ import ( ) type Node struct { - Meta - Id types.Binary + ResourceMeta PodCIDR string NumIps int64 Unschedulable types.Bool @@ -22,12 +22,25 @@ type Node struct { MemoryCapacity int64 MemoryAllocatable int64 PodCapacity int64 - Conditions []NodeCondition `db:"-"` - Volumes []NodeVolume `db:"-"` + Conditions []*NodeCondition `json:"-" db:"-"` + Volumes []*NodeVolume `json:"-" db:"-"` +} + +type NodeMeta struct { + contracts.Meta + NodeId types.Binary +} + +func (nm *NodeMeta) Fingerprint() contracts.FingerPrinter { + return nm +} + +func (nm *NodeMeta) ParentID() types.Binary { + return nm.NodeId } type NodeCondition struct { - NodeId types.Binary + NodeMeta Type string Status string LastHeartbeat types.UnixMilli @@ -37,13 +50,13 @@ type NodeCondition struct { } type NodeVolume struct { - NodeId types.Binary + NodeMeta name kcorev1.UniqueVolumeName DevicePath string Mounted types.Bool } -func NewNode() Resource { +func NewNode() contracts.Entity { return &Node{} } @@ -55,6 +68,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) { n.Id = types.Checksum(n.Namespace + "/" + n.Name) n.PodCIDR = node.Spec.PodCIDR _, cidr, err := net.ParseCIDR(n.PodCIDR) + // TODO(yh): Make field NumIps nullable and don't panic here! if err != nil { panic(errors.Wrapf(err, "failed to parse CIDR %s", n.PodCIDR)) } @@ -73,16 +87,24 @@ func (n *Node) Obtain(k8s kmetav1.Object) { n.MemoryAllocatable = node.Status.Allocatable.Memory().MilliValue() n.PodCapacity = node.Status.Allocatable.Pods().Value() + n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n)) + for _, condition := range node.Status.Conditions { - n.Conditions = append(n.Conditions, NodeCondition{ - NodeId: n.Id, + nodeCond := &NodeCondition{ + NodeMeta: NodeMeta{ + NodeId: n.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(n.Id, condition.Type))}, + }, Type: string(condition.Type), Status: string(condition.Status), LastHeartbeat: types.UnixMilli(condition.LastHeartbeatTime.Time), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + nodeCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeCond)) + + n.Conditions = append(n.Conditions, nodeCond) } volumesMounted := make(map[kcorev1.UniqueVolumeName]interface{}, len(node.Status.VolumesInUse)) @@ -91,15 +113,21 @@ func (n *Node) Obtain(k8s kmetav1.Object) { } for _, volume := range node.Status.VolumesAttached { _, mounted := volumesMounted[volume.Name] - n.Volumes = append(n.Volumes, NodeVolume{ - NodeId: n.Id, + nodeVolume := &NodeVolume{ + NodeMeta: NodeMeta{ + NodeId: n.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(n.Id, volume.Name))}, + }, name: volume.Name, DevicePath: volume.DevicePath, Mounted: types.Bool{ Bool: mounted, Valid: true, }, - }) + } + nodeVolume.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeVolume)) + + n.Volumes = append(n.Volumes, nodeVolume) } } diff --git a/pkg/schema/v1/persistent_volume.go b/pkg/schema/v1/persistent_volume.go index 07a692fe..3da11726 100644 --- a/pkg/schema/v1/persistent_volume.go +++ b/pkg/schema/v1/persistent_volume.go @@ -2,6 +2,7 @@ package 
v1 import ( "database/sql" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" @@ -11,8 +12,7 @@ import ( ) type PersistentVolume struct { - Meta - Id types.Binary + ResourceMeta AccessModes types.Bitmask[kpersistentVolumeAccessModesSize] Capacity int64 ReclaimPolicy string @@ -23,17 +23,30 @@ type PersistentVolume struct { Phase string Reason string Message string - Claim PersistentVolumeClaimRef `db:"-"` + Claim *PersistentVolumeClaimRef `json:"-" db:"-"` } -type PersistentVolumeClaimRef struct { +type PersistentVolumeMeta struct { + contracts.Meta PersistentVolumeId types.Binary - Kind string - Name string - Uid ktypes.UID } -func NewPersistentVolume() Resource { +func (pvm *PersistentVolumeMeta) Fingerprint() contracts.FingerPrinter { + return pvm +} + +func (pvm *PersistentVolumeMeta) ParentID() types.Binary { + return pvm.PersistentVolumeId +} + +type PersistentVolumeClaimRef struct { + PersistentVolumeMeta + Kind string + Name string + Uid ktypes.UID +} + +func NewPersistentVolume() contracts.Entity { return &PersistentVolume{} } @@ -62,12 +75,18 @@ func (p *PersistentVolume) Obtain(k8s kmetav1.Object) { panic(err) } - p.Claim = PersistentVolumeClaimRef{ - PersistentVolumeId: p.Id, - Kind: persistentVolume.Spec.ClaimRef.Kind, - Name: persistentVolume.Spec.ClaimRef.Name, - Uid: persistentVolume.Spec.ClaimRef.UID, + p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + + p.Claim = &PersistentVolumeClaimRef{ + PersistentVolumeMeta: PersistentVolumeMeta{ + PersistentVolumeId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, persistentVolume.Spec.ClaimRef.UID))}, + }, + Kind: persistentVolume.Spec.ClaimRef.Kind, + Name: persistentVolume.Spec.ClaimRef.Name, + Uid: persistentVolume.Spec.ClaimRef.UID, } + p.Claim.PropertiesChecksum = types.Checksum(MustMarshalJSON(p.Claim)) } func (p *PersistentVolume) Relations() []database.Relation { diff --git a/pkg/schema/v1/pod.go b/pkg/schema/v1/pod.go index 8eab0306..960afbb1 100644 --- a/pkg/schema/v1/pod.go +++ b/pkg/schema/v1/pod.go @@ -1,60 +1,23 @@ package v1 import ( - "context" "database/sql" - "fmt" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" - "io" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" - "strings" ) -type Container struct { - Id types.Binary - PodId types.Binary - Name string - Image string - CpuLimits int64 - CpuRequests int64 - MemoryLimits int64 - MemoryRequests int64 - State sql.NullString - StateDetails string - Ready types.Bool - Started types.Bool - RestartCount int32 - Logs string -} - -type ContainerDevice struct { - ContainerId types.Binary - PodId types.Binary - Name string - Path string -} - -type ContainerMount struct { - ContainerId types.Binary - PodId types.Binary - VolumeName string - Path string - SubPath string - ReadOnly types.Bool -} - type PodFactory struct { clientset *kubernetes.Clientset } type Pod struct { - Meta - Id types.Binary + ResourceMeta NodeName string NominatedNodeName string Ip string @@ -67,20 +30,30 @@ type Pod struct { Message string Qos string RestartPolicy string - Conditions []PodCondition `db:"-"` - Containers []Container `db:"-"` - ContainerDevices 
[]ContainerDevice `db:"-"` - ContainerMounts []ContainerMount `db:"-"` - Owners []PodOwner `db:"-"` - Labels []Label `db:"-"` - PodLabels []PodLabel `db:"-"` - Pvcs []PodPvc `db:"-"` - Volumes []PodVolume `db:"-"` + Conditions []*PodCondition `db:"-"` + Containers []*Container `db:"-"` + Owners []*PodOwner `db:"-"` + Labels []*Label `db:"-"` + Pvcs []*PodPvc `db:"-"` + Volumes []*PodVolume `db:"-"` factory *PodFactory } +type PodMeta struct { + contracts.Meta + PodId types.Binary `db:"pod_id"` +} + +func (pm *PodMeta) Fingerprint() contracts.FingerPrinter { + return pm +} + +func (pm *PodMeta) ParentID() types.Binary { + return pm.PodId +} + type PodCondition struct { - PodId types.Binary + PodMeta Type string Status string LastProbe types.UnixMilli @@ -89,13 +62,8 @@ type PodCondition struct { Message string } -type PodLabel struct { - PodId types.Binary - LabelId types.Binary -} - type PodOwner struct { - PodId types.Binary + PodMeta Kind string Name string Uid ktypes.UID @@ -104,14 +72,14 @@ type PodOwner struct { } type PodVolume struct { - PodId types.Binary + PodMeta VolumeName string Type string Source string } type PodPvc struct { - PodId types.Binary + PodMeta VolumeName string ClaimName string ReadOnly types.Bool @@ -123,12 +91,15 @@ func NewPodFactory(clientset *kubernetes.Clientset) *PodFactory { } } -func (f *PodFactory) New() Resource { +func (f *PodFactory) New() contracts.Entity { return &Pod{factory: f} } func (p *Pod) Obtain(k8s kmetav1.Object) { p.ObtainMeta(k8s) + defer func() { + p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + }() pod := k8s.(*kcorev1.Pod) @@ -143,107 +114,113 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { p.RestartPolicy = strcase.Snake(string(pod.Spec.RestartPolicy)) for _, condition := range pod.Status.Conditions { - p.Conditions = append(p.Conditions, PodCondition{ - PodId: p.Id, + podCond := &PodCondition{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, condition.Type))}, + }, Type: string(condition.Type), Status: string(condition.Status), LastProbe: types.UnixMilli(condition.LastProbeTime.Time), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + podCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(podCond)) + + p.Conditions = append(p.Conditions, podCond) } containerStatuses := make(map[string]kcorev1.ContainerStatus, len(pod.Spec.Containers)) for _, containerStatus := range pod.Status.ContainerStatuses { containerStatuses[containerStatus.Name] = containerStatus } - for _, container := range pod.Spec.Containers { - if containerStatuses[container.Name].RestartCount > 0 { - fmt.Println(containerStatuses[container.Name].LastTerminationState) - } + for _, k8sContainer := range pod.Spec.Containers { var started bool - if containerStatuses[container.Name].Started != nil { - started = *containerStatuses[container.Name].Started + if containerStatuses[k8sContainer.Name].Started != nil { + started = *containerStatuses[k8sContainer.Name].Started } - state, stateDetails, err := MarshalFirstNonNilStructFieldToJSON(containerStatuses[container.Name].State) + state, stateDetails, err := MarshalFirstNonNilStructFieldToJSON(containerStatuses[k8sContainer.Name].State) if err != nil { panic(err) } - logs, err := getContainerLogs(p.factory.clientset, pod, container) - if err != nil { - // ContainerCreating, NotFound, ... 
- fmt.Println(err) - logs = "" - } + var containerState sql.NullString if state != "" { containerState.String = strcase.Snake(state) containerState.Valid = true } - p.Containers = append(p.Containers, Container{ - Id: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + container.Name), - PodId: p.Id, - Name: container.Name, - Image: container.Image, - CpuLimits: container.Resources.Limits.Cpu().MilliValue(), - CpuRequests: container.Resources.Requests.Cpu().MilliValue(), - MemoryLimits: container.Resources.Limits.Memory().MilliValue(), - MemoryRequests: container.Resources.Requests.Memory().MilliValue(), + + container := &Container{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + k8sContainer.Name)}, + }, + Name: k8sContainer.Name, + Image: k8sContainer.Image, + CpuLimits: k8sContainer.Resources.Limits.Cpu().MilliValue(), + CpuRequests: k8sContainer.Resources.Requests.Cpu().MilliValue(), + MemoryLimits: k8sContainer.Resources.Limits.Memory().MilliValue(), + MemoryRequests: k8sContainer.Resources.Requests.Memory().MilliValue(), Ready: types.Bool{ - Bool: containerStatuses[container.Name].Ready, + Bool: containerStatuses[k8sContainer.Name].Ready, Valid: true, }, Started: types.Bool{ Bool: started, Valid: true, }, - RestartCount: containerStatuses[container.Name].RestartCount, + RestartCount: containerStatuses[k8sContainer.Name].RestartCount, State: containerState, StateDetails: stateDetails, - Logs: logs, - }) - - p.CpuLimits += container.Resources.Limits.Cpu().MilliValue() - p.CpuRequests += container.Resources.Requests.Cpu().MilliValue() - p.MemoryLimits += container.Resources.Limits.Memory().MilliValue() - p.MemoryRequests += container.Resources.Requests.Memory().MilliValue() - - for _, device := range container.VolumeDevices { - p.ContainerDevices = append(p.ContainerDevices, ContainerDevice{ - ContainerId: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + container.Name), - PodId: p.Id, - Name: device.Name, - Path: device.DevicePath, - }) } + container.PropertiesChecksum = types.Checksum(MustMarshalJSON(container)) + + p.CpuLimits += k8sContainer.Resources.Limits.Cpu().MilliValue() + p.CpuRequests += k8sContainer.Resources.Requests.Cpu().MilliValue() + p.MemoryLimits += k8sContainer.Resources.Limits.Memory().MilliValue() + p.MemoryRequests += k8sContainer.Resources.Requests.Memory().MilliValue() - for _, mount := range container.VolumeMounts { - p.ContainerMounts = append(p.ContainerMounts, ContainerMount{ - ContainerId: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + container.Name), - PodId: p.Id, - VolumeName: mount.Name, - Path: mount.MountPath, - SubPath: mount.SubPath, + for _, device := range k8sContainer.VolumeDevices { + cd := &ContainerDevice{ + ContainerMeta: ContainerMeta{ + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, device.Name))}, + ContainerId: container.Id, + }, + Name: device.Name, + Path: device.DevicePath, + } + cd.PropertiesChecksum = types.Checksum(MustMarshalJSON(cd)) + + container.Devices = append(container.Devices, cd) + } + + for _, mount := range k8sContainer.VolumeMounts { + cm := &ContainerMount{ + ContainerMeta: ContainerMeta{ + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, mount.Name))}, + ContainerId: container.Id, + }, + VolumeName: mount.Name, + Path: mount.MountPath, + SubPath: mount.SubPath, ReadOnly: types.Bool{ Bool: mount.ReadOnly, Valid: true, }, - }) + } + cm.PropertiesChecksum = 
types.Checksum(MustMarshalJSON(cm)) + + container.Mounts = append(container.Mounts, cm) } } for labelName, labelValue := range pod.Labels { - labelId := types.Checksum(strings.ToLower(labelName + ":" + labelValue)) - p.Labels = append(p.Labels, Label{ - Id: labelId, - Name: labelName, - Value: labelValue, - }) - p.PodLabels = append(p.PodLabels, PodLabel{ - PodId: p.Id, - LabelId: labelId, - }) + label := NewLabel(labelName, labelValue) + label.PodId = p.Id + label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + + p.Labels = append(p.Labels, label) } for _, ownerReference := range pod.OwnerReferences { @@ -254,11 +231,14 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { if ownerReference.Controller != nil { controller = *ownerReference.Controller } - p.Owners = append(p.Owners, PodOwner{ - PodId: p.Id, - Kind: strcase.Snake(ownerReference.Kind), - Name: ownerReference.Name, - Uid: ownerReference.UID, + owner := &PodOwner{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, ownerReference.UID))}, + }, + Kind: strcase.Snake(ownerReference.Kind), + Name: ownerReference.Name, + Uid: ownerReference.UID, BlockOwnerDeletion: types.Bool{ Bool: blockOwnerDeletion, Valid: true, @@ -267,7 +247,10 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { Bool: controller, Valid: true, }, - }) + } + owner.PropertiesChecksum = types.Checksum(MustMarshalJSON(owner)) + + p.Owners = append(p.Owners, owner) } // https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#resources @@ -282,27 +265,39 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { for _, volume := range pod.Spec.Volumes { if volume.PersistentVolumeClaim != nil { - p.Pvcs = append(p.Pvcs, PodPvc{ - PodId: p.Id, + pvc := &PodPvc{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name, volume.PersistentVolumeClaim.ClaimName))}, + }, VolumeName: volume.Name, ClaimName: volume.PersistentVolumeClaim.ClaimName, ReadOnly: types.Bool{ Bool: volume.PersistentVolumeClaim.ReadOnly, Valid: true, }, - }) + } + pvc.PropertiesChecksum = types.Checksum(MustMarshalJSON(pvc)) + + p.Pvcs = append(p.Pvcs, pvc) } else { t, source, err := MarshalFirstNonNilStructFieldToJSON(volume.VolumeSource) if err != nil { panic(err) } - p.Volumes = append(p.Volumes, PodVolume{ - PodId: p.Id, + vol := &PodVolume{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name))}, + }, VolumeName: volume.Name, Type: t, Source: source, - }) + } + vol.PropertiesChecksum = types.Checksum(MustMarshalJSON(vol)) + + p.Volumes = append(p.Volumes, vol) } } } @@ -311,29 +306,16 @@ func (p *Pod) Relations() []database.Relation { fk := database.WithForeignKey("pod_id") return []database.Relation{ - database.HasMany(p.Conditions, fk), - database.HasMany(p.Containers, fk), - database.HasMany(p.ContainerDevices, fk), - database.HasMany(p.ContainerMounts, fk), + database.HasMany(p.Containers, fk, database.WithoutCascadeDelete()), database.HasMany(p.Owners, fk), - database.HasMany(p.Labels, database.WithoutCascadeDelete()), - database.HasMany(p.PodLabels, fk), + database.HasMany(p.Labels, fk), database.HasMany(p.Pvcs, fk), database.HasMany(p.Volumes, fk), } } -func getContainerLogs(clientset *kubernetes.Clientset, pod *kcorev1.Pod, container kcorev1.Container) (string, error) { - req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &kcorev1.PodLogOptions{Container: container.Name}) - body, err := 
req.Stream(context.TODO()) - if err != nil { - return "", err - } - defer body.Close() - logs, err := io.ReadAll(body) - if err != nil { - return "", err - } - - return string(logs), nil -} +var ( + _ contracts.Entity = (*Pod)(nil) + _ contracts.Resource = (*Pod)(nil) + _ contracts.Entity = (*PodCondition)(nil) +) diff --git a/pkg/schema/v1/pvc.go b/pkg/schema/v1/pvc.go index 73499a0a..84b20dd4 100644 --- a/pkg/schema/v1/pvc.go +++ b/pkg/schema/v1/pvc.go @@ -2,12 +2,12 @@ package v1 import ( "database/sql" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "strings" ) type kpersistentVolumeAccessModesSize byte @@ -32,8 +32,7 @@ var persistentVolumeAccessModes = kpersistentVolumeAccessModes{ } type Pvc struct { - Meta - Id types.Binary + ResourceMeta DesiredAccessModes types.Bitmask[kpersistentVolumeAccessModesSize] ActualAccessModes types.Bitmask[kpersistentVolumeAccessModesSize] MinimumCapacity sql.NullInt64 @@ -42,13 +41,25 @@ type Pvc struct { VolumeName string VolumeMode sql.NullString StorageClass sql.NullString - Conditions []PvcCondition `db:"-"` - Labels []Label `db:"-"` - PvcLabels []PvcLabel `db:"-"` + Conditions []*PvcCondition `json:"-" db:"-"` + Labels []*Label `json:"-" db:"-"` +} + +type PvcMeta struct { + contracts.Meta + PvcId types.Binary +} + +func (pm *PvcMeta) Fingerprint() contracts.FingerPrinter { + return pm +} + +func (pm *PvcMeta) ParentID() types.Binary { + return pm.PvcId } type PvcCondition struct { - PvcId types.Binary + PvcMeta Type string Status string LastProbe types.UnixMilli @@ -57,12 +68,7 @@ type PvcCondition struct { Message string } -type PvcLabel struct { - PvcId types.Binary - LabelId types.Binary -} - -func NewPvc() Resource { +func NewPvc() contracts.Entity { return &Pvc{} } @@ -96,29 +102,32 @@ func (p *Pvc) Obtain(k8s kmetav1.Object) { } } + p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + for _, condition := range pvc.Status.Conditions { - p.Conditions = append(p.Conditions, PvcCondition{ - PvcId: p.Id, + pvcCond := &PvcCondition{ + PvcMeta: PvcMeta{ + PvcId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, condition.Type))}, + }, Type: strcase.Snake(string(condition.Type)), Status: string(condition.Status), LastProbe: types.UnixMilli(condition.LastProbeTime.Time), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + pvcCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(pvcCond)) + + p.Conditions = append(p.Conditions, pvcCond) } for labelName, labelValue := range pvc.Labels { - labelId := types.Checksum(strings.ToLower(labelName + ":" + labelValue)) - p.Labels = append(p.Labels, Label{ - Id: labelId, - Name: labelName, - Value: labelValue, - }) - p.PvcLabels = append(p.PvcLabels, PvcLabel{ - PvcId: p.Id, - LabelId: labelId, - }) + label := NewLabel(labelName, labelValue) + label.PvcId = p.Id + label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + + p.Labels = append(p.Labels, label) } } @@ -127,7 +136,6 @@ func (p *Pvc) Relations() []database.Relation { return []database.Relation{ database.HasMany(p.Conditions, fk), - database.HasMany(p.PvcLabels, fk), - database.HasMany(p.Labels, database.WithoutCascadeDelete()), + database.HasMany(p.Labels, fk), } } diff --git 
a/pkg/schema/v1/replica_set.go b/pkg/schema/v1/replica_set.go index 670201aa..bae3de96 100644 --- a/pkg/schema/v1/replica_set.go +++ b/pkg/schema/v1/replica_set.go @@ -1,32 +1,43 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" kappsv1 "k8s.io/api/apps/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" - "strings" ) type ReplicaSet struct { - Meta - Id types.Binary + ResourceMeta DesiredReplicas int32 MinReadySeconds int32 ActualReplicas int32 FullyLabeledReplicas int32 ReadyReplicas int32 AvailableReplicas int32 - Conditions []ReplicaSetCondition `db:"-"` - Owners []ReplicaSetOwner `db:"-"` - Labels []Label `db:"-"` - ReplicaSetLabels []ReplicaSetLabel `db:"-"` + Conditions []*ReplicaSetCondition `json:"-" db:"-"` + Owners []*ReplicaSetOwner `json:"-" db:"-"` + Labels []*Label `json:"-" db:"-"` +} + +type ReplicaSetMeta struct { + contracts.Meta + ReplicaSetId types.Binary +} + +func (rm *ReplicaSetMeta) Fingerprint() contracts.FingerPrinter { + return rm +} + +func (rm *ReplicaSetMeta) ParentID() types.Binary { + return rm.ReplicaSetId } type ReplicaSetCondition struct { - ReplicaSetId types.Binary + ReplicaSetMeta Type string Status string LastTransition types.UnixMilli @@ -35,7 +46,7 @@ type ReplicaSetCondition struct { } type ReplicaSetOwner struct { - ReplicaSetId types.Binary + ReplicaSetMeta Kind string Name string Uid ktypes.UID @@ -43,12 +54,7 @@ type ReplicaSetOwner struct { BlockOwnerDeletion types.Bool } -type ReplicaSetLabel struct { - ReplicaSetId types.Binary - LabelId types.Binary -} - -func NewReplicaSet() Resource { +func NewReplicaSet() contracts.Entity { return &ReplicaSet{} } @@ -69,15 +75,23 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { r.ReadyReplicas = replicaSet.Status.ReadyReplicas r.AvailableReplicas = replicaSet.Status.AvailableReplicas + r.PropertiesChecksum = types.Checksum(MustMarshalJSON(r)) + for _, condition := range replicaSet.Status.Conditions { - r.Conditions = append(r.Conditions, ReplicaSetCondition{ - ReplicaSetId: r.Id, + replicaSetCond := &ReplicaSetCondition{ + ReplicaSetMeta: ReplicaSetMeta{ + ReplicaSetId: r.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(r.Id, condition.Type))}, + }, Type: strcase.Snake(string(condition.Type)), Status: string(condition.Status), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + replicaSetCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(replicaSetCond)) + + r.Conditions = append(r.Conditions, replicaSetCond) } for _, ownerReference := range replicaSet.OwnerReferences { @@ -88,11 +102,15 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { if ownerReference.Controller != nil { controller = *ownerReference.Controller } - r.Owners = append(r.Owners, ReplicaSetOwner{ - ReplicaSetId: r.Id, - Kind: strcase.Snake(ownerReference.Kind), - Name: ownerReference.Name, - Uid: ownerReference.UID, + + owner := &ReplicaSetOwner{ + ReplicaSetMeta: ReplicaSetMeta{ + ReplicaSetId: r.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(r.Id, ownerReference.UID))}, + }, + Kind: strcase.Snake(ownerReference.Kind), + Name: ownerReference.Name, + Uid: ownerReference.UID, BlockOwnerDeletion: types.Bool{ Bool: blockOwnerDeletion, Valid: true, @@ -101,20 +119,18 @@ func (r *ReplicaSet) Obtain(k8s 
kmetav1.Object) { Bool: controller, Valid: true, }, - }) + } + owner.PropertiesChecksum = types.Checksum(MustMarshalJSON(owner)) + + r.Owners = append(r.Owners, owner) } for labelName, labelValue := range replicaSet.Labels { - labelId := types.Checksum(strings.ToLower(labelName + ":" + labelValue)) - r.Labels = append(r.Labels, Label{ - Id: labelId, - Name: labelName, - Value: labelValue, - }) - r.ReplicaSetLabels = append(r.ReplicaSetLabels, ReplicaSetLabel{ - ReplicaSetId: r.Id, - LabelId: labelId, - }) + label := NewLabel(labelName, labelValue) + label.ReplicaSetId = r.Id + label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + + r.Labels = append(r.Labels, label) } } @@ -124,7 +140,6 @@ func (r *ReplicaSet) Relations() []database.Relation { return []database.Relation{ database.HasMany(r.Conditions, fk), database.HasMany(r.Owners, fk), - database.HasMany(r.ReplicaSetLabels, fk), - database.HasMany(r.Labels, database.WithoutCascadeDelete()), + database.HasMany(r.Labels, fk), } } diff --git a/pkg/schema/v1/service.go b/pkg/schema/v1/service.go index 40e11488..3e78b5c8 100644 --- a/pkg/schema/v1/service.go +++ b/pkg/schema/v1/service.go @@ -1,17 +1,19 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" + "github.com/icinga/icinga-kubernetes/pkg/types" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type Service struct { - Meta + ResourceMeta Type string ClusterIP string } -func NewService() Resource { +func NewService() contracts.Entity { return &Service{} } @@ -20,6 +22,8 @@ func (s *Service) Obtain(k8s kmetav1.Object) { service := k8s.(*kcorev1.Service) + s.Id = types.Checksum(s.Namespace + "/" + s.Name) s.Type = string(service.Spec.Type) s.ClusterIP = service.Spec.ClusterIP + s.PropertiesChecksum = types.Checksum(MustMarshalJSON(s)) } diff --git a/pkg/schema/v1/stateful_set.go b/pkg/schema/v1/stateful_set.go index 21fa0b41..55719c93 100644 --- a/pkg/schema/v1/stateful_set.go +++ b/pkg/schema/v1/stateful_set.go @@ -1,17 +1,16 @@ package v1 import ( + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" kappsv1 "k8s.io/api/apps/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "strings" ) type StatefulSet struct { - Meta - Id types.Binary + ResourceMeta DesiredReplicas int32 ServiceName string PodManagementPolicy string @@ -25,13 +24,25 @@ type StatefulSet struct { CurrentReplicas int32 UpdatedReplicas int32 AvailableReplicas int32 - Conditions []StatefulSetCondition `db:"-"` - Labels []Label `db:"-"` - StatefulSetLabels []StatefulSetLabel `db:"-"` + Conditions []*StatefulSetCondition `json:"-" db:"-"` + Labels []*Label `json:"-" db:"-"` +} + +type StatefulSetMeta struct { + contracts.Meta + StatefulSetId types.Binary +} + +func (sm *StatefulSetMeta) Fingerprint() contracts.FingerPrinter { + return sm +} + +func (sm *StatefulSetMeta) ParentID() types.Binary { + return sm.StatefulSetId } type StatefulSetCondition struct { - StatefulSetId types.Binary + StatefulSetMeta Type string Status string LastTransition types.UnixMilli @@ -39,12 +50,7 @@ type StatefulSetCondition struct { Message string } -type StatefulSetLabel struct { - StatefulSetId types.Binary - LabelId types.Binary -} - -func NewStatefulSet() Resource { +func NewStatefulSet() contracts.Entity { return &StatefulSet{} } @@ -83,28 +89,31 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { s.UpdatedReplicas = 
statefulSet.Status.UpdatedReplicas s.AvailableReplicas = statefulSet.Status.AvailableReplicas + s.PropertiesChecksum = types.Checksum(MustMarshalJSON(s)) + for _, condition := range statefulSet.Status.Conditions { - s.Conditions = append(s.Conditions, StatefulSetCondition{ - StatefulSetId: s.Id, + cond := &StatefulSetCondition{ + StatefulSetMeta: StatefulSetMeta{ + StatefulSetId: s.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(s.Id, condition.Type))}, + }, Type: string(condition.Type), Status: string(condition.Status), LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), Reason: condition.Reason, Message: condition.Message, - }) + } + cond.PropertiesChecksum = types.Checksum(MustMarshalJSON(cond)) + + s.Conditions = append(s.Conditions, cond) } for labelName, labelValue := range statefulSet.Labels { - labelId := types.Checksum(strings.ToLower(labelName + ":" + labelValue)) - s.Labels = append(s.Labels, Label{ - Id: labelId, - Name: labelName, - Value: labelValue, - }) - s.StatefulSetLabels = append(s.StatefulSetLabels, StatefulSetLabel{ - StatefulSetId: s.Id, - LabelId: labelId, - }) + label := NewLabel(labelName, labelValue) + label.StatefulSetId = s.Id + label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + + s.Labels = append(s.Labels, label) } } @@ -113,7 +122,6 @@ func (s *StatefulSet) Relations() []database.Relation { return []database.Relation{ database.HasMany(s.Conditions, fk), - database.HasMany(s.StatefulSetLabels, fk), - database.HasMany(s.Labels, database.WithoutCascadeDelete()), + database.HasMany(s.Labels, fk), } } diff --git a/pkg/schema/v1/utils.go b/pkg/schema/v1/utils.go index e9d08e5b..b50c6a11 100644 --- a/pkg/schema/v1/utils.go +++ b/pkg/schema/v1/utils.go @@ -21,3 +21,16 @@ func MarshalFirstNonNilStructFieldToJSON(i any) (string, string, error) { return "", "", nil } + +// MustMarshalJSON json encodes the given object. +// TODO: This is just used to generate the checksum of the object properties. +// - This should no longer be necessary once we have implemented a more sophisticated +// - method for hashing a structure. 
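// For illustration only: a minimal sketch of the checksum pattern this
// helper enables, assuming types.Checksum is a SHA-1 over the marshaled
// bytes (as the schema's "sha1(all properties)" column comments suggest):
//
//	b, err := json.Marshal(label) // MustMarshalJSON panics on error instead
//	if err != nil {
//		panic(err)
//	}
//	sum := sha1.Sum(b)                // [20]byte
//	label.PropertiesChecksum = sum[:] // stored as types.Binary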
+func MustMarshalJSON(v interface{}) []byte { + b, err := types.MarshalJSON(v) + if err != nil { + panic(err) + } + + return b +} From 4324dc7998e9fca13a26db4c105ede3a7e3a5873 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Mon, 3 Jul 2023 12:16:00 +0200 Subject: [PATCH 8/8] Enhance config sync --- pkg/contracts/contracts.go | 2 +- pkg/database/database.go | 102 +++++++- pkg/database/features.go | 16 +- pkg/schema/v1/container.go | 5 +- pkg/schema/v1/daemon_set.go | 10 +- pkg/schema/v1/deployment.go | 10 +- pkg/schema/v1/event.go | 4 +- pkg/schema/v1/namespace.go | 6 +- pkg/schema/v1/node.go | 10 +- pkg/schema/v1/persistent_volume.go | 6 +- pkg/schema/v1/pod.go | 380 ++++++++++++++++------------- pkg/schema/v1/pvc.go | 10 +- pkg/schema/v1/replica_set.go | 14 +- pkg/schema/v1/service.go | 2 +- pkg/schema/v1/stateful_set.go | 10 +- pkg/schema/v1/utils.go | 13 - pkg/sync/v1/sync.go | 269 ++++++++++++++++---- pkg/types/objectpacker.go | 77 ++++++ schema/mysql/schema.sql | 125 ++++++---- 19 files changed, 735 insertions(+), 336 deletions(-) diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go index 11110058..6f734997 100644 --- a/pkg/contracts/contracts.go +++ b/pkg/contracts/contracts.go @@ -51,7 +51,7 @@ type Resource interface { type Meta struct { Id types.Binary `db:"id"` - PropertiesChecksum types.Binary `json:"-"` + PropertiesChecksum types.Binary `hash:"-"` } func (m *Meta) Checksum() types.Binary { diff --git a/pkg/database/database.go b/pkg/database/database.go index 9b3ff94b..868f8304 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -406,7 +406,7 @@ func (db *Database) DeleteStreamed( defer runtime.HandleCrash() defer close(ch) - return db.DeleteStreamed(ctx, relation, ch, features...) + return db.DeleteStreamed(ctx, relation, ch, WithCascading(), WithBlocking()) }) streams[TableName(relation)] = ch } @@ -494,8 +494,33 @@ func (db *Database) DeleteStreamed( // UpsertStreamed bulk upserts the specified entities via NamedBulkExec. // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. -// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and -// concurrency is controlled via Options.MaxConnectionsPerTable. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via +// Options.MaxConnectionsPerTable. +// +// This sync process consists of the following steps: +// +// - It initially copies the first item from the specified stream and checks if this entity type provides relations. +// If so, it first traverses all these relations recursively and starts a separate goroutine and caches the streams +// for the started goroutine/relations type. +// +// - After the relations have been resolved, another goroutine is started which consumes from the specified `entities` +// chan and performs the following actions for each of the streamed entities: +// +// - If the consumed entity doesn't satisfy the contracts.Entity interface, it will just forward that entity to the +// next stage. +// +// - When the entity does satisfy the contracts.Entity, it applies the filter func on this entity (which hopefully +// should check for its checksums), and forwards the entity to the `forward` chan only if the filter function +// returns true and initiates a database upsert stream. Regardless, whether the function returns true, it will +// stream each of the child entity with the `relation.Stream()` method to the respective cached stream of the relation. 
+// +// However, when the first item doesn't satisfy the database.HasRelations interface, it will just use only two +// stages for the streamed entities to be upserted: +// +// - The first stage just consumes from the source stream (the `entities` chan) and applies the filter function (if any) +// on each of the entities. This won't forward entities for which the filter function didn't also return true as well. +// +// - The second stage just performs a database upsert queries for entities that were forwarded from the previous one. func (db *Database) UpsertStreamed( ctx context.Context, entities <-chan interface{}, features ...Feature, ) error { @@ -520,7 +545,7 @@ func (db *Database) UpsertStreamed( defer runtime.HandleCrash() defer close(ch) - return db.UpsertStreamed(ctx, ch) + return db.UpsertStreamed(ctx, ch, WithCascading(), WithPreExecution(with.preExecution)) }) streams[TableName(relation)] = ch } @@ -535,19 +560,30 @@ func (db *Database) UpsertStreamed( for { select { - case entity, more := <-source: + case e, more := <-source: if !more { return nil } - select { - case forward <- entity: - case <-ctx.Done(): - return ctx.Err() + entity, ok := e.(contracts.Entity) + shouldUpsert := true + if ok && with.preExecution != nil { + shouldUpsert, err = with.preExecution(entity) + if err != nil { + return err + } + } + + if shouldUpsert { + select { + case forward <- e: + case <-ctx.Done(): + return ctx.Err() + } } select { - case dup <- entity: + case dup <- e: case <-ctx.Done(): return ctx.Err() } @@ -591,8 +627,50 @@ func (db *Database) UpsertStreamed( return g.Wait() } - return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[any], features...) + upsertEntities := make(chan interface{}) + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(upsertEntities) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e, ok := <-forward: + if !ok { + return nil + } + + entity, ok := e.(contracts.Entity) + shouldUpsert := true + if ok && with.preExecution != nil { + shouldUpsert, err = with.preExecution(entity) + if err != nil { + return err + } + } + + if shouldUpsert { + select { + case upsertEntities <- entity: + case <-ctx.Done(): + return ctx.Err() + } + } + } + } + }) + + g.Go(func() error { + defer runtime.HandleCrash() + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, com.NeverSplit[any], features..., + ) + }) + + return g.Wait() } // YieldAll executes the query with the supplied scope, diff --git a/pkg/database/features.go b/pkg/database/features.go index cafb26a7..54ddcf86 100644 --- a/pkg/database/features.go +++ b/pkg/database/features.go @@ -2,14 +2,18 @@ package database import ( "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-kubernetes/pkg/contracts" ) type Feature func(*Features) +type PreExecFunc func(contracts.Entity) (bool, error) + type Features struct { - blocking bool - cascading bool - onSuccess com.ProcessBulk[any] + blocking bool + cascading bool + onSuccess com.ProcessBulk[any] + preExecution PreExecFunc } func NewFeatures(features ...Feature) *Features { @@ -33,6 +37,12 @@ func WithCascading() Feature { } } +func WithPreExecution(preExec PreExecFunc) Feature { + return func(f *Features) { + f.preExecution = preExec + } +} + func WithOnSuccess(fn com.ProcessBulk[any]) Feature { return func(f *Features) { f.onSuccess = fn diff --git a/pkg/schema/v1/container.go 
b/pkg/schema/v1/container.go index 2e41c2ff..067f24e9 100644 --- a/pkg/schema/v1/container.go +++ b/pkg/schema/v1/container.go @@ -20,8 +20,9 @@ type Container struct { Ready types.Bool Started types.Bool RestartCount int32 - Devices []*ContainerDevice `db:"-"` - Mounts []*ContainerMount `db:"-"` + Logs string + Devices []*ContainerDevice `db:"-" hash:"-"` + Mounts []*ContainerMount `db:"-" hash:"-"` } func (c *Container) Relations() []database.Relation { diff --git a/pkg/schema/v1/daemon_set.go b/pkg/schema/v1/daemon_set.go index 6dff2246..860835c8 100644 --- a/pkg/schema/v1/daemon_set.go +++ b/pkg/schema/v1/daemon_set.go @@ -20,8 +20,8 @@ type DaemonSet struct { UpdateNumberScheduled int32 NumberAvailable int32 NumberUnavailable int32 - Conditions []*DaemonSetCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*DaemonSetCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type DaemonSetMeta struct { @@ -66,7 +66,7 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { d.NumberAvailable = daemonSet.Status.NumberAvailable d.NumberUnavailable = daemonSet.Status.NumberUnavailable - d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d)) + d.PropertiesChecksum = types.HashStruct(d) for _, condition := range daemonSet.Status.Conditions { daemonCond := &DaemonSetCondition{ @@ -80,7 +80,7 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - daemonCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(daemonCond)) + daemonCond.PropertiesChecksum = types.HashStruct(daemonCond) d.Conditions = append(d.Conditions, daemonCond) } @@ -88,7 +88,7 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range daemonSet.Labels { label := NewLabel(labelName, labelValue) label.DaemonSetId = d.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) d.Labels = append(d.Labels, label) } diff --git a/pkg/schema/v1/deployment.go b/pkg/schema/v1/deployment.go index 764d0e41..a7d7dbd6 100644 --- a/pkg/schema/v1/deployment.go +++ b/pkg/schema/v1/deployment.go @@ -21,8 +21,8 @@ type Deployment struct { ReadyReplicas int32 AvailableReplicas int32 UnavailableReplicas int32 - Conditions []*DeploymentCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*DeploymentCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type DeploymentConditionMeta struct { @@ -79,7 +79,7 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { d.ReadyReplicas = deployment.Status.ReadyReplicas d.UnavailableReplicas = deployment.Status.UnavailableReplicas - d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d)) + d.PropertiesChecksum = types.HashStruct(d) for _, condition := range deployment.Status.Conditions { deploymentCond := &DeploymentCondition{ @@ -94,7 +94,7 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - deploymentCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(deploymentCond)) + deploymentCond.PropertiesChecksum = types.HashStruct(deploymentCond) d.Conditions = append(d.Conditions, deploymentCond) } @@ -102,7 +102,7 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range deployment.Labels { label := NewLabel(labelName, labelValue) label.DeploymentId = d.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) d.Labels = append(d.Labels, 
label) } diff --git a/pkg/schema/v1/event.go b/pkg/schema/v1/event.go index f21a4aaf..1a962eaf 100644 --- a/pkg/schema/v1/event.go +++ b/pkg/schema/v1/event.go @@ -29,9 +29,6 @@ func NewEvent() contracts.Entity { func (e *Event) Obtain(k8s kmetav1.Object) { e.ObtainMeta(k8s) - defer func() { - e.PropertiesChecksum = types.Checksum(MustMarshalJSON(e)) - }() event := k8s.(*keventsv1.Event) @@ -56,6 +53,7 @@ func (e *Event) Obtain(k8s kmetav1.Object) { e.LastSeen = types.UnixMilli(event.DeprecatedLastTimestamp.Time) } e.Count = event.DeprecatedCount + e.PropertiesChecksum = types.HashStruct(e) // e.FirstSeen = types.UnixMilli(event.EventTime.Time) // if event.Series != nil { // e.LastSeen = types.UnixMilli(event.Series.LastObservedTime.Time) diff --git a/pkg/schema/v1/namespace.go b/pkg/schema/v1/namespace.go index 77c3d94b..21cc3b22 100644 --- a/pkg/schema/v1/namespace.go +++ b/pkg/schema/v1/namespace.go @@ -12,7 +12,7 @@ import ( type Namespace struct { ResourceMeta Phase string - Conditions []*NamespaceCondition `json:"-" db:"-"` + Conditions []*NamespaceCondition `db:"-" hash:"-"` } type NamespaceMeta struct { @@ -49,7 +49,7 @@ func (n *Namespace) Obtain(k8s kmetav1.Object) { n.Id = types.Checksum(namespace.Name) n.Phase = strings.ToLower(string(namespace.Status.Phase)) - n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n)) + n.PropertiesChecksum = types.HashStruct(n) for _, condition := range namespace.Status.Conditions { namespaceCond := &NamespaceCondition{ @@ -63,7 +63,7 @@ func (n *Namespace) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - namespaceCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(namespaceCond)) + namespaceCond.PropertiesChecksum = types.HashStruct(namespaceCond) n.Conditions = append(n.Conditions, namespaceCond) } diff --git a/pkg/schema/v1/node.go b/pkg/schema/v1/node.go index 648d1f5d..914b51c6 100644 --- a/pkg/schema/v1/node.go +++ b/pkg/schema/v1/node.go @@ -22,8 +22,8 @@ type Node struct { MemoryCapacity int64 MemoryAllocatable int64 PodCapacity int64 - Conditions []*NodeCondition `json:"-" db:"-"` - Volumes []*NodeVolume `json:"-" db:"-"` + Conditions []*NodeCondition `db:"-" hash:"-"` + Volumes []*NodeVolume `db:"-" hash:"-"` } type NodeMeta struct { @@ -87,7 +87,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) { n.MemoryAllocatable = node.Status.Allocatable.Memory().MilliValue() n.PodCapacity = node.Status.Allocatable.Pods().Value() - n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n)) + n.PropertiesChecksum = types.HashStruct(n) for _, condition := range node.Status.Conditions { nodeCond := &NodeCondition{ @@ -102,7 +102,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - nodeCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeCond)) + nodeCond.PropertiesChecksum = types.HashStruct(nodeCond) n.Conditions = append(n.Conditions, nodeCond) } @@ -125,7 +125,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) { Valid: true, }, } - nodeVolume.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeVolume)) + nodeVolume.PropertiesChecksum = types.HashStruct(nodeVolume) n.Volumes = append(n.Volumes, nodeVolume) } diff --git a/pkg/schema/v1/persistent_volume.go b/pkg/schema/v1/persistent_volume.go index 3da11726..bfb1315d 100644 --- a/pkg/schema/v1/persistent_volume.go +++ b/pkg/schema/v1/persistent_volume.go @@ -23,7 +23,7 @@ type PersistentVolume struct { Phase string Reason string Message string - Claim *PersistentVolumeClaimRef `json:"-" db:"-"` + 
Claim *PersistentVolumeClaimRef `db:"-" hash:"-"` } type PersistentVolumeMeta struct { @@ -75,7 +75,7 @@ func (p *PersistentVolume) Obtain(k8s kmetav1.Object) { panic(err) } - p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + p.PropertiesChecksum = types.HashStruct(p) p.Claim = &PersistentVolumeClaimRef{ PersistentVolumeMeta: PersistentVolumeMeta{ @@ -86,7 +86,7 @@ func (p *PersistentVolume) Obtain(k8s kmetav1.Object) { Name: persistentVolume.Spec.ClaimRef.Name, Uid: persistentVolume.Spec.ClaimRef.UID, } - p.Claim.PropertiesChecksum = types.Checksum(MustMarshalJSON(p.Claim)) + p.Claim.PropertiesChecksum = types.HashStruct(p.Claim) } func (p *PersistentVolume) Relations() []database.Relation { diff --git a/pkg/schema/v1/pod.go b/pkg/schema/v1/pod.go index 960afbb1..4f81e59d 100644 --- a/pkg/schema/v1/pod.go +++ b/pkg/schema/v1/pod.go @@ -1,11 +1,15 @@ package v1 import ( + "context" "database/sql" + "fmt" "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" + "golang.org/x/sync/errgroup" + "io" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" @@ -30,12 +34,12 @@ type Pod struct { Message string Qos string RestartPolicy string - Conditions []*PodCondition `db:"-"` - Containers []*Container `db:"-"` - Owners []*PodOwner `db:"-"` - Labels []*Label `db:"-"` - Pvcs []*PodPvc `db:"-"` - Volumes []*PodVolume `db:"-"` + Conditions []*PodCondition `db:"-" hash:"-"` + Containers []*Container `db:"-" hash:"-"` + Owners []*PodOwner `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` + Pvcs []*PodPvc `db:"-" hash:"-"` + Volumes []*PodVolume `db:"-" hash:"-"` factory *PodFactory } @@ -98,7 +102,7 @@ func (f *PodFactory) New() contracts.Entity { func (p *Pod) Obtain(k8s kmetav1.Object) { p.ObtainMeta(k8s) defer func() { - p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + p.PropertiesChecksum = types.HashStruct(p) }() pod := k8s.(*kcorev1.Pod) @@ -113,200 +117,229 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { p.Qos = strcase.Snake(string(pod.Status.QOSClass)) p.RestartPolicy = strcase.Snake(string(pod.Spec.RestartPolicy)) - for _, condition := range pod.Status.Conditions { - podCond := &PodCondition{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, condition.Type))}, - }, - Type: string(condition.Type), - Status: string(condition.Status), - LastProbe: types.UnixMilli(condition.LastProbeTime.Time), - LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), - Reason: condition.Reason, - Message: condition.Message, - } - podCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(podCond)) - - p.Conditions = append(p.Conditions, podCond) - } + g := &errgroup.Group{} + g.Go(func() error { + for _, condition := range pod.Status.Conditions { + podCond := &PodCondition{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, condition.Type))}, + }, + Type: string(condition.Type), + Status: string(condition.Status), + LastProbe: types.UnixMilli(condition.LastProbeTime.Time), + LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), + Reason: condition.Reason, + Message: condition.Message, + } + podCond.PropertiesChecksum = types.HashStruct(podCond) - containerStatuses := make(map[string]kcorev1.ContainerStatus, len(pod.Spec.Containers)) - for _, containerStatus 
:= range pod.Status.ContainerStatuses { - containerStatuses[containerStatus.Name] = containerStatus - } - for _, k8sContainer := range pod.Spec.Containers { - var started bool - if containerStatuses[k8sContainer.Name].Started != nil { - started = *containerStatuses[k8sContainer.Name].Started - } - state, stateDetails, err := MarshalFirstNonNilStructFieldToJSON(containerStatuses[k8sContainer.Name].State) - if err != nil { - panic(err) + p.Conditions = append(p.Conditions, podCond) } - var containerState sql.NullString - if state != "" { - containerState.String = strcase.Snake(state) - containerState.Valid = true - } + return nil + }) - container := &Container{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + k8sContainer.Name)}, - }, - Name: k8sContainer.Name, - Image: k8sContainer.Image, - CpuLimits: k8sContainer.Resources.Limits.Cpu().MilliValue(), - CpuRequests: k8sContainer.Resources.Requests.Cpu().MilliValue(), - MemoryLimits: k8sContainer.Resources.Limits.Memory().MilliValue(), - MemoryRequests: k8sContainer.Resources.Requests.Memory().MilliValue(), - Ready: types.Bool{ - Bool: containerStatuses[k8sContainer.Name].Ready, - Valid: true, - }, - Started: types.Bool{ - Bool: started, - Valid: true, - }, - RestartCount: containerStatuses[k8sContainer.Name].RestartCount, - State: containerState, - StateDetails: stateDetails, + g.Go(func() error { + containerStatuses := make(map[string]kcorev1.ContainerStatus, len(pod.Spec.Containers)) + for _, containerStatus := range pod.Status.ContainerStatuses { + containerStatuses[containerStatus.Name] = containerStatus } - container.PropertiesChecksum = types.Checksum(MustMarshalJSON(container)) - - p.CpuLimits += k8sContainer.Resources.Limits.Cpu().MilliValue() - p.CpuRequests += k8sContainer.Resources.Requests.Cpu().MilliValue() - p.MemoryLimits += k8sContainer.Resources.Limits.Memory().MilliValue() - p.MemoryRequests += k8sContainer.Resources.Requests.Memory().MilliValue() - - for _, device := range k8sContainer.VolumeDevices { - cd := &ContainerDevice{ - ContainerMeta: ContainerMeta{ - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, device.Name))}, - ContainerId: container.Id, - }, - Name: device.Name, - Path: device.DevicePath, + for _, k8sContainer := range pod.Spec.Containers { + var started bool + if containerStatuses[k8sContainer.Name].Started != nil { + started = *containerStatuses[k8sContainer.Name].Started + } + state, stateDetails, err := MarshalFirstNonNilStructFieldToJSON(containerStatuses[k8sContainer.Name].State) + if err != nil { + panic(err) } - cd.PropertiesChecksum = types.Checksum(MustMarshalJSON(cd)) - container.Devices = append(container.Devices, cd) - } + logs, err := getContainerLogs(p.factory.clientset, pod, k8sContainer) + if err != nil { + // ContainerCreating, NotFound, ... 
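// The kubelet usually answers with a BadRequest such as "container ... is
// waiting to start: ContainerCreating" until the container runs, and with
// NotFound once the pod is gone. A sketch of classifying those expected
// failures, assuming k8s.io/apimachinery/pkg/api/errors is imported as
// apierrors (hypothetical refinement, not part of this patch):
//
//	if apierrors.IsNotFound(err) || apierrors.IsBadRequest(err) {
//		logs = "" // expected while the container isn't running yet
//	} else {
//		fmt.Println(err) // unexpected, worth surfacing
//	}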
+ fmt.Println(err) + logs = "" + } - for _, mount := range k8sContainer.VolumeMounts { - cm := &ContainerMount{ - ContainerMeta: ContainerMeta{ - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, mount.Name))}, - ContainerId: container.Id, + var containerState sql.NullString + if state != "" { + containerState.String = strcase.Snake(state) + containerState.Valid = true + } + + container := &Container{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + k8sContainer.Name)}, }, - VolumeName: mount.Name, - Path: mount.MountPath, - SubPath: mount.SubPath, - ReadOnly: types.Bool{ - Bool: mount.ReadOnly, + Name: k8sContainer.Name, + Image: k8sContainer.Image, + CpuLimits: k8sContainer.Resources.Limits.Cpu().MilliValue(), + CpuRequests: k8sContainer.Resources.Requests.Cpu().MilliValue(), + MemoryLimits: k8sContainer.Resources.Limits.Memory().MilliValue(), + MemoryRequests: k8sContainer.Resources.Requests.Memory().MilliValue(), + Logs: logs, + Ready: types.Bool{ + Bool: containerStatuses[k8sContainer.Name].Ready, Valid: true, }, + Started: types.Bool{ + Bool: started, + Valid: true, + }, + RestartCount: containerStatuses[k8sContainer.Name].RestartCount, + State: containerState, + StateDetails: stateDetails, + } + container.PropertiesChecksum = types.HashStruct(container) + p.Containers = append(p.Containers, container) + + p.CpuLimits += k8sContainer.Resources.Limits.Cpu().MilliValue() + p.CpuRequests += k8sContainer.Resources.Requests.Cpu().MilliValue() + p.MemoryLimits += k8sContainer.Resources.Limits.Memory().MilliValue() + p.MemoryRequests += k8sContainer.Resources.Requests.Memory().MilliValue() + + for _, device := range k8sContainer.VolumeDevices { + cd := &ContainerDevice{ + ContainerMeta: ContainerMeta{ + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, device.Name))}, + ContainerId: container.Id, + }, + Name: device.Name, + Path: device.DevicePath, + } + cd.PropertiesChecksum = types.HashStruct(cd) + + container.Devices = append(container.Devices, cd) } - cm.PropertiesChecksum = types.Checksum(MustMarshalJSON(cm)) - container.Mounts = append(container.Mounts, cm) + for _, mount := range k8sContainer.VolumeMounts { + cm := &ContainerMount{ + ContainerMeta: ContainerMeta{ + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, mount.Name))}, + ContainerId: container.Id, + }, + VolumeName: mount.Name, + Path: mount.MountPath, + SubPath: mount.SubPath, + ReadOnly: types.Bool{ + Bool: mount.ReadOnly, + Valid: true, + }, + } + cm.PropertiesChecksum = types.HashStruct(cm) + + container.Mounts = append(container.Mounts, cm) + } } - } - for labelName, labelValue := range pod.Labels { - label := NewLabel(labelName, labelValue) - label.PodId = p.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + return nil + }) - p.Labels = append(p.Labels, label) - } + g.Go(func() error { + for labelName, labelValue := range pod.Labels { + label := NewLabel(labelName, labelValue) + label.PodId = p.Id + label.PropertiesChecksum = types.HashStruct(label) - for _, ownerReference := range pod.OwnerReferences { - var blockOwnerDeletion, controller bool - if ownerReference.BlockOwnerDeletion != nil { - blockOwnerDeletion = *ownerReference.BlockOwnerDeletion - } - if ownerReference.Controller != nil { - controller = *ownerReference.Controller + p.Labels = append(p.Labels, label) } - owner := &PodOwner{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: 
contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, ownerReference.UID))}, - }, - Kind: strcase.Snake(ownerReference.Kind), - Name: ownerReference.Name, - Uid: ownerReference.UID, - BlockOwnerDeletion: types.Bool{ - Bool: blockOwnerDeletion, - Valid: true, - }, - Controller: types.Bool{ - Bool: controller, - Valid: true, - }, - } - owner.PropertiesChecksum = types.Checksum(MustMarshalJSON(owner)) - - p.Owners = append(p.Owners, owner) - } - // https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#resources - for _, container := range pod.Spec.InitContainers { - // Init container must complete successfully before the next one starts, - // so we don't have to sum their resources. - p.CpuLimits = types.MaxInt(p.CpuLimits, container.Resources.Limits.Cpu().MilliValue()) - p.CpuRequests = types.MaxInt(p.CpuRequests, container.Resources.Requests.Cpu().MilliValue()) - p.MemoryLimits = types.MaxInt(p.MemoryLimits, container.Resources.Limits.Memory().MilliValue()) - p.MemoryRequests = types.MaxInt(p.MemoryRequests, container.Resources.Requests.Memory().MilliValue()) - } - - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil { - pvc := &PodPvc{ + for _, ownerReference := range pod.OwnerReferences { + var blockOwnerDeletion, controller bool + if ownerReference.BlockOwnerDeletion != nil { + blockOwnerDeletion = *ownerReference.BlockOwnerDeletion + } + if ownerReference.Controller != nil { + controller = *ownerReference.Controller + } + owner := &PodOwner{ PodMeta: PodMeta{ PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name, volume.PersistentVolumeClaim.ClaimName))}, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, ownerReference.UID))}, }, - VolumeName: volume.Name, - ClaimName: volume.PersistentVolumeClaim.ClaimName, - ReadOnly: types.Bool{ - Bool: volume.PersistentVolumeClaim.ReadOnly, + Kind: strcase.Snake(ownerReference.Kind), + Name: ownerReference.Name, + Uid: ownerReference.UID, + BlockOwnerDeletion: types.Bool{ + Bool: blockOwnerDeletion, + Valid: true, + }, + Controller: types.Bool{ + Bool: controller, Valid: true, }, } - pvc.PropertiesChecksum = types.Checksum(MustMarshalJSON(pvc)) + owner.PropertiesChecksum = types.HashStruct(owner) - p.Pvcs = append(p.Pvcs, pvc) - } else { - t, source, err := MarshalFirstNonNilStructFieldToJSON(volume.VolumeSource) - if err != nil { - panic(err) - } + p.Owners = append(p.Owners, owner) + } - vol := &PodVolume{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name))}, - }, - VolumeName: volume.Name, - Type: t, - Source: source, - } - vol.PropertiesChecksum = types.Checksum(MustMarshalJSON(vol)) + return nil + }) + + g.Go(func() error { + // https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#resources + for _, container := range pod.Spec.InitContainers { + // Init container must complete successfully before the next one starts, + // so we don't have to sum their resources. 
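// Worked example of the rule above: app containers requesting 100m + 200m
// CPU with init containers requesting 250m and 400m yield an effective pod
// request of max(100+200, 250, 400) = 400m, because each init container
// runs on its own before the app containers start:
//
//	effective := int64(100 + 200)         // app containers are summed
//	for _, r := range []int64{250, 400} { // init containers are max'd
//		effective = types.MaxInt(effective, r)
//	}
//	// effective == 400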
+ p.CpuLimits = types.MaxInt(p.CpuLimits, container.Resources.Limits.Cpu().MilliValue()) + p.CpuRequests = types.MaxInt(p.CpuRequests, container.Resources.Requests.Cpu().MilliValue()) + p.MemoryLimits = types.MaxInt(p.MemoryLimits, container.Resources.Limits.Memory().MilliValue()) + p.MemoryRequests = types.MaxInt(p.MemoryRequests, container.Resources.Requests.Memory().MilliValue()) + } - p.Volumes = append(p.Volumes, vol) + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil { + pvc := &PodPvc{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name, volume.PersistentVolumeClaim.ClaimName))}, + }, + VolumeName: volume.Name, + ClaimName: volume.PersistentVolumeClaim.ClaimName, + ReadOnly: types.Bool{ + Bool: volume.PersistentVolumeClaim.ReadOnly, + Valid: true, + }, + } + pvc.PropertiesChecksum = types.HashStruct(pvc) + + p.Pvcs = append(p.Pvcs, pvc) + } else { + t, source, err := MarshalFirstNonNilStructFieldToJSON(volume.VolumeSource) + if err != nil { + panic(err) + } + + vol := &PodVolume{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name))}, + }, + VolumeName: volume.Name, + Type: t, + Source: source, + } + vol.PropertiesChecksum = types.HashStruct(vol) + + p.Volumes = append(p.Volumes, vol) + } } - } + + return nil + }) + + _ = g.Wait() // We don't expect any errors here } func (p *Pod) Relations() []database.Relation { fk := database.WithForeignKey("pod_id") return []database.Relation{ - database.HasMany(p.Containers, fk, database.WithoutCascadeDelete()), + database.HasMany(p.Conditions, fk), + database.HasMany(p.Containers, fk), database.HasMany(p.Owners, fk), database.HasMany(p.Labels, fk), database.HasMany(p.Pvcs, fk), @@ -314,6 +347,21 @@ func (p *Pod) Relations() []database.Relation { } } +func getContainerLogs(clientset *kubernetes.Clientset, pod *kcorev1.Pod, container kcorev1.Container) (string, error) { + req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &kcorev1.PodLogOptions{Container: container.Name}) + body, err := req.Stream(context.TODO()) + if err != nil { + return "", err + } + defer body.Close() + logs, err := io.ReadAll(body) + if err != nil { + return "", err + } + + return string(logs), nil +} + var ( _ contracts.Entity = (*Pod)(nil) _ contracts.Resource = (*Pod)(nil) diff --git a/pkg/schema/v1/pvc.go b/pkg/schema/v1/pvc.go index 84b20dd4..160a3cdb 100644 --- a/pkg/schema/v1/pvc.go +++ b/pkg/schema/v1/pvc.go @@ -41,8 +41,8 @@ type Pvc struct { VolumeName string VolumeMode sql.NullString StorageClass sql.NullString - Conditions []*PvcCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*PvcCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type PvcMeta struct { @@ -102,7 +102,7 @@ func (p *Pvc) Obtain(k8s kmetav1.Object) { } } - p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + p.PropertiesChecksum = types.HashStruct(p) for _, condition := range pvc.Status.Conditions { pvcCond := &PvcCondition{ @@ -117,7 +117,7 @@ func (p *Pvc) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - pvcCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(pvcCond)) + pvcCond.PropertiesChecksum = types.HashStruct(pvcCond) p.Conditions = append(p.Conditions, pvcCond) } @@ -125,7 +125,7 @@ func (p *Pvc) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range pvc.Labels { label := NewLabel(labelName, labelValue) label.PvcId = 
p.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) p.Labels = append(p.Labels, label) } diff --git a/pkg/schema/v1/replica_set.go b/pkg/schema/v1/replica_set.go index bae3de96..7ffd5151 100644 --- a/pkg/schema/v1/replica_set.go +++ b/pkg/schema/v1/replica_set.go @@ -18,9 +18,9 @@ type ReplicaSet struct { FullyLabeledReplicas int32 ReadyReplicas int32 AvailableReplicas int32 - Conditions []*ReplicaSetCondition `json:"-" db:"-"` - Owners []*ReplicaSetOwner `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*ReplicaSetCondition `hash:"-" db:"-"` + Owners []*ReplicaSetOwner `hash:"-" db:"-"` + Labels []*Label `hash:"-" db:"-"` } type ReplicaSetMeta struct { @@ -75,7 +75,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { r.ReadyReplicas = replicaSet.Status.ReadyReplicas r.AvailableReplicas = replicaSet.Status.AvailableReplicas - r.PropertiesChecksum = types.Checksum(MustMarshalJSON(r)) + r.PropertiesChecksum = types.HashStruct(r) for _, condition := range replicaSet.Status.Conditions { replicaSetCond := &ReplicaSetCondition{ @@ -89,7 +89,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - replicaSetCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(replicaSetCond)) + replicaSetCond.PropertiesChecksum = types.HashStruct(replicaSetCond) r.Conditions = append(r.Conditions, replicaSetCond) } @@ -120,7 +120,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { Valid: true, }, } - owner.PropertiesChecksum = types.Checksum(MustMarshalJSON(owner)) + owner.PropertiesChecksum = types.HashStruct(owner) r.Owners = append(r.Owners, owner) } @@ -128,7 +128,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range replicaSet.Labels { label := NewLabel(labelName, labelValue) label.ReplicaSetId = r.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) r.Labels = append(r.Labels, label) } diff --git a/pkg/schema/v1/service.go b/pkg/schema/v1/service.go index 3e78b5c8..733c7867 100644 --- a/pkg/schema/v1/service.go +++ b/pkg/schema/v1/service.go @@ -25,5 +25,5 @@ func (s *Service) Obtain(k8s kmetav1.Object) { s.Id = types.Checksum(s.Namespace + "/" + s.Name) s.Type = string(service.Spec.Type) s.ClusterIP = service.Spec.ClusterIP - s.PropertiesChecksum = types.Checksum(MustMarshalJSON(s)) + s.PropertiesChecksum = types.HashStruct(s) } diff --git a/pkg/schema/v1/stateful_set.go b/pkg/schema/v1/stateful_set.go index 55719c93..78ebc124 100644 --- a/pkg/schema/v1/stateful_set.go +++ b/pkg/schema/v1/stateful_set.go @@ -24,8 +24,8 @@ type StatefulSet struct { CurrentReplicas int32 UpdatedReplicas int32 AvailableReplicas int32 - Conditions []*StatefulSetCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*StatefulSetCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type StatefulSetMeta struct { @@ -89,7 +89,7 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { s.UpdatedReplicas = statefulSet.Status.UpdatedReplicas s.AvailableReplicas = statefulSet.Status.AvailableReplicas - s.PropertiesChecksum = types.Checksum(MustMarshalJSON(s)) + s.PropertiesChecksum = types.HashStruct(s) for _, condition := range statefulSet.Status.Conditions { cond := &StatefulSetCondition{ @@ -103,7 +103,7 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - cond.PropertiesChecksum = 
types.Checksum(MustMarshalJSON(cond)) + cond.PropertiesChecksum = types.HashStruct(cond) s.Conditions = append(s.Conditions, cond) } @@ -111,7 +111,7 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range statefulSet.Labels { label := NewLabel(labelName, labelValue) label.StatefulSetId = s.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) s.Labels = append(s.Labels, label) } diff --git a/pkg/schema/v1/utils.go b/pkg/schema/v1/utils.go index b50c6a11..e9d08e5b 100644 --- a/pkg/schema/v1/utils.go +++ b/pkg/schema/v1/utils.go @@ -21,16 +21,3 @@ func MarshalFirstNonNilStructFieldToJSON(i any) (string, string, error) { return "", "", nil } - -// MustMarshalJSON json encodes the given object. -// TODO: This is just used to generate the checksum of the object properties. -// - This should no longer be necessary once we have implemented a more sophisticated -// - method for hashing a structure. -func MustMarshalJSON(v interface{}) []byte { - b, err := types.MarshalJSON(v) - if err != nil { - panic(err) - } - - return b -} diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index 72938338..cd89cf9c 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -2,36 +2,40 @@ package v1 import ( "context" - "fmt" "github.com/go-logr/logr" "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" - schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" "github.com/icinga/icinga-kubernetes/pkg/sync" "github.com/icinga/icinga-kubernetes/pkg/types" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" + "reflect" ) +type FactoryFunc func() contracts.Entity + type Sync struct { db *database.Database informer cache.SharedIndexInformer log logr.Logger - factory func() schemav1.Resource + factory FactoryFunc + store cache.Store } func NewSync( db *database.Database, informer cache.SharedIndexInformer, log logr.Logger, - factory func() schemav1.Resource, + factory FactoryFunc, ) *Sync { return &Sync{ db: db, informer: informer, log: log, factory: factory, + store: cache.NewStore(ObjectMetaKeyFunc), } } @@ -42,19 +46,61 @@ func (s *Sync) Run(ctx context.Context, features ...sync.Feature) error { if !with.NoWarmup() { if err := s.warmup(ctx, controller); err != nil { + s.log.Error(err, "warmup failed") return err } + + s.log.Info("sync warmup finished") } + s.log.Info("start syncing configs") + return s.sync(ctx, controller, features...) } +// GetState returns the cached entity of the given object. +// It returns an error if it fails to internally generate a key for the specified object, +// and nil if the provided object doesn't have a cached state. +func (s *Sync) GetState(obj interface{}) (contracts.Entity, error) { + item, exist, err := s.store.Get(obj) + if err != nil { + return nil, err + } + + if !exist { + return nil, nil + } + + return item.(contracts.Entity), nil +} + +// Delete removes the given entity and all its references from the cache store. +func (s *Sync) Delete(entity contracts.Entity, cascade bool) { + if _, ok := entity.(database.HasRelations); ok && cascade { + items := s.store.List() + for _, it := range items { + item := it.(contracts.Entity) + if entity.ID().Equal(item.ParentID()) { + // Erase all references of this entity recursively from the cache store as well. + // Example: Remove v1.Pod -> v1.Container -> v1.ContainerMount etc... 
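// Usage sketch: evicting an entity from the warmup cache together with
// everything that references it (children are matched by comparing their
// ParentID() against the entity's ID()):
//
//	s.Delete(pod, true)  // pod -> containers -> devices/mounts, labels, ...
//	s.Delete(pod, false) // drops only the pod's own cache entry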
+ s.Delete(item, cascade) + } + } + } + + // We don't know whether there is a cached item by the given hash, so ignore any errors. + _ = s.store.Delete(entity) +} + func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { g, ctx := errgroup.WithContext(ctx) + s.log.Info("starting sync warmup") + + entity := s.factory() entities, errs := s.db.YieldAll(ctx, func() (interface{}, bool, error) { return s.factory(), true, nil - }, s.db.BuildSelectStmt(s.factory(), &schemav1.Meta{})) + }, s.db.BuildSelectStmt(entity, entity.Fingerprint())) // Let errors from YieldAll() cancel the group. com.ErrgroupReceive(g, errs) @@ -68,10 +114,16 @@ func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { return nil } - if err := c.Announce(e); err != nil { - fmt.Println(err) + if err := s.store.Add(e.(contracts.Entity).Fingerprint()); err != nil { return err } + + // The controller doesn't need to know about the entities of a k8s sub resource. + if resource, ok := e.(contracts.Resource); ok { + if err := c.Announce(resource); err != nil { + return err + } + } case <-ctx.Done(): return ctx.Err() } @@ -84,7 +136,7 @@ func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Feature) error { sink := sync.NewSink(func(i *sync.Item) interface{} { entity := s.factory() - entity.Obtain(i.Item) + entity.(contracts.Resource).Obtain(i.Item) return entity }, func(k interface{}) interface{} { @@ -97,47 +149,32 @@ func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Fe g.Go(func() error { defer runtime.HandleCrash() - err := c.Stream(ctx, sink) - if err != nil { - fmt.Println(err) - } - return err - }) - g.Go(func() error { - defer runtime.HandleCrash() - - err := s.db.UpsertStreamed( - ctx, sink.UpsertCh(), - database.WithCascading(), database.WithOnSuccess(with.OnUpsert())) - if err != nil { - fmt.Println(err) - } - return err + return c.Stream(ctx, sink) }) g.Go(func() error { - defer runtime.HandleCrash() + filterFunc := func(entity contracts.Entity) (bool, error) { + lastState, err := s.GetState(entity) + if err != nil { + return false, err + } - if with.NoDelete() { - for { - select { - case _, more := <-sink.DeleteCh(): - if !more { - return nil - } - case <-ctx.Done(): - return ctx.Err() - } + // Don't upsert the entities if their checksum hasn't been changed. + if lastState == nil || !entity.Checksum().Equal(lastState.Checksum()) { + _ = s.store.Add(entity.Fingerprint()) + return true, nil } - } else { - err := s.db.DeleteStreamed( - ctx, s.factory(), sink.DeleteCh(), - database.WithBlocking(), database.WithCascading(), database.WithOnSuccess(with.OnDelete())) - if err != nil { - fmt.Println(err) - } - return err + + return false, nil } + + return s.db.UpsertStreamed( + ctx, sink.UpsertCh(), + database.WithCascading(), database.WithPreExecution(filterFunc), database.WithOnSuccess(with.OnUpsert()), + ) + }) + g.Go(func() error { + return s.deleteEntities(ctx, s.factory(), sink.DeleteCh(), features...) }) g.Go(func() error { defer runtime.HandleCrash() @@ -153,13 +190,155 @@ func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Fe case <-ctx.Done(): return ctx.Err() } - } }) err := g.Wait() if err != nil { - fmt.Println(err) + s.log.Error(err, "sync error") } return err } + +// deleteEntities consumes the entities from the provided delete stream and syncs them to the database. 
+// It also removes the streamed K8s entity and all its references from the cache store automatically. +// To prevent the sender goroutines of this stream from being blocked, the entities are still consumed +// from the stream even if the sync.WithNoDelete feature is specified. +func (s *Sync) deleteEntities(ctx context.Context, subject contracts.Entity, delete <-chan interface{}, features ...sync.Feature) error { + with := sync.NewFeatures(features...) + + if relations, ok := subject.(database.HasRelations); ok && !with.NoDelete() { + g, ctx := errgroup.WithContext(ctx) + streams := make(map[string]chan interface{}) + for _, relation := range relations.Relations() { + relation := relation + if !relation.CascadeDelete() { + continue + } + + if _, ok := relation.TypePointer().(contracts.Entity); !ok { + // This shouldn't crush the daemon, when some of the k8s types specify a relation + // that doesn't satisfy the contracts.Entity interface. + continue + } + + relationCh := make(chan interface{}) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(relationCh) + + return s.deleteEntities(ctx, relation.TypePointer().(contracts.Entity), relationCh) + }) + streams[database.TableName(relation)] = relationCh + } + + deleteIds := make(chan interface{}) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(deleteIds) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case id, ok := <-delete: + if !ok { + return nil + } + + if _, ok := id.(types.Binary); !ok { + id = types.Binary(id.([]byte)) + } + + subject.SetID(id.(types.Binary)) + items := s.store.List() + // First delete all references before deleting the parent entity. + for _, item := range items { + entity := item.(contracts.Entity) + if subject.ID().Equal(entity.ParentID()) { + for _, relation := range subject.(database.HasRelations).Relations() { + relation := relation + if reflect.TypeOf(relation.TypePointer().(contracts.Entity).Fingerprint()) == reflect.TypeOf(entity) { + select { + case streams[database.TableName(relation)] <- entity.ID(): + case <-ctx.Done(): + return ctx.Err() + } + } + } + } + } + + select { + case deleteIds <- id: + case <-ctx.Done(): + return ctx.Err() + } + + s.Delete(subject, false) + } + } + }) + + g.Go(func() error { + defer runtime.HandleCrash() + + return s.db.DeleteStreamed(ctx, subject, deleteIds, database.WithBlocking(), database.WithOnSuccess(with.OnDelete())) + }) + + return g.Wait() + } + + g, ctx := errgroup.WithContext(ctx) + deleteIds := make(chan interface{}) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(deleteIds) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case id, ok := <-delete: + if !ok { + return nil + } + + if !with.NoDelete() { + select { + case deleteIds <- id: + case <-ctx.Done(): + return ctx.Err() + } + + if _, ok := id.(types.Binary); !ok { + id = types.Binary(id.([]byte)) + } + + subject.SetID(id.(types.Binary)) + s.Delete(subject, false) + } + } + } + }) + + if !with.NoDelete() { + g.Go(func() error { + defer runtime.HandleCrash() + + return s.db.DeleteStreamed(ctx, subject, deleteIds, database.WithBlocking(), database.WithOnSuccess(with.OnDelete())) + }) + } + + return g.Wait() +} + +// ObjectMetaKeyFunc provides a custom implementation of object key extraction for caching. +// The given object has to satisfy the contracts.IDer interface if it's not an explicit key. 
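// For illustration, the keys this function yields (values hypothetical):
//
//	k, _ := ObjectMetaKeyFunc(pod) // hex of pod.ID(), e.g. "6c0f..."
//	k, _ = ObjectMetaKeyFunc(cache.ExplicitKey("kube-system/coredns"))
//	// explicit keys pass through unchanged: "kube-system/coredns"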
+func ObjectMetaKeyFunc(obj interface{}) (string, error) { + if _, ok := obj.(cache.ExplicitKey); ok { + return cache.MetaNamespaceKeyFunc(obj) + } + + return obj.(contracts.IDer).ID().String(), nil +} diff --git a/pkg/types/objectpacker.go b/pkg/types/objectpacker.go index 3f89bedf..ef9e842a 100644 --- a/pkg/types/objectpacker.go +++ b/pkg/types/objectpacker.go @@ -5,11 +5,74 @@ import ( "encoding/binary" "fmt" "github.com/pkg/errors" + "golang.org/x/exp/slices" "io" "reflect" "sort" + "sync" ) +var ( + structFields = map[reflect.Type][]StructField{} + mu sync.Mutex +) + +// StructField represents a single struct field, just like reflect.StructField, but with way less member fields. +type StructField struct { + Name string // This field name is only used for sorting struct fields slice. + Index []int // This index is just used for lookup. +} + +// HashStruct generates the SHA-1 checksum of all extracted fields of the given struct. +// By default, this will hash all struct fields except an embedded struct, anonymous and unexported fields. +// Additionally, you can also exclude some struct fields by using the `hash:"-"` tag. +func HashStruct(subject interface{}) Binary { + v := reflect.ValueOf(subject) + if v.Kind() == reflect.Pointer { + v = v.Elem() + } + + fields := getFields(v) + values := make([]interface{}, len(fields)) + for _, field := range fields { + values = append(values, v.FieldByIndex(field.Index).Interface()) + } + + return Checksum(MustPackSlice(values...)) +} + +// getFields returns a slice of StructField extracted from the given subject. +// By default, this will hash all struct fields except an embedded struct, anonymous and unexported fields. +// Additionally, you can also exclude some struct fields by using the `hash:"-"` tag. +func getFields(subject reflect.Value) []StructField { + mu.Lock() + defer mu.Unlock() + + var fields []StructField + + fields = structFields[subject.Type()] + if fields == nil { + for _, field := range reflect.VisibleFields(subject.Type()) { + // We don't want an embedded struct to be part of the generated hash! + if field.Type.Kind() == reflect.Struct || !field.IsExported() || field.Anonymous { + continue + } + + if field.Tag.Get("hash") != "ignore" && field.Tag.Get("hash") != "-" { + fields = append(fields, StructField{Name: field.Name, Index: field.Index}) + } + } + + slices.SortStableFunc(fields, func(a, b StructField) bool { + return a.Name < b.Name + }) + + structFields[subject.Type()] = fields + } + + return fields +} + // MustPackSlice calls PackAny using items and panics if there was an error. 
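// A sketch of which fields HashStruct above actually feeds into the hash
// (the example type is hypothetical):
//
//	type example struct {
//		contracts.Meta             // struct-typed and anonymous: skipped
//		Name   string              // hashed
//		Labels []*Label `hash:"-"` // excluded via tag
//		state  string              // unexported: skipped
//	}
//
// HashStruct(&example{Name: "web"}) packs only {Name}, with fields sorted
// by name for a stable order, and returns Checksum(MustPackSlice(...)).
// Note that values is allocated with make([]interface{}, len(fields)) and
// then appended to, so len(fields) leading nils are packed too; the result
// stays deterministic, but make([]interface{}, 0, len(fields)) would avoid
// the padding.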
func MustPackSlice(items ...interface{}) []byte { var buf bytes.Buffer @@ -28,6 +91,8 @@ func MustPackSlice(items ...interface{}) []byte { // PackAny(false) => 0x1 // PackAny(true) => 0x2 // PackAny(float64(42)) => 0x3 ieee754_binary64_bigendian(42) +// PackAny(int(42)) => 0x7 int64_binary64_bigendian(42) +// PackAny(uint(42)) => 0x8 uint64_binary64_bigendian(42) // PackAny("exämple") => 0x4 uint64_bigendian(len([]byte("exämple"))) []byte("exämple") // PackAny([]uint8{0x42}) => 0x4 uint64_bigendian(len([]uint8{0x42})) []uint8{0x42} // PackAny([1]uint8{0x42}) => 0x4 uint64_bigendian(len([1]uint8{0x42})) [1]uint8{0x42} @@ -66,6 +131,18 @@ func packValue(in reflect.Value, out io.Writer) error { } return binary.Write(out, binary.BigEndian, in.Float()) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + if _, err := out.Write([]byte{7}); err != nil { + return err + } + + return binary.Write(out, binary.BigEndian, in.Int()) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if _, err := out.Write([]byte{8}); err != nil { + return err + } + + return binary.Write(out, binary.BigEndian, in.Uint()) case reflect.Array, reflect.Slice: if typ := in.Type(); typ.Elem() == tByte { if kind == reflect.Array { diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 0ff534a4..130064b5 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -1,5 +1,6 @@ CREATE TABLE namespace ( id binary(20) NOT NULL COMMENT 'sha1(name)', + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, /* TODO: Remove. A namespace does not have a namespace. */ name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -10,17 +11,20 @@ CREATE TABLE namespace ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE namespace_condition ( + id binary(20) NOT NULL COMMENT 'sha1(namespace.id + type)', namespace_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (namespace_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE node ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -39,26 +43,31 @@ CREATE TABLE node ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE node_condition ( + id binary(20) NOT NULL COMMENT 'sha1(node.id + type)', node_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_heartbeat bigint unsigned NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (node_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE node_volume ( + id binary(20) NOT 
diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql
index 0ff534a4..130064b5 100644
--- a/schema/mysql/schema.sql
+++ b/schema/mysql/schema.sql
@@ -1,5 +1,6 @@
 CREATE TABLE namespace (
   id binary(20) NOT NULL COMMENT 'sha1(name)',
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, /* TODO: Remove. A namespace does not have a namespace. */
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -10,17 +11,20 @@
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE namespace_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(namespace.id + type)',
   namespace_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (namespace_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE node (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -39,26 +43,31 @@ CREATE TABLE node (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE node_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(node.id + type)',
   node_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_heartbeat bigint unsigned NOT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (node_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE node_volume (
+  id binary(20) NOT NULL COMMENT 'sha1(node.id + name)',
   node_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   device_path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   mounted enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (node_id, name)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE pod (
   id binary(20) NOT NULL COMMENT 'sha1(namespace/name)',
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -80,44 +89,53 @@ CREATE TABLE pod (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE pod_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(pod.id + type)',
   pod_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_probe bigint unsigned NULL DEFAULT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (pod_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE pod_owner (
+  id binary(20) NOT NULL COMMENT 'sha1(pod.id + uid)',
   pod_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   kind enum('daemon_set', 'node', 'replica_set', 'stateful_set') COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   controller enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL,
   block_owner_deletion enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (pod_id, uid)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE pod_pvc (
+  id binary(20) NOT NULL COMMENT 'sha1(pod.id + volume_name + claim_name)',
   pod_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   volume_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   claim_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   read_only enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (pod_id, volume_name, claim_name)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE pod_volume (
+  id binary(20) NOT NULL COMMENT 'sha1(pod.id + volume_name)',
   pod_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   volume_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   source longtext NOT NULL,
-  PRIMARY KEY (pod_id, volume_name)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE container (
   id binary(20) NOT NULL COMMENT 'sha1(pod.namespace/pod.name/name)',
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   pod_id binary(20) NOT NULL,
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   image varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
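
Reviewer note (illustrative sketch, not part of the diff): every new surrogate
key above follows the pattern sha1(parent id + discriminating property). One
plausible way to derive such an id in Go is to reuse the object packer so the
hash input stays type-consistent; the helper name and the exact concatenation
are assumptions on my part, not dictated by this patch:

    package types

    import "crypto/sha1"

    // conditionId is a hypothetical helper computing e.g. pod_condition.id,
    // documented above as 'sha1(pod.id + type)': pack the parent id and the
    // type, then hash the packed bytes.
    func conditionId(podId Binary, conditionType string) Binary {
    	sum := sha1.Sum(MustPackSlice(podId, conditionType))
    	return sum[:]
    }
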
@@ -135,25 +153,28 @@ CREATE TABLE container (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE container_device (
+  id binary(20) NOT NULL COMMENT 'sha1(pod.id + container.id + name)',
   container_id binary(20) NOT NULL,
-  pod_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (container_id, name)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE container_mount (
+  id binary(20) NOT NULL COMMENT 'sha1(pod.id + container.id + volume_name)',
   container_id binary(20) NOT NULL,
-  pod_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   volume_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   sub_path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   read_only enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (container_id, volume_name)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE deployment (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -173,17 +194,21 @@ CREATE TABLE deployment (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE deployment_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(deployment.id + type)',
   deployment_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type enum('available', 'progressing', 'replica_failure') COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_update bigint unsigned NOT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (deployment_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE service (
+  id binary(20) NOT NULL COMMENT 'sha1(namespace/name)',
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -191,11 +216,12 @@ CREATE TABLE service (
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   cluster_ip varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   created bigint unsigned NOT NULL,
-  PRIMARY KEY (namespace, name)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE replica_set (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -211,17 +237,21 @@ CREATE TABLE replica_set (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE replica_set_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(replica_set.id + type)',
   replica_set_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type enum('replica_failure') COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (replica_set_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE replica_set_owner (
+  id binary(20) NOT NULL COMMENT 'sha1(replica_set.id + uid)',
   replica_set_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   kind enum('deployment') COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -232,6 +262,7 @@ CREATE TABLE replica_set_owner (

 CREATE TABLE daemon_set (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -250,17 +281,20 @@ CREATE TABLE daemon_set (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE daemon_set_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(daemon_set.id + type)',
   daemon_set_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (daemon_set_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE stateful_set (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -283,60 +317,41 @@ CREATE TABLE stateful_set (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE stateful_set_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(stateful_set.id + type)',
   stateful_set_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (stateful_set_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE label (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
+  pod_id binary(20) DEFAULT NULL,
+  replica_set_id binary(20) DEFAULT NULL,
+  deployment_id binary(20) DEFAULT NULL,
+  daemon_set_id binary(20) DEFAULT NULL,
+  stateful_set_id binary(20) DEFAULT NULL,
+  pvc_id binary(20) DEFAULT NULL,
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   value varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (id)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-
-CREATE TABLE pod_label (
-  pod_id binary(20) NOT NULL,
-  label_id binary(20) NOT NULL,
-  PRIMARY KEY (pod_id, label_id)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-
-CREATE TABLE replica_set_label (
-  replica_set_id binary(20) NOT NULL,
-  label_id binary(20) NOT NULL,
-  PRIMARY KEY (replica_set_id, label_id)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-
-CREATE TABLE deployment_label (
-  deployment_id binary(20) NOT NULL,
-  label_id binary(20) NOT NULL,
-  PRIMARY KEY (deployment_id, label_id)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-
-CREATE TABLE daemon_set_label (
-  daemon_set_id binary(20) NOT NULL,
-  label_id binary(20) NOT NULL,
-  PRIMARY KEY (daemon_set_id, label_id)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-
-CREATE TABLE stateful_set_label (
-  stateful_set_id binary(20) NOT NULL,
-  label_id binary(20) NOT NULL,
-  PRIMARY KEY (stateful_set_id, label_id)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-
-CREATE TABLE pvc_label (
-  pvc_id binary(20) NOT NULL,
-  label_id binary(20) NOT NULL,
-  PRIMARY KEY (pvc_id, label_id)
+  PRIMARY KEY (id),
+
+  -- The CHECK constraint below ensures that each label row belongs to exactly
+  -- one consumer, i.e. exactly one of the *_id columns above is non-NULL.
+  CONSTRAINT nonnulls_label_consumers_check CHECK (
+    (pod_id IS NOT NULL) + (replica_set_id IS NOT NULL) + (deployment_id IS NOT NULL) + (daemon_set_id IS NOT NULL)
+      + (stateful_set_id IS NOT NULL) + (pvc_id IS NOT NULL) = 1
+  )
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE event (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) NOT NULL,
   name varchar(253) NOT NULL,
   uid varchar(255) NOT NULL,
@@ -372,6 +387,7 @@ CREATE TABLE pod_metrics (

 CREATE TABLE pvc (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -389,18 +405,21 @@ CREATE TABLE pvc (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE pvc_condition (
+  id binary(20) NOT NULL COMMENT 'sha1(pvc.id + type)',
   pvc_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   last_probe bigint unsigned NULL DEFAULT NULL,
   last_transition bigint unsigned NOT NULL,
   reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
-  PRIMARY KEY (pvc_id, type)
+  PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE persistent_volume (
   id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
@@ -420,7 +439,9 @@ CREATE TABLE persistent_volume (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

 CREATE TABLE persistent_volume_claim_ref (
+  id binary(20) NOT NULL COMMENT 'sha1(persistent_volume.id + uid)',
   persistent_volume_id binary(20) NOT NULL,
+  properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)',
   kind varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
   name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL,
   uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,