Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions pkg/contracts/contracts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package contracts

import (
"github.com/icinga/icinga-kubernetes/pkg/types"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Entity is implemented by every Icinga k8s type.
// It just encapsulates other essential interfaces for an entity but doesn't have its own methods.
// Entity is implemented by every Icinga k8s type.
// It just bundles the essential per-entity interfaces (identity, parent
// identity, checksumming and fingerprinting) and doesn't have its own methods.
type Entity interface {
	IDer
	ParentIDer
	Checksumer
	FingerPrinter
}

// FingerPrinter is implemented by every Icinga k8s type.
// FingerPrinter is implemented by every Icinga k8s type.
type FingerPrinter interface {
	// Fingerprint returns the columns of this type, which are retrieved from the
	// database during the initial config dump and are used for the config delta and for caching.
	// NOTE(review): the return type is FingerPrinter itself, i.e. implementations
	// return a value whose fields act as the column prototype — confirm this is
	// the intended contract rather than a concrete column list.
	Fingerprint() FingerPrinter
}

// Checksumer is implemented by every Icinga k8s type that maintains its own database table.
type Checksumer interface {
// Checksum computes and returns the sha1 value of this type.
Checksum() types.Binary
}

// IDer is implemented by every Icinga k8s type that provides a unique identifier.
// IDer is implemented by every Icinga k8s type that provides a unique identifier.
type IDer interface {
	// ID returns the unique identifier of this entity as a binary.
	ID() types.Binary
	// SetID sets the unique identifier of this entity.
	SetID(id types.Binary)
}

// ParentIDer is implemented by every Icinga k8s type that provides a unique parent identifier.
// This is a no-op for all types by default. Currently, it's only implemented by all entities of
// a k8s entity sub resources.
// ParentIDer is implemented by every Icinga k8s type that provides a unique parent identifier.
// This is a no-op for all types by default. Currently, it's only implemented meaningfully
// by entities that are sub-resources of another k8s entity.
type ParentIDer interface {
	// ParentID returns the parent id of this entity, or nil if it has no parent.
	ParentID() types.Binary
}

// Resource is implemented by every Icinga k8s type that represents a Kubernetes
// object: it combines the Kubernetes object-metadata accessors with the Icinga
// Entity contract.
type Resource interface {
	kmetav1.Object
	Entity

	// Obtain fills this resource from the given Kubernetes object —
	// presumably copying the relevant fields; confirm against implementations.
	Obtain(k8s kmetav1.Object)
}

// Meta provides a default implementation of the Entity contract and is meant
// to be embedded by concrete Icinga k8s types.
// NOTE(review): Go naming convention would call the first field "ID", but it is
// exported and renaming would break embedders — left as-is.
type Meta struct {
	// Id is the unique identifier, mapped to the "id" database column.
	Id types.Binary `db:"id"`
	// PropertiesChecksum caches the checksum of the entity's properties;
	// the `hash:"-"` tag presumably excludes it from hash computation — confirm
	// against the hashing library in use.
	PropertiesChecksum types.Binary `hash:"-"`
}

// Checksum returns the cached checksum of this entity's properties.
func (m *Meta) Checksum() types.Binary {
	return m.PropertiesChecksum
}

// ID returns the unique identifier of this entity.
func (m *Meta) ID() types.Binary {
	return m.Id
}

// SetID sets the unique identifier of this entity.
func (m *Meta) SetID(id types.Binary) {
	m.Id = id
}

// Fingerprint returns the receiver itself as the fingerprint columns,
// satisfying the FingerPrinter interface.
func (m *Meta) Fingerprint() FingerPrinter {
	return m
}

// ParentID returns nil: Meta itself has no parent; sub-resource types
// are expected to override this.
func (m *Meta) ParentID() types.Binary {
	return nil
}

// Assert interface compliance.
var (
_ FingerPrinter = (*Meta)(nil)
_ Checksumer = (*Meta)(nil)
_ IDer = (*Meta)(nil)
_ ParentIDer = (*Meta)(nil)
_ Entity = (*Meta)(nil)
)
180 changes: 147 additions & 33 deletions pkg/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/icinga/icinga-kubernetes/pkg/backoff"
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/contracts"
"github.com/icinga/icinga-kubernetes/pkg/periodic"
"github.com/icinga/icinga-kubernetes/pkg/retry"
"github.com/icinga/icinga-kubernetes/pkg/strcase"
Expand All @@ -27,6 +28,8 @@ import (

var registerDriversOnce sync.Once

type FactoryFunc func() (interface{}, bool, error)

// Database is a wrapper around sqlx.DB with bulk execution,
// statement building, streaming and logging capabilities.
type Database struct {
Expand Down Expand Up @@ -259,6 +262,12 @@ func (db *Database) BulkExec(

counter.Add(uint64(len(b)))

if f.onSuccess != nil {
if err := f.onSuccess(ctx, b); err != nil {
return err
}
}

return nil
},
IsRetryable,
Expand Down Expand Up @@ -397,7 +406,7 @@ func (db *Database) DeleteStreamed(
defer runtime.HandleCrash()
defer close(ch)

return db.DeleteStreamed(ctx, relation, ch, features...)
return db.DeleteStreamed(ctx, relation, ch, WithCascading(), WithBlocking())
})
streams[TableName(relation)] = ch
}
Expand Down Expand Up @@ -485,8 +494,33 @@ func (db *Database) DeleteStreamed(

// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via
// Options.MaxConnectionsPerTable.
//
// This sync process consists of the following steps:
//
// - It initially copies the first item from the specified stream and checks if this entity type provides relations.
// If so, it first traverses all these relations recursively and starts a separate goroutine and caches the streams
// for the started goroutine/relations type.
//
// - After the relations have been resolved, another goroutine is started which consumes from the specified `entities`
// chan and performs the following actions for each of the streamed entities:
//
// - If the consumed entity doesn't satisfy the contracts.Entity interface, it will just forward that entity to the
// next stage.
//
// - When the entity does satisfy the contracts.Entity, it applies the filter func on this entity (which hopefully
// should check for its checksums), and forwards the entity to the `forward` chan only if the filter function
// returns true and initiates a database upsert stream. Regardless of whether the filter function returns true, it
// streams each of the child entities with the `relation.Stream()` method to the respective cached stream of the relation.
//
// However, when the first item doesn't satisfy the database.HasRelations interface, it will just use only two
// stages for the streamed entities to be upserted:
//
// - The first stage just consumes from the source stream (the `entities` chan) and applies the filter function (if any)
// on each of the entities. Entities for which the filter function returned false are not forwarded.
//
// - The second stage just performs database upsert queries for the entities forwarded from the previous one.
func (db *Database) UpsertStreamed(
ctx context.Context, entities <-chan interface{}, features ...Feature,
) error {
Expand All @@ -511,7 +545,7 @@ func (db *Database) UpsertStreamed(
defer runtime.HandleCrash()
defer close(ch)

return db.UpsertStreamed(ctx, ch)
return db.UpsertStreamed(ctx, ch, WithCascading(), WithPreExecution(with.preExecution))
})
streams[TableName(relation)] = ch
}
Expand All @@ -526,19 +560,30 @@ func (db *Database) UpsertStreamed(

for {
select {
case entity, more := <-source:
case e, more := <-source:
if !more {
return nil
}

select {
case forward <- entity:
case <-ctx.Done():
return ctx.Err()
entity, ok := e.(contracts.Entity)
shouldUpsert := true
if ok && with.preExecution != nil {
shouldUpsert, err = with.preExecution(entity)
if err != nil {
return err
}
}

if shouldUpsert {
select {
case forward <- e:
case <-ctx.Done():
return ctx.Err()
}
}

select {
case dup <- entity:
case dup <- e:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -582,50 +627,119 @@ func (db *Database) UpsertStreamed(
return g.Wait()
}

return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[any], features...)
upsertEntities := make(chan interface{})
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer runtime.HandleCrash()
defer close(upsertEntities)

for {
select {
case <-ctx.Done():
return ctx.Err()
case e, ok := <-forward:
if !ok {
return nil
}

entity, ok := e.(contracts.Entity)
shouldUpsert := true
if ok && with.preExecution != nil {
shouldUpsert, err = with.preExecution(entity)
if err != nil {
return err
}
}

if shouldUpsert {
select {
case upsertEntities <- entity:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
})

g.Go(func() error {
defer runtime.HandleCrash()

return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, com.NeverSplit[any], features...,
)
})

return g.Wait()
}

// YieldAll executes the query with the supplied scope,
// scans each resulting row into an entity returned by the factory function,
// and streams them into a returned channel.
func (db *Database) YieldAll(ctx context.Context, factoryFunc func() (interface{}, error), query string, scope ...interface{}) (<-chan interface{}, <-chan error) {
func (db *Database) YieldAll(ctx context.Context, factoryFunc FactoryFunc, query string, scope ...interface{}) (<-chan interface{}, <-chan error) {
g, ctx := errgroup.WithContext(ctx)
entities := make(chan interface{}, 1)

g.Go(func() error {
defer runtime.HandleCrash()
defer close(entities)

var counter com.Counter
defer db.periodicLog(ctx, query, &counter).Stop()
var run func(ctx context.Context, factory FactoryFunc, query string, scope ...interface{}) error

rows, err := db.query(ctx, query, scope...)
if err != nil {
return CantPerformQuery(err, query)
}
run = func(ctx context.Context, factory FactoryFunc, query string, scope ...interface{}) error {
g, ctx := errgroup.WithContext(ctx)
var counter com.Counter
defer db.periodicLog(ctx, query, &counter).Stop()

defer rows.Close()

for rows.Next() {
e, err := factoryFunc()
rows, err := db.query(ctx, query, scope...)
if err != nil {
return errors.Wrap(err, "can't create entity")
return CantPerformQuery(err, query)
}
defer func() { _ = rows.Close() }()

if err = rows.StructScan(e); err != nil {
return errors.Wrapf(err, "can't store query result into a %T: %s", e, query)
}
for rows.Next() {
e, selectRecursive, err := factory()
if err != nil {
return errors.Wrap(err, "can't create entity")
}

select {
case entities <- e:
counter.Inc()
case <-ctx.Done():
return ctx.Err()
if err = rows.StructScan(e); err != nil {
return errors.Wrapf(err, "can't store query result into a %T: %s", e, query)
}

select {
case entities <- e:
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}

if relations, ok := e.(HasRelations); ok && selectRecursive {
for _, relation := range relations.Relations() {
relation := relation
fingerprint, ok := relation.TypePointer().(contracts.FingerPrinter)
if !ok || !relation.CascadeSelect() {
continue
}

g.Go(func() error {
query := db.BuildSelectStmt(relation.TypePointer(), fingerprint.Fingerprint())
query += fmt.Sprintf(` WHERE %s=?`, db.quoter.QuoteIdentifier(relation.ForeignKey()))

factory := func() (interface{}, bool, error) {
return relation.TypePointer().(contracts.Entity), true, nil
}

return run(ctx, factory, query, e.(contracts.IDer).ID())
})
}
}
}

return g.Wait()
}

return nil
return run(ctx, factoryFunc, query, scope...)
})

return entities, com.WaitAsync(g)
Expand Down
16 changes: 13 additions & 3 deletions pkg/database/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package database

import (
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/contracts"
)

type Feature func(*Features)

type PreExecFunc func(contracts.Entity) (bool, error)

type Features struct {
blocking bool
cascading bool
onSuccess com.ProcessBulk[any]
blocking bool
cascading bool
onSuccess com.ProcessBulk[any]
preExecution PreExecFunc
}

func NewFeatures(features ...Feature) *Features {
Expand All @@ -33,6 +37,12 @@ func WithCascading() Feature {
}
}

// WithPreExecution returns a Feature that registers fn as the pre-execution
// callback stored in Features.preExecution.
func WithPreExecution(fn PreExecFunc) Feature {
	return func(features *Features) {
		features.preExecution = fn
	}
}

func WithOnSuccess(fn com.ProcessBulk[any]) Feature {
return func(f *Features) {
f.onSuccess = fn
Expand Down
Loading