Skip to content

Commit 8fa09d7

Browse files
benbjohnsonCopilot
andcommitted
Implement retention enforcement (#661)
Co-authored-by: Copilot <[email protected]>
1 parent 2c8f275 commit 8fa09d7

File tree

8 files changed

+222
-231
lines changed

8 files changed

+222
-231
lines changed

cmd/litestream/main.go

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,8 @@ func (c *Config) CompactionLevels() litestream.CompactionLevels {
223223

224224
for i, lvl := range c.Levels {
225225
levels = append(levels, &litestream.CompactionLevel{
226-
Level: i + 1,
227-
Interval: lvl.Interval,
228-
Retention: lvl.Retention,
226+
Level: i + 1,
227+
Interval: lvl.Interval,
229228
})
230229
}
231230

@@ -397,14 +396,12 @@ func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
397396

398397
// ReplicaConfig represents the configuration for a single replica in a database.
399398
type ReplicaConfig struct {
400-
Type string `yaml:"type"` // "file", "s3"
401-
Name string `yaml:"name"` // Deprecated
402-
Path string `yaml:"path"`
403-
URL string `yaml:"url"`
404-
Retention *time.Duration `yaml:"retention"`
405-
RetentionCheckInterval *time.Duration `yaml:"retention-check-interval"`
406-
SyncInterval *time.Duration `yaml:"sync-interval"`
407-
ValidationInterval *time.Duration `yaml:"validation-interval"`
399+
Type string `yaml:"type"` // "file", "s3"
400+
Name string `yaml:"name"` // Deprecated
401+
Path string `yaml:"path"`
402+
URL string `yaml:"url"`
403+
SyncInterval *time.Duration `yaml:"sync-interval"`
404+
ValidationInterval *time.Duration `yaml:"validation-interval"`
408405

409406
// S3 settings
410407
AccessKeyID string `yaml:"access-key-id"`
@@ -441,18 +438,9 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
441438

442439
// Build replica.
443440
r := litestream.NewReplica(db)
444-
if v := c.Retention; v != nil {
445-
r.Retention = *v
446-
}
447-
if v := c.RetentionCheckInterval; v != nil {
448-
r.RetentionCheckInterval = *v
449-
}
450441
if v := c.SyncInterval; v != nil {
451442
r.SyncInterval = *v
452443
}
453-
if v := c.ValidationInterval; v != nil {
454-
r.ValidationInterval = *v
455-
}
456444
for _, str := range c.Age.Identities {
457445
identities, err := age.ParseIdentities(strings.NewReader(str))
458446
if err != nil {

compaction_level.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ type CompactionLevel struct {
1616

1717
// The frequency that the level is compacted from the previous level.
1818
Interval time.Duration
19-
20-
// The duration that files in this level are stored.
21-
Retention time.Duration
2219
}
2320

2421
// PrevCompactionAt returns the time when the last compaction occurred.

db.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,6 +1263,13 @@ func (db *DB) Compact(ctx context.Context, dstLevel int) (*ltx.FileInfo, error)
12631263
db.maxLTXFileInfos.m[dstLevel] = info
12641264
db.maxLTXFileInfos.Unlock()
12651265

1266+
// If this is L1, clean up L0 files that are below the minTXID.
1267+
if dstLevel == 1 {
1268+
if err := db.EnforceRetentionByTXID(ctx, 0, maxTXID); err != nil {
1269+
db.Logger.Error("enforce L0 retention", "error", err)
1270+
}
1271+
}
1272+
12661273
return info, nil
12671274
}
12681275

@@ -1284,6 +1291,91 @@ func (db *DB) Snapshot(ctx context.Context) (*ltx.FileInfo, error) {
12841291
return info, nil
12851292
}
12861293

1294+
// EnforceSnapshotRetention enforces retention of the snapshot level in the database by timestamp.
1295+
func (db *DB) EnforceSnapshotRetention(ctx context.Context, timestamp time.Time) (minSnapshotTXID ltx.TXID, err error) {
1296+
db.Logger.Debug("enforcing snapshot retention", "timestamp", timestamp)
1297+
1298+
itr, err := db.Replica.Client.LTXFiles(ctx, SnapshotLevel, 0)
1299+
if err != nil {
1300+
return 0, fmt.Errorf("fetch ltx files: %w", err)
1301+
}
1302+
defer itr.Close()
1303+
1304+
var deleted []*ltx.FileInfo
1305+
var lastInfo *ltx.FileInfo
1306+
for itr.Next() {
1307+
info := itr.Item()
1308+
lastInfo = info
1309+
1310+
// If this snapshot is before the retention timestamp, mark it for deletion.
1311+
if info.CreatedAt.Before(timestamp) {
1312+
deleted = append(deleted, info)
1313+
continue
1314+
}
1315+
1316+
// Track the lowest snapshot TXID so we can enforce retention in lower levels.
1317+
// This is only tracked for snapshots not marked for deletion.
1318+
if minSnapshotTXID == 0 || info.MaxTXID < minSnapshotTXID {
1319+
minSnapshotTXID = info.MaxTXID
1320+
}
1321+
}
1322+
1323+
// If this is the snapshot level, we need to ensure that at least one snapshot exists.
1324+
if len(deleted) > 0 && deleted[len(deleted)-1] == lastInfo {
1325+
deleted = deleted[:len(deleted)-1]
1326+
}
1327+
1328+
// Remove all files marked for deletion.
1329+
for _, info := range deleted {
1330+
db.Logger.Info("deleting ltx file", "level", SnapshotLevel, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID)
1331+
}
1332+
if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil {
1333+
return 0, fmt.Errorf("remove ltx files: %w", err)
1334+
}
1335+
1336+
return minSnapshotTXID, nil
1337+
}
1338+
1339+
// EnforceRetentionByTXID enforces retention so that any LTX files below
1340+
// the target TXID are deleted. Always keep at least one file.
1341+
func (db *DB) EnforceRetentionByTXID(ctx context.Context, level int, txID ltx.TXID) (err error) {
1342+
db.Logger.Debug("enforcing retention", "level", level, "txid", txID)
1343+
1344+
itr, err := db.Replica.Client.LTXFiles(ctx, level, 0)
1345+
if err != nil {
1346+
return fmt.Errorf("fetch ltx files: %w", err)
1347+
}
1348+
defer itr.Close()
1349+
1350+
var deleted []*ltx.FileInfo
1351+
var lastInfo *ltx.FileInfo
1352+
for itr.Next() {
1353+
info := itr.Item()
1354+
lastInfo = info
1355+
1356+
// If this file's maxTXID is below the target TXID, mark it for deletion.
1357+
if info.MaxTXID < txID {
1358+
deleted = append(deleted, info)
1359+
continue
1360+
}
1361+
}
1362+
1363+
// Ensure we don't delete the last file.
1364+
if len(deleted) > 0 && deleted[len(deleted)-1] == lastInfo {
1365+
deleted = deleted[:len(deleted)-1]
1366+
}
1367+
1368+
// Remove all files marked for deletion.
1369+
for _, info := range deleted {
1370+
db.Logger.Info("deleting ltx file", "level", level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID)
1371+
}
1372+
if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil {
1373+
return fmt.Errorf("remove ltx files: %w", err)
1374+
}
1375+
1376+
return nil
1377+
}
1378+
12871379
// monitor runs in a separate goroutine and monitors the database & WAL.
12881380
func (db *DB) monitor() {
12891381
ticker := time.NewTicker(db.MonitorInterval)

db_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,80 @@ func TestDB_Snapshot(t *testing.T) {
497497
}
498498
}
499499

500+
func TestDB_EnforceRetention(t *testing.T) {
501+
db, sqldb := MustOpenDBs(t)
502+
defer MustCloseDBs(t, db, sqldb)
503+
db.Replica = litestream.NewReplica(db)
504+
db.Replica.Client = NewFileReplicaClient(t)
505+
506+
// Create table and sync initial state
507+
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
508+
t.Fatal(err)
509+
} else if err := db.Sync(context.Background()); err != nil {
510+
t.Fatal(err)
511+
}
512+
513+
// Create multiple snapshots with delays to test retention
514+
for i := 0; i < 3; i++ {
515+
if _, err := sqldb.Exec(`INSERT INTO t (id) VALUES (?)`, i); err != nil {
516+
t.Fatal(err)
517+
} else if err := db.Sync(context.Background()); err != nil {
518+
t.Fatal(err)
519+
}
520+
521+
if _, err := db.Snapshot(context.Background()); err != nil {
522+
t.Fatal(err)
523+
}
524+
525+
// Sleep between snapshots to create time differences
526+
time.Sleep(100 * time.Millisecond)
527+
}
528+
529+
// Get list of snapshots before retention
530+
itr, err := db.Replica.Client.LTXFiles(context.Background(), litestream.SnapshotLevel, 0)
531+
if err != nil {
532+
t.Fatal(err)
533+
}
534+
var beforeCount int
535+
for itr.Next() {
536+
beforeCount++
537+
}
538+
itr.Close()
539+
540+
if beforeCount != 3 {
541+
t.Fatalf("expected 3 snapshots before retention, got %d", beforeCount)
542+
}
543+
544+
// Enforce retention to remove older snapshots
545+
retentionTime := time.Now().Add(-150 * time.Millisecond)
546+
if minSnapshotTXID, err := db.EnforceSnapshotRetention(context.Background(), retentionTime); err != nil {
547+
t.Fatal(err)
548+
} else if got, want := minSnapshotTXID, ltx.TXID(4); got != want {
549+
t.Fatalf("MinSnapshotTXID=%s, want %s", got, want)
550+
}
551+
552+
// Verify snapshots after retention
553+
itr, err = db.Replica.Client.LTXFiles(context.Background(), litestream.SnapshotLevel, 0)
554+
if err != nil {
555+
t.Fatal(err)
556+
}
557+
var afterCount int
558+
for itr.Next() {
559+
afterCount++
560+
}
561+
itr.Close()
562+
563+
// Should have at least one snapshot remaining
564+
if afterCount < 1 {
565+
t.Fatal("expected at least 1 snapshot after retention")
566+
}
567+
568+
// Should have fewer snapshots than before
569+
if afterCount >= beforeCount {
570+
t.Fatalf("expected fewer snapshots after retention, before=%d after=%d", beforeCount, afterCount)
571+
}
572+
}
573+
500574
func newDB(tb testing.TB, path string) *litestream.DB {
501575
tb.Helper()
502576
tb.Logf("db=%s", path)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ require (
1515
github.com/mattn/go-sqlite3 v1.14.19
1616
github.com/pkg/sftp v1.13.6
1717
github.com/prometheus/client_golang v1.17.0
18-
github.com/superfly/ltx v0.3.17
18+
github.com/superfly/ltx v0.3.18
1919
golang.org/x/crypto v0.17.0
2020
golang.org/x/sync v0.5.0
2121
golang.org/x/sys v0.15.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
166166
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
167167
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
168168
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
169-
github.com/superfly/ltx v0.3.17 h1:7cFPsrPYxgfGlqyhyKQtE9tx6mLZmcBUOrCUx1J6ju8=
170-
github.com/superfly/ltx v0.3.17/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s=
169+
github.com/superfly/ltx v0.3.18 h1:sAIww45DNoTvFD1fRfrhDTFKnKGca5/hGS4esSIFnfM=
170+
github.com/superfly/ltx v0.3.18/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s=
171171
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
172172
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
173173
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

0 commit comments

Comments
 (0)