Skip to content

Commit 6ea33d1

Browse files
jrauh01lippserd
authored andcommitted
Structs for upsert and delete
1 parent 901d354 commit 6ea33d1

File tree

2 files changed

+141
-89
lines changed

2 files changed

+141
-89
lines changed

database/db.go

Lines changed: 6 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -732,67 +732,25 @@ func (db *DB) CreateIgnoreStreamed(
732732
)
733733
}
734734

735-
func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
736-
return func(options *ExecOptions) {
737-
options.onSuccess = onSuccess
738-
}
739-
}
740-
741-
func WithStatement(stmt string, placeholders int) ExecOption {
742-
return func(options *ExecOptions) {
743-
options.stmt = stmt
744-
options.placeholders = placeholders
745-
}
746-
}
747-
748-
type ExecOption func(options *ExecOptions)
749-
750-
type ExecOptions struct {
751-
onSuccess []OnSuccess[Entity]
752-
stmt string
753-
placeholders int
754-
}
755-
756-
func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
757-
execOptions := &ExecOptions{}
758-
759-
for _, option := range execOpts {
760-
option(execOptions)
761-
}
762-
763-
return execOptions
764-
}
765-
766735
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
767736
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
768737
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
769738
// concurrency is controlled via Options.MaxConnectionsPerTable.
770739
// Entities for which the query ran successfully will be passed to onSuccess.
771740
func (db *DB) UpsertStreamed(
772-
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
741+
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
773742
) error {
774-
775-
execOptions := NewExecOptions(execOpts...)
776-
777743
first, forward, err := com.CopyFirst(ctx, entities)
778744
if err != nil {
779745
return errors.Wrap(err, "can't copy first entity")
780746
}
781747

782748
sem := db.GetSemaphoreForTable(TableName(first))
783-
var stmt string
784-
var placeholders int
785-
786-
if execOptions.stmt != "" {
787-
stmt = execOptions.stmt
788-
placeholders = execOptions.placeholders
789-
} else {
790-
stmt, placeholders = db.BuildUpsertStmt(first)
791-
}
749+
stmt, placeholders := db.BuildUpsertStmt(first)
792750

793751
return db.NamedBulkExec(
794752
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
795-
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
753+
forward, SplitOnDupId[Entity], onSuccess...,
796754
)
797755
}
798756

@@ -811,58 +769,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
811769
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
812770
}
813771

814-
func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
815-
return func(options *DeleteOptions) {
816-
options.onSuccess = onSuccess
817-
}
818-
}
819-
820-
func ByColumn(column string) DeleteOption {
821-
return func(options *DeleteOptions) {
822-
options.column = column
823-
}
824-
}
825-
826-
type DeleteOption func(options *DeleteOptions)
827-
828-
type DeleteOptions struct {
829-
onSuccess []OnSuccess[any]
830-
column string
831-
}
832-
833-
func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
834-
deleteOptions := &DeleteOptions{}
835-
836-
for _, option := range execOpts {
837-
option(deleteOptions)
838-
}
839-
840-
return deleteOptions
841-
}
842-
843772
// DeleteStreamed bulk deletes the specified ids via BulkExec.
844773
// The delete statement is created using BuildDeleteStmt with the passed entityType.
845774
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
846775
// concurrency is controlled via Options.MaxConnectionsPerTable.
847776
// IDs for which the query ran successfully will be passed to onSuccess.
848777
func (db *DB) DeleteStreamed(
849-
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
778+
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
850779
) error {
851-
852-
deleteOptions := NewDeleteOptions(deleteOpts...)
853-
854780
sem := db.GetSemaphoreForTable(TableName(entityType))
855-
856-
var stmt string
857-
858-
if deleteOptions.column != "" {
859-
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
860-
} else {
861-
stmt = db.BuildDeleteStmt(entityType)
862-
}
863-
864781
return db.BulkExec(
865-
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
782+
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
866783
)
867784
}
868785

@@ -878,7 +795,7 @@ func (db *DB) Delete(
878795
}
879796
close(idsCh)
880797

881-
return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
798+
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
882799
}
883800

884801
// ExecTx executes the provided function within a database transaction.

database/optionally.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icinga-go-library/com"
7+
"github.com/pkg/errors"
8+
)
9+
10+
// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
11+
type Upsert interface {
12+
// Stream bulk upserts the specified entities via NamedBulkExec.
13+
// If not explicitly specified, the upsert statement is created using
14+
// BuildUpsertStmt with the first entity from the entities stream.
15+
Stream(ctx context.Context, entities <-chan Entity) error
16+
}
17+
18+
// UpsertOption is a functional option for NewUpsert.
19+
type UpsertOption func(u *upsert)
20+
21+
// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
22+
// operation was performed successfully are passed to the callbacks.
23+
func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
24+
return func(u *upsert) {
25+
u.onUpsert = onUpsert
26+
}
27+
}
28+
29+
// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
30+
func WithStatement(stmt string, placeholders int) UpsertOption {
31+
return func(u *upsert) {
32+
u.stmt = stmt
33+
u.placeholders = placeholders
34+
}
35+
}
36+
37+
// NewUpsert creates a new Upsert initalized with a database.
38+
func NewUpsert(db *DB, options ...UpsertOption) Upsert {
39+
u := &upsert{db: db}
40+
41+
for _, option := range options {
42+
option(u)
43+
}
44+
45+
return u
46+
}
47+
48+
type upsert struct {
49+
db *DB
50+
onUpsert []OnSuccess[Entity]
51+
stmt string
52+
placeholders int
53+
}
54+
55+
func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
56+
first, forward, err := com.CopyFirst(ctx, entities)
57+
if err != nil {
58+
return errors.Wrap(err, "can't copy first entity")
59+
}
60+
61+
sem := u.db.GetSemaphoreForTable(TableName(first))
62+
var stmt string
63+
var placeholders int
64+
65+
if u.stmt != "" {
66+
stmt = u.stmt
67+
placeholders = u.placeholders
68+
} else {
69+
stmt, placeholders = u.db.BuildUpsertStmt(first)
70+
}
71+
72+
return u.db.NamedBulkExec(
73+
ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
74+
forward, SplitOnDupId[Entity], u.onUpsert...,
75+
)
76+
}
77+
78+
// Delete deletes rows of a table.
79+
type Delete interface {
80+
// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
81+
// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
82+
Stream(ctx context.Context, from any, args <-chan any) error
83+
}
84+
85+
// DeleteOption is a functional option for NewDelete.
86+
type DeleteOption func(options *delete)
87+
88+
// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
89+
// operation was performed successfully are passed to the callbacks.
90+
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
91+
return func(d *delete) {
92+
d.onDelete = onDelete
93+
}
94+
}
95+
96+
// ByColumn uses the given column for the WHERE clause that the rows must
97+
// satisfy in order to be deleted, instead of automatically using ID.
98+
func ByColumn(column string) DeleteOption {
99+
return func(d *delete) {
100+
d.column = column
101+
}
102+
}
103+
104+
// NewDelete creates a new Delete initalized with a database.
105+
func NewDelete(db *DB, options ...DeleteOption) Delete {
106+
d := &delete{db: db}
107+
108+
for _, option := range options {
109+
option(d)
110+
}
111+
112+
return d
113+
}
114+
115+
type delete struct {
116+
db *DB
117+
column string
118+
onDelete []OnSuccess[any]
119+
}
120+
121+
func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
122+
var stmt string
123+
124+
if d.column != "" {
125+
stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
126+
} else {
127+
stmt = d.db.BuildDeleteStmt(from)
128+
}
129+
130+
sem := d.db.GetSemaphoreForTable(TableName(from))
131+
132+
return d.db.BulkExec(
133+
ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
134+
)
135+
}

0 commit comments

Comments
 (0)