Skip to content

Commit 2c8f275

Browse files
committed
Integration testing, fix misc bugs (#657)
1 parent ec08e9c commit 2c8f275

File tree

13 files changed

+154
-75
lines changed

13 files changed

+154
-75
lines changed

cmd/litestream/main.go

Lines changed: 12 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/benbjohnson/litestream/abs"
2121
"github.com/benbjohnson/litestream/file"
2222
"github.com/benbjohnson/litestream/gcs"
23+
"github.com/benbjohnson/litestream/internal"
2324
"github.com/benbjohnson/litestream/s3"
2425
"github.com/benbjohnson/litestream/sftp"
2526
_ "github.com/mattn/go-sqlite3"
@@ -157,6 +158,9 @@ type Config struct {
157158
// Only includes L1 through the last non-snapshot level.
158159
Levels []*CompactionLevelConfig `yaml:"levels"`
159160

161+
// Snapshot configuration
162+
Snapshot SnapshotConfig `yaml:"snapshot"`
163+
160164
// List of databases to manage.
161165
DBs []*DBConfig `yaml:"dbs"`
162166

@@ -179,6 +183,12 @@ type Config struct {
179183
ConfigPath string `yaml:"-"`
180184
}
181185

186+
// SnapshotConfig configures snapshots.
187+
type SnapshotConfig struct {
188+
Interval time.Duration `yaml:"interval"`
189+
Retention time.Duration `yaml:"retention"`
190+
}
191+
182192
// LoggingConfig configures logging.
183193
type LoggingConfig struct {
184194
Level string `yaml:"level"`
@@ -278,12 +288,12 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
278288

279289
logOptions := slog.HandlerOptions{
280290
Level: slog.LevelInfo,
281-
ReplaceAttr: replaceAttr,
291+
ReplaceAttr: internal.ReplaceAttr,
282292
}
283293

284294
switch strings.ToUpper(config.Logging.Level) {
285295
case "TRACE":
286-
logOptions.Level = litestream.LevelTrace
296+
logOptions.Level = internal.LevelTrace
287297
case "DEBUG":
288298
logOptions.Level = slog.LevelDebug
289299
case "INFO":
@@ -308,13 +318,6 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
308318
return config, nil
309319
}
310320

311-
func replaceAttr(groups []string, a slog.Attr) slog.Attr {
312-
if a.Key == slog.LevelKey && a.Value.Any().(slog.Level) == litestream.LevelTrace {
313-
a.Value = slog.StringValue("TRACE")
314-
}
315-
return a
316-
}
317-
318321
// CompactionLevelConfig the configuration for a single level of compaction.
319322
type CompactionLevelConfig struct {
320323
Interval time.Duration `yaml:"interval"`

cmd/litestream/replicate.go

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,11 @@ func (c *ReplicateCommand) Run() (err error) {
116116

117117
levels := c.Config.CompactionLevels()
118118
c.Store = litestream.NewStore(dbs, levels)
119+
c.Store.SnapshotInterval = c.Config.Snapshot.Interval
120+
c.Store.SnapshotRetention = c.Config.Snapshot.Retention
121+
if err := c.Store.Open(context.Background()); err != nil {
122+
return fmt.Errorf("cannot open store: %w", err)
123+
}
119124

120125
// Notify user that initialization is done.
121126
for _, db := range c.Store.DBs() {

db.go

Lines changed: 52 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -467,7 +467,7 @@ func (db *DB) verifyHeadersMatch() error {
467467
}
468468
hdr1 := dec.Header()
469469
if salt1 != hdr1.WALSalt1 || salt2 != hdr1.WALSalt2 {
470-
db.Logger.Log(LevelTrace, "salt mismatch",
470+
db.Logger.Log(internal.LevelTrace, "salt mismatch",
471471
"path", ltxPath,
472472
"wal", [2]uint32{salt1, salt2},
473473
"ltx", [2]uint32{hdr1.WALSalt1, hdr1.WALSalt2})
@@ -662,7 +662,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
662662
salt2 := binary.BigEndian.Uint32(hdr0[20:])
663663

664664
if salt1 != dec.Header().WALSalt1 || salt2 != dec.Header().WALSalt2 {
665-
db.Logger.Log(ctx, LevelTrace, "wal restarted",
665+
db.Logger.Log(ctx, internal.LevelTrace, "wal restarted",
666666
"salt1", salt1,
667667
"salt2", salt2)
668668

@@ -698,7 +698,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
698698
if ok, err := db.ltxDecoderContains(dec, pgno, buf[WALFrameHeaderSize:]); err != nil {
699699
return info, fmt.Errorf("ltx contains: %w", err)
700700
} else if !ok {
701-
db.Logger.Log(ctx, LevelTrace, "cannot find last page in last ltx file", "pgno", pgno)
701+
db.Logger.Log(ctx, internal.LevelTrace, "cannot find last page in last ltx file", "pgno", pgno, "offset", prevWALOffset)
702702
info.reason = "last page does not exist in last ltx file, wal overwritten by another process"
703703
return info, nil
704704
}
@@ -709,8 +709,6 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
709709
}
710710

711711
func (db *DB) ltxDecoderContains(dec *ltx.Decoder, pgno uint32, data []byte) (bool, error) {
712-
println("dbg/ltxDecoderContains", pgno, "\n", internal.Hexdump(data))
713-
714712
buf := make([]byte, dec.Header().PageSize)
715713
for {
716714
var hdr ltx.PageHeader
@@ -721,11 +719,9 @@ func (db *DB) ltxDecoderContains(dec *ltx.Decoder, pgno uint32, data []byte) (bo
721719
}
722720

723721
if pgno != hdr.Pgno {
724-
println("dbg/ltxDecoderContains.ltx", pgno, "!=", hdr.Pgno)
725722
continue
726723
}
727724
if !bytes.Equal(data, buf) {
728-
println("dbg/ltxDecoderContains.data.mismatch\n", internal.Hexdump(data))
729725
continue
730726
}
731727
return true, nil
@@ -750,12 +746,21 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error
750746
txID := pos.TXID + 1
751747

752748
filename := db.LTXPath(0, txID, txID)
753-
db.Logger.Debug("sync",
749+
750+
logArgs := []any{
754751
"txid", txID.String(),
755-
"chkpt", checkpointing,
756-
"snap", info.snapshotting,
757752
"offset", info.offset,
758-
"reason", info.reason)
753+
}
754+
if checkpointing {
755+
logArgs = append(logArgs, "chkpt", "true")
756+
}
757+
if info.snapshotting {
758+
logArgs = append(logArgs, "snap", "true")
759+
}
760+
if info.reason != "" {
761+
logArgs = append(logArgs, "reason", info.reason)
762+
}
763+
db.Logger.Debug("sync", logArgs...)
759764

760765
// Prevent internal checkpoints during sync. Ignore if already in a checkpoint.
761766
if !checkpointing {
@@ -785,7 +790,8 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error
785790
// If we cannot verify the previous frame
786791
var pfmError *PrevFrameMismatchError
787792
if rd, err = NewWALReaderWithOffset(walFile, info.offset, info.salt1, info.salt2, db.Logger); errors.As(err, &pfmError) {
788-
db.Logger.Log(ctx, LevelTrace, "prev frame mismatch, snapshotting", "err", pfmError.Err)
793+
db.Logger.Log(ctx, internal.LevelTrace, "prev frame mismatch, snapshotting", "err", pfmError.Err)
794+
info.offset = WALHeaderSize
789795
if rd, err = NewWALReader(walFile, db.Logger); err != nil {
790796
return fmt.Errorf("new wal reader, after reset")
791797
}
@@ -795,17 +801,23 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error
795801
}
796802

797803
// Build a mapping of changed page numbers and their latest content.
798-
pageMap, sz, walCommit, err := rd.PageMap(ctx)
804+
pageMap, maxOffset, walCommit, err := rd.PageMap(ctx)
799805
if err != nil {
800806
return fmt.Errorf("page map: %w", err)
801807
}
802808
if walCommit > 0 {
803809
commit = walCommit
804810
}
805811

812+
var sz int64
813+
if maxOffset > 0 {
814+
sz = maxOffset - info.offset
815+
}
816+
assert(sz >= 0, fmt.Sprintf("wal size must be positive: sz=%d, maxOffset=%d, info.offset=%d", sz, maxOffset, info.offset))
817+
806818
// Exit if we have no new WAL pages and we aren't snapshotting.
807819
if !info.snapshotting && sz == 0 {
808-
db.Logger.Log(ctx, LevelTrace, "sync: skip", "reason", "no new wal pages")
820+
db.Logger.Log(ctx, internal.LevelTrace, "sync: skip", "reason", "no new wal pages")
809821
return nil
810822
}
811823

@@ -824,14 +836,15 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error
824836
uid, gid := internal.Fileinfo(db.fileInfo)
825837
_ = os.Chown(tmpFilename, uid, gid)
826838

827-
db.Logger.Log(ctx, LevelTrace, "encode header",
839+
db.Logger.Log(ctx, internal.LevelTrace, "encode header",
828840
"txid", txID.String(),
829841
"commit", commit,
830842
"walOffset", info.offset,
831843
"walSize", sz,
832844
"salt1", rd.salt1,
833845
"salt2", rd.salt2)
834846

847+
timestamp := time.Now()
835848
enc := ltx.NewEncoder(ltxFile)
836849
if err := enc.EncodeHeader(ltx.Header{
837850
Version: ltx.Version,
@@ -840,7 +853,7 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error
840853
Commit: commit,
841854
MinTXID: txID,
842855
MaxTXID: txID,
843-
Timestamp: time.Now().UnixMilli(),
856+
Timestamp: timestamp.UnixMilli(),
844857
WALOffset: info.offset,
845858
WALSize: sz,
846859
WALSalt1: rd.salt1,
@@ -876,9 +889,19 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error
876889

877890
// Atomically rename file to final path.
878891
if err := os.Rename(tmpFilename, filename); err != nil {
892+
delete(db.maxLTXFileInfos.m, 0) // clear cache if in unknown state
879893
return fmt.Errorf("rename ltx file: %w", err)
880894
}
881895

896+
// Update file info cache for L0.
897+
db.maxLTXFileInfos.m[0] = &ltx.FileInfo{
898+
Level: 0,
899+
MinTXID: txID,
900+
MaxTXID: txID,
901+
CreatedAt: time.Now(),
902+
Size: enc.N(),
903+
}
904+
882905
db.Logger.Debug("db sync", "status", "ok")
883906

884907
return nil
@@ -902,7 +925,7 @@ func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, walFile *os.
902925

903926
// If page exists in the WAL, read from there.
904927
if offset, ok := pageMap[pgno]; ok {
905-
db.Logger.Log(ctx, LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno)
928+
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno, "type", "db+wal")
906929

907930
if n, err := walFile.ReadAt(data, offset+WALFrameHeaderSize); err != nil {
908931
return fmt.Errorf("read page %d @ %d: %w", pgno, offset, err)
@@ -917,7 +940,7 @@ func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, walFile *os.
917940
}
918941

919942
offset := int64(pgno-1) * int64(db.pageSize)
920-
db.Logger.Log(ctx, LevelTrace, "encode page from database", "offset", offset, "pgno", pgno)
943+
db.Logger.Log(ctx, internal.LevelTrace, "encode page from database", "offset", offset, "pgno", pgno)
921944

922945
// Otherwise read directly from the database file.
923946
if _, err := db.f.ReadAt(data, offset); err != nil {
@@ -943,7 +966,7 @@ func (db *DB) writeLTXFromWAL(ctx context.Context, enc *ltx.Encoder, walFile *os
943966
for _, pgno := range pgnos {
944967
offset := pageMap[pgno]
945968

946-
db.Logger.Log(ctx, LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno)
969+
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno, "type", "walonly")
947970

948971
// Read source page using page map.
949972
if n, err := walFile.ReadAt(data, offset+WALFrameHeaderSize); err != nil {
@@ -1077,6 +1100,10 @@ func (db *DB) execCheckpoint(mode string) (err error) {
10771100

10781101
// SnapshotReader returns the current position of the database & a reader that contains a full database snapshot.
10791102
func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) {
1103+
if db.pageSize == 0 {
1104+
return ltx.Pos{}, nil, fmt.Errorf("page size not initialized yet")
1105+
}
1106+
10801107
pos, err := db.Pos()
10811108
if err != nil {
10821109
return pos, nil, fmt.Errorf("pos: %w", err)
@@ -1113,7 +1140,7 @@ func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) {
11131140
}
11141141

11151142
// Build a mapping of changed page numbers and their latest content.
1116-
pageMap, sz, walCommit, err := rd.PageMap(ctx)
1143+
pageMap, maxOffset, walCommit, err := rd.PageMap(ctx)
11171144
if err != nil {
11181145
pw.CloseWithError(fmt.Errorf("page map: %w", err))
11191146
return
@@ -1122,6 +1149,11 @@ func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) {
11221149
commit = walCommit
11231150
}
11241151

1152+
var sz int64
1153+
if maxOffset > 0 {
1154+
sz = maxOffset - rd.Offset()
1155+
}
1156+
11251157
db.Logger.Debug("encode snapshot header",
11261158
"txid", pos.TXID.String(),
11271159
"commit", commit,

db_test.go

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package litestream_test
33
import (
44
"context"
55
"database/sql"
6+
"flag"
67
"hash/crc64"
78
"log/slog"
89
"os"
@@ -12,9 +13,12 @@ import (
1213
"time"
1314

1415
"github.com/benbjohnson/litestream"
16+
"github.com/benbjohnson/litestream/internal"
1517
"github.com/superfly/ltx"
1618
)
1719

20+
var logLevel = flag.String("log.level", "debug", "")
21+
1822
func TestDB_Path(t *testing.T) {
1923
db := newDB(t, "/tmp/db")
2024
if got, want := db.Path(), `/tmp/db`; got != want {
@@ -496,9 +500,16 @@ func TestDB_Snapshot(t *testing.T) {
496500
func newDB(tb testing.TB, path string) *litestream.DB {
497501
tb.Helper()
498502
tb.Logf("db=%s", path)
503+
504+
level := slog.LevelDebug
505+
if strings.ToLower(*logLevel) == "trace" {
506+
level = internal.LevelTrace
507+
}
508+
499509
db := litestream.NewDB(path)
500510
db.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
501-
Level: slog.LevelDebug,
511+
Level: level,
512+
ReplaceAttr: internal.ReplaceAttr,
502513
}))
503514
return db
504515
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ require (
1515
github.com/mattn/go-sqlite3 v1.14.19
1616
github.com/pkg/sftp v1.13.6
1717
github.com/prometheus/client_golang v1.17.0
18-
github.com/superfly/ltx v0.3.16
18+
github.com/superfly/ltx v0.3.17
1919
golang.org/x/crypto v0.17.0
2020
golang.org/x/sync v0.5.0
2121
golang.org/x/sys v0.15.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -166,8 +166,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
166166
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
167167
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
168168
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
169-
github.com/superfly/ltx v0.3.16 h1:osibxhNf/4+DOA4fKuPxXVMPKvqoHgtCdy0tYb+AdVQ=
170-
github.com/superfly/ltx v0.3.16/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s=
169+
github.com/superfly/ltx v0.3.17 h1:7cFPsrPYxgfGlqyhyKQtE9tx6mLZmcBUOrCUx1J6ju8=
170+
github.com/superfly/ltx v0.3.17/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s=
171171
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
172172
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
173173
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

internal/internal.go

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,13 +2,16 @@ package internal
22

33
import (
44
"io"
5+
"log/slog"
56
"os"
67
"syscall"
78

89
"github.com/prometheus/client_golang/prometheus"
910
"github.com/prometheus/client_golang/prometheus/promauto"
1011
)
1112

13+
const LevelTrace = slog.Level(slog.LevelDebug - 4)
14+
1215
// ReadCloser wraps a reader to also attach a separate closer.
1316
type ReadCloser struct {
1417
r io.Reader
@@ -127,6 +130,16 @@ func MkdirAll(path string, fi os.FileInfo) error {
127130
return nil
128131
}
129132

133+
func ReplaceAttr(groups []string, a slog.Attr) slog.Attr {
134+
if a.Key == slog.LevelKey {
135+
switch a.Value.Any() {
136+
case LevelTrace:
137+
a.Value = slog.StringValue("TRACE")
138+
}
139+
}
140+
return a
141+
}
142+
130143
// Shared replica metrics.
131144
var (
132145
OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{

litestream.go

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@ import (
66
"errors"
77
"fmt"
88
"io"
9-
"log/slog"
109
"os"
1110
"path"
1211
"path/filepath"
@@ -50,8 +49,6 @@ var (
5049
LogFlags = 0
5150
)
5251

53-
const LevelTrace = slog.Level(slog.LevelDebug - 4)
54-
5552
func init() {
5653
sql.Register("litestream-sqlite3", &sqlite3.SQLiteDriver{
5754
ConnectHook: func(conn *sqlite3.SQLiteConn) error {

0 commit comments

Comments
 (0)