Skip to content

Commit af8d0f3

Browse files
authored
Merge pull request #17 from ydb-platform/parallel-write
replaced for loop to errgroup with goroutines
2 parents 922a92d + 34d2788 commit af8d0f3

File tree

1 file changed

+21
-17
lines changed

1 file changed

+21
-17
lines changed

internal/storage/ydb.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
1818
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
19+
"golang.org/x/sync/errgroup"
1920

2021
"github.com/ydb-platform/fluent-bit-ydb/internal/config"
2122
"github.com/ydb-platform/fluent-bit-ydb/internal/log"
@@ -378,33 +379,36 @@ func (s *YDB) Write(events []*model.Event) error {
378379
if portion > sz {
379380
portion = sz
380381
}
381-
position := 0
382+
var (
383+
position = 0
384+
writes = errgroup.Group{}
385+
)
382386
for position < sz {
383387
finish := position + portion
384388
if finish > sz {
385389
finish = sz
386390
}
387391
part := rows[position:finish]
388-
err = s.db.Table().Do(context.Background(),
389-
func(ctx context.Context, sess table.Session) error {
390-
return sess.BulkUpsert(ctx, path.Join(s.db.Name(), s.cfg.TablePath), types.ListValue(part...))
391-
},
392-
)
393-
if err != nil {
394-
if ydb.IsOperationErrorSchemeError(err) {
395-
log.Warn("Detected scheme error, trying to resolve field mapping from table description")
396-
resolveErr := s.resolveFieldMapping(context.Background())
397-
if resolveErr != nil {
398-
return errors.Join(err, resolveErr)
399-
}
400-
}
392+
writes.Go(func() error {
393+
return s.db.Table().Do(context.Background(),
394+
func(ctx context.Context, sess table.Session) error {
395+
return sess.BulkUpsert(ctx, path.Join(s.db.Name(), s.cfg.TablePath), types.ListValue(part...))
396+
},
397+
table.WithIdempotent(),
398+
)
399+
})
400+
position = finish
401+
}
401402

402-
return err
403+
err = writes.Wait()
404+
if err != nil && ydb.IsOperationErrorSchemeError(err) {
405+
log.Warn("Detected scheme error, trying to resolve field mapping from table description")
406+
if resolveErr := s.resolveFieldMapping(context.Background()); resolveErr != nil {
407+
return errors.Join(err, resolveErr)
403408
}
404-
position = finish
405409
}
406410

407-
return nil
411+
return err
408412
}
409413

410414
func (s *YDB) Exit() error {

0 commit comments

Comments
 (0)