@@ -406,7 +406,7 @@ func (db *Database) DeleteStreamed(
406406 defer runtime .HandleCrash ()
407407 defer close (ch )
408408
409- return db .DeleteStreamed (ctx , relation , ch , features ... )
409+ return db .DeleteStreamed (ctx , relation , ch , WithCascading (), WithBlocking () )
410410 })
411411 streams [TableName (relation )] = ch
412412 }
@@ -494,8 +494,33 @@ func (db *Database) DeleteStreamed(
494494
495495// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
496496// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
497- // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
498- // concurrency is controlled via Options.MaxConnectionsPerTable.
497+ // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via
498+ // Options.MaxConnectionsPerTable.
499+ //
500+ // This sync process consists of the following steps:
501+ //
502+ // - It initially copies the first item from the specified stream and checks if this entity type provides relations.
503+ // If so, it first traverses all these relations recursively and starts a separate goroutine and caches the streams
504+ // for the started goroutine/relations type.
505+ //
506+ // - After the relations have been resolved, another goroutine is started which consumes from the specified `entities`
507+ // chan and performs the following actions for each of the streamed entities:
508+ //
509+ // - If the consumed entity doesn't satisfy the contracts.Entity interface, it will just forward that entity to the
510+ // next stage.
511+ //
512+ // - When the entity does satisfy the contracts.Entity, it applies the filter func on this entity (which hopefully
513+ // should check for its checksums), and forwards the entity to the `forward` chan only if the filter function
514+ // returns true and initiates a database upsert stream. Regardless, whether the function returns true, it will
515+ // stream each of the child entity with the `relation.Stream()` method to the respective cached stream of the relation.
516+ //
517+ // However, when the first item doesn't satisfy the database.HasRelations interface, it will just use only two
518+ // stages for the streamed entities to be upserted:
519+ //
520+ // - The first stage just consumes from the source stream (the `entities` chan) and applies the filter function (if any)
521+ // on each of the entities. This won't forward entities for which the filter function didn't also return true as well.
522+ //
523+ // - The second stage just performs a database upsert queries for entities that were forwarded from the previous one.
499524func (db * Database ) UpsertStreamed (
500525 ctx context.Context , entities <- chan interface {}, features ... Feature ,
501526) error {
@@ -520,7 +545,7 @@ func (db *Database) UpsertStreamed(
520545 defer runtime .HandleCrash ()
521546 defer close (ch )
522547
523- return db .UpsertStreamed (ctx , ch )
548+ return db .UpsertStreamed (ctx , ch , WithCascading (), WithPreExecution ( with . preExecution ) )
524549 })
525550 streams [TableName (relation )] = ch
526551 }
@@ -535,19 +560,30 @@ func (db *Database) UpsertStreamed(
535560
536561 for {
537562 select {
538- case entity , more := <- source :
563+ case e , more := <- source :
539564 if ! more {
540565 return nil
541566 }
542567
543- select {
544- case forward <- entity :
545- case <- ctx .Done ():
546- return ctx .Err ()
568+ entity , ok := e .(contracts.Entity )
569+ shouldUpsert := true
570+ if ok && with .preExecution != nil {
571+ shouldUpsert , err = with .preExecution (entity )
572+ if err != nil {
573+ return err
574+ }
575+ }
576+
577+ if shouldUpsert {
578+ select {
579+ case forward <- e :
580+ case <- ctx .Done ():
581+ return ctx .Err ()
582+ }
547583 }
548584
549585 select {
550- case dup <- entity :
586+ case dup <- e :
551587 case <- ctx .Done ():
552588 return ctx .Err ()
553589 }
@@ -591,8 +627,50 @@ func (db *Database) UpsertStreamed(
591627 return g .Wait ()
592628 }
593629
594- return db .NamedBulkExec (
595- ctx , stmt , db .BatchSizeByPlaceholders (placeholders ), sem , forward , com .NeverSplit [any ], features ... )
630+ upsertEntities := make (chan interface {})
631+ g , ctx := errgroup .WithContext (ctx )
632+ g .Go (func () error {
633+ defer runtime .HandleCrash ()
634+ defer close (upsertEntities )
635+
636+ for {
637+ select {
638+ case <- ctx .Done ():
639+ return ctx .Err ()
640+ case e , ok := <- forward :
641+ if ! ok {
642+ return nil
643+ }
644+
645+ entity , ok := e .(contracts.Entity )
646+ shouldUpsert := true
647+ if ok && with .preExecution != nil {
648+ shouldUpsert , err = with .preExecution (entity )
649+ if err != nil {
650+ return err
651+ }
652+ }
653+
654+ if shouldUpsert {
655+ select {
656+ case upsertEntities <- entity :
657+ case <- ctx .Done ():
658+ return ctx .Err ()
659+ }
660+ }
661+ }
662+ }
663+ })
664+
665+ g .Go (func () error {
666+ defer runtime .HandleCrash ()
667+
668+ return db .NamedBulkExec (
669+ ctx , stmt , db .BatchSizeByPlaceholders (placeholders ), sem , upsertEntities , com .NeverSplit [any ], features ... ,
670+ )
671+ })
672+
673+ return g .Wait ()
596674}
597675
598676// YieldAll executes the query with the supplied scope,
0 commit comments