Skip to content

Commit e9725bd

Browse files
committed
update-db
1 parent 6bc0032 commit e9725bd

File tree

3 files changed

+265
-32
lines changed

3 files changed

+265
-32
lines changed

database/cleanup.go

Lines changed: 187 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"log"
77
"strings"
88
"time"
9+
"context"
10+
"sync"
911

1012
"github.com/hhftechnology/middleware-manager/util"
1113
)
@@ -30,7 +32,17 @@ func DefaultCleanupOptions() CleanupOptions {
3032
}
3133
}
3234

33-
// CleanupDuplicateServices removes service duplication from the database
35+
// normalizeID returns the part of id that precedes its first "@".
// An id without "@", or one that begins with "@", is returned unchanged.
func normalizeID(id string) string {
	at := strings.IndexByte(id, '@')
	if at <= 0 {
		// No separator, or nothing in front of it: keep the id as-is.
		return id
	}
	return id[:at]
}
44+
45+
// CleanupDuplicateServices removes service duplication from the database
3446
func (db *DB) CleanupDuplicateServices(opts CleanupOptions) error {
3547
if opts.LogLevel >= 1 {
3648
log.Println("Starting cleanup of duplicate services...")
@@ -49,18 +61,17 @@ func (db *DB) CleanupDuplicateServices(opts CleanupOptions) error {
4961
Config string
5062
}
5163
uniqueServices := make(map[string]serviceInfo)
52-
5364
var servicesToDelete []string
5465

55-
// Process each service
66+
// Process each service
5667
for rows.Next() {
5768
var id, name, typ, configStr string
5869
if err := rows.Scan(&id, &name, &typ, &configStr); err != nil {
5970
return fmt.Errorf("failed to scan service: %w", err)
6071
}
6172

6273
// Get normalized ID
63-
normalizedID := util.NormalizeID(id)
74+
normalizedID := normalizeID(id) // Use local function instead of util.NormalizeID
6475

6576
// If we've already seen this normalized ID, check which one to keep
6677
if existing, found := uniqueServices[normalizedID]; found {
@@ -114,47 +125,137 @@ func (db *DB) CleanupDuplicateServices(opts CleanupOptions) error {
114125
if err := rows.Err(); err != nil {
115126
return fmt.Errorf("error iterating services: %w", err)
116127
}
117-
128+
118129
if len(servicesToDelete) == 0 {
119130
if opts.LogLevel >= 1 {
120131
log.Println("No duplicate services found.")
121132
}
122133
return nil
123134
}
124-
135+
125136
if opts.DryRun {
126137
log.Printf("DRY RUN: Would delete %d duplicate services", len(servicesToDelete))
127138
for _, id := range servicesToDelete {
128139
log.Printf(" - %s", id)
129140
}
130141
return nil
131142
}
143+
144+
// Use timeout transaction to prevent indefinite locks
145+
ctx := context.Background()
146+
timeout := 30 * time.Second
132147

133-
// Delete duplicates in a transaction
134-
return db.WithTransaction(func(tx *sql.Tx) error {
135-
for _, id := range servicesToDelete {
136-
if opts.LogLevel >= 1 {
137-
log.Printf("Deleting duplicate service: %s", id)
148+
return db.WithTimeoutTransaction(ctx, timeout, func(tx *sql.Tx) error {
149+
// Process in smaller batches to reduce lock time
150+
batchSize := opts.MaxDeleteBatch
151+
if batchSize <= 0 {
152+
batchSize = 50 // Default batch size
153+
}
154+
155+
for i := 0; i < len(servicesToDelete); i += batchSize {
156+
end := i + batchSize
157+
if end > len(servicesToDelete) {
158+
end = len(servicesToDelete)
138159
}
139160

140-
// First remove any resource_service references
141-
if _, err := tx.Exec("DELETE FROM resource_services WHERE service_id = ?", id); err != nil {
142-
return fmt.Errorf("failed to delete resource_service references for %s: %w", id, err)
143-
}
161+
batch := servicesToDelete[i:end]
144162

145-
// Then delete the service
146-
if _, err := tx.Exec("DELETE FROM services WHERE id = ?", id); err != nil {
147-
return fmt.Errorf("failed to delete service %s: %w", id, err)
163+
// Use batch DELETE with IN clause for better performance
164+
if len(batch) > 1 {
165+
placeholders := strings.Repeat("?,", len(batch)-1) + "?"
166+
args := make([]interface{}, len(batch))
167+
for i, id := range batch {
168+
args[i] = id
169+
}
170+
171+
// First remove relationships in batch
172+
_, err := tx.Exec(
173+
fmt.Sprintf("DELETE FROM resource_services WHERE service_id IN (%s)", placeholders),
174+
args...,
175+
)
176+
if err != nil {
177+
return fmt.Errorf("failed to delete service relationships: %w", err)
178+
}
179+
180+
// Then delete services in batch
181+
_, err = tx.Exec(
182+
fmt.Sprintf("DELETE FROM services WHERE id IN (%s)", placeholders),
183+
args...,
184+
)
185+
if err != nil {
186+
return fmt.Errorf("failed to delete services: %w", err)
187+
}
188+
189+
if opts.LogLevel >= 1 {
190+
log.Printf("Deleted batch of %d services", len(batch))
191+
}
192+
} else {
193+
// Single item - original logic
194+
id := batch[0]
195+
if _, err := tx.Exec("DELETE FROM resource_services WHERE service_id = ?", id); err != nil {
196+
return fmt.Errorf("failed to delete resource_service references for %s: %w", id, err)
197+
}
198+
if _, err := tx.Exec("DELETE FROM services WHERE id = ?", id); err != nil {
199+
return fmt.Errorf("failed to delete service %s: %w", id, err)
200+
}
148201
}
149202
}
150203

151-
if opts.LogLevel >= 1 {
152-
log.Printf("Cleanup complete. Removed %d duplicate services", len(servicesToDelete))
153-
}
154204
return nil
155205
})
156206
}
157207

208+
// CleanupManager - CORRECTED VERSION with proper DB reference
209+
type CleanupManager struct {
210+
db *DB // This should match your actual DB type
211+
cleanupMutex sync.Mutex
212+
isCleanupRunning bool
213+
}
214+
215+
func NewCleanupManager(database *DB) *CleanupManager {
216+
return &CleanupManager{
217+
db: database,
218+
cleanupMutex: sync.Mutex{},
219+
isCleanupRunning: false,
220+
}
221+
}
222+
223+
func (cm *CleanupManager) PerformFullCleanup(opts CleanupOptions) error {
224+
cm.cleanupMutex.Lock()
225+
defer cm.cleanupMutex.Unlock()
226+
227+
if cm.isCleanupRunning {
228+
return fmt.Errorf("cleanup already in progress")
229+
}
230+
231+
cm.isCleanupRunning = true
232+
defer func() {
233+
cm.isCleanupRunning = false
234+
}()
235+
236+
// Add warning log
237+
if opts.LogLevel >= 1 {
238+
log.Println("⚠️ Database cleanup starting - this may cause brief service interruptions")
239+
}
240+
241+
// First clean up services
242+
if err := cm.db.CleanupDuplicateServices(opts); err != nil {
243+
return fmt.Errorf("service cleanup failed: %w", err)
244+
}
245+
246+
// Then clean up resources
247+
if err := cm.db.CleanupDuplicateResources(opts); err != nil {
248+
return fmt.Errorf("resource cleanup failed: %w", err)
249+
}
250+
251+
// Finally clean up orphaned relationships
252+
if err := cm.db.CleanupOrphanedRelationships(opts); err != nil {
253+
return fmt.Errorf("relationship cleanup failed: %w", err)
254+
}
255+
256+
return nil
257+
}
258+
158259
// CleanupDuplicateResources removes resource duplication from the database
159260
func (db *DB) CleanupDuplicateResources(opts CleanupOptions) error {
160261
if opts.LogLevel >= 1 {
@@ -402,4 +503,69 @@ func (db *DB) PerformFullCleanup(opts CleanupOptions) error {
402503
}
403504

404505
return nil
506+
}
507+
508+
// CleanupOrphanedRelationships removes relationship rows that reference missing resources, services or middlewares.
509+
func (db *DB) CleanupOrphanedRelationships(opts CleanupOptions) error {
510+
if opts.LogLevel >= 1 {
511+
log.Println("Starting cleanup of orphaned relationships...")
512+
}
513+
514+
queries := []struct {
515+
desc string
516+
qry string
517+
}{
518+
{"orphaned resource_services by missing service", "SELECT COUNT(*) FROM resource_services rs LEFT JOIN services s ON rs.service_id = s.id WHERE s.id IS NULL"},
519+
{"orphaned resource_services by missing resource", "SELECT COUNT(*) FROM resource_services rs LEFT JOIN resources r ON rs.resource_id = r.id WHERE r.id IS NULL"},
520+
{"orphaned resource_middlewares by missing middleware", "SELECT COUNT(*) FROM resource_middlewares rm LEFT JOIN middlewares m ON rm.middleware_id = m.id WHERE m.id IS NULL"},
521+
{"orphaned resource_middlewares by missing resource", "SELECT COUNT(*) FROM resource_middlewares rm LEFT JOIN resources r ON rm.resource_id = r.id WHERE r.id IS NULL"},
522+
}
523+
524+
// Dry run: just report counts
525+
if opts.DryRun {
526+
for _, q := range queries {
527+
var count int64
528+
if err := db.QueryRow(q.qry).Scan(&count); err != nil {
529+
// Non-fatal: log and continue
530+
if opts.LogLevel >= 0 {
531+
log.Printf("DRY RUN: failed to count %s: %v", q.desc, err)
532+
}
533+
continue
534+
}
535+
log.Printf("DRY RUN: %s: %d", q.desc, count)
536+
}
537+
return nil
538+
}
539+
540+
// Execute deletes in a transaction
541+
return db.WithTransaction(func(tx *sql.Tx) error {
542+
delQueries := []struct {
543+
desc string
544+
qry string
545+
}{
546+
{"delete resource_services with missing service", "DELETE FROM resource_services WHERE service_id NOT IN (SELECT id FROM services)"},
547+
{"delete resource_services with missing resource", "DELETE FROM resource_services WHERE resource_id NOT IN (SELECT id FROM resources)"},
548+
{"delete resource_middlewares with missing middleware", "DELETE FROM resource_middlewares WHERE middleware_id NOT IN (SELECT id FROM middlewares)"},
549+
{"delete resource_middlewares with missing resource", "DELETE FROM resource_middlewares WHERE resource_id NOT IN (SELECT id FROM resources)"},
550+
}
551+
552+
for _, dq := range delQueries {
553+
res, err := tx.Exec(dq.qry)
554+
if err != nil {
555+
return fmt.Errorf("failed to %s: %w", dq.desc, err)
556+
}
557+
if opts.LogLevel >= 1 {
558+
if n, err := res.RowsAffected(); err == nil {
559+
log.Printf("Deleted %d rows: %s", n, dq.desc)
560+
} else {
561+
log.Printf("Deleted rows (unknown count): %s", dq.desc)
562+
}
563+
}
564+
}
565+
566+
if opts.LogLevel >= 1 {
567+
log.Println("Orphaned relationship cleanup complete.")
568+
}
569+
return nil
570+
})
405571
}

database/db.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,26 @@ type DB struct {
1919
*sql.DB
2020
}
2121

22+
// EnableWALMode configures the SQLite connection for WAL mode and reasonable defaults.
23+
func (db *DB) EnableWALMode() error {
24+
// Set busy timeout to avoid "database is locked" errors under contention.
25+
if _, err := db.Exec("PRAGMA busy_timeout = 5000"); err != nil {
26+
return fmt.Errorf("failed to set busy_timeout: %w", err)
27+
}
28+
29+
// Enable WAL journal mode.
30+
if _, err := db.Exec("PRAGMA journal_mode = WAL"); err != nil {
31+
return fmt.Errorf("failed to set journal_mode=WAL: %w", err)
32+
}
33+
34+
// Use NORMAL synchronous mode for better performance while keeping reasonable durability.
35+
if _, err := db.Exec("PRAGMA synchronous = NORMAL"); err != nil {
36+
return fmt.Errorf("failed to set synchronous=NORMAL: %w", err)
37+
}
38+
39+
return nil
40+
}
41+
2242
// TraefikConfig represents the structure of the Traefik configuration
2343
type TraefikConfig struct {
2444
HTTP struct {
@@ -37,6 +57,27 @@ type TraefikConfig struct {
3757
} `yaml:"udp,omitempty"`
3858
}
3959

60+
func NewDB(dbPath string) (*DB, error) {
61+
db, err := sql.Open("sqlite3", dbPath)
62+
if err != nil {
63+
return nil, err
64+
}
65+
66+
dbWrapper := &DB{db}
67+
68+
// Enable WAL mode and configure for concurrency
69+
if err := dbWrapper.EnableWALMode(); err != nil {
70+
log.Printf("Warning: Failed to enable WAL mode: %v", err)
71+
}
72+
73+
// Run migrations
74+
if err := runMigrations(db); err != nil {
75+
return nil, err
76+
}
77+
78+
return dbWrapper, nil
79+
}
80+
4081
// InitDB initializes the database connection
4182
func InitDB(dbPath string) (*DB, error) {
4283
// Create parent directory if it doesn't exist

services/config_generator.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,17 @@ func (cg *ConfigGenerator) Start(interval time.Duration) {
8686
log.Printf("Initial config generation failed: %v", err)
8787
}
8888

89-
for {
90-
select {
91-
case <-ticker.C:
92-
if err := cg.generateConfig(); err != nil {
93-
log.Printf("Config generation failed: %v", err)
94-
}
95-
case <-cg.stopChan:
96-
log.Println("Config generator stopped")
97-
return
98-
}
99-
}
89+
for {
90+
select {
91+
case <-ticker.C:
92+
if err := cg.generateConfigWithRetry(); err != nil { // Use retry version
93+
log.Printf("Config generation failed: %v", err)
94+
}
95+
case <-cg.stopChan:
96+
log.Println("Config generator stopped")
97+
return
98+
}
99+
}
100100
}
101101
// Add this helper function at the top of the file with other utility functions
102102
func normalizeServiceID(id string) string {
@@ -119,6 +119,32 @@ func (cg *ConfigGenerator) Stop() {
119119
cg.isRunning = false
120120
}
121121

122+
func (cg *ConfigGenerator) generateConfigWithRetry() error {
123+
maxRetries := 3
124+
baseDelay := 1 * time.Second
125+
126+
for attempt := 0; attempt < maxRetries; attempt++ {
127+
err := cg.generateConfig()
128+
if err == nil {
129+
return nil
130+
}
131+
132+
// Check if it's a database locked error
133+
if strings.Contains(strings.ToLower(err.Error()), "database is locked") {
134+
if attempt < maxRetries-1 {
135+
delay := baseDelay * time.Duration(1<<attempt) // Exponential backoff
136+
log.Printf("⚠️ Database locked on attempt %d, retrying in %v", attempt+1, delay)
137+
time.Sleep(delay)
138+
continue
139+
}
140+
}
141+
142+
return err
143+
}
144+
145+
return fmt.Errorf("config generation failed after %d attempts", maxRetries)
146+
}
147+
122148
// generateConfig generates Traefik configuration files
123149
func (cg *ConfigGenerator) generateConfig() error {
124150
log.Println("Generating Traefik configuration...")

0 commit comments

Comments
 (0)