Skip to content

Commit 277e491

Browse files
Merge pull request #44 from hhftechnology/dev
Dev
2 parents 89ed4ae + c629337 commit 277e491

File tree

8 files changed

+1331
-830
lines changed

8 files changed

+1331
-830
lines changed

database/cleanup.go

Lines changed: 405 additions & 0 deletions
Large diffs are not rendered by default.

database/transaction.go

Lines changed: 75 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package database
22

33
import (
4+
"context"
45
"database/sql"
56
"fmt"
67
"log"
8+
"time"
79
)
810

911
// TxFn represents a function that uses a transaction
@@ -42,58 +44,81 @@ func (db *DB) WithTransaction(fn TxFn) error {
4244
return nil
4345
}
4446

45-
// QueryRow executes a query that returns a single row and scans the result into the provided destination
46-
func (db *DB) QueryRowSafe(query string, dest interface{}, args ...interface{}) error {
47-
row := db.QueryRow(query, args...)
48-
if err := row.Scan(dest); err != nil {
49-
if err == sql.ErrNoRows {
50-
return ErrNotFound
51-
}
52-
return fmt.Errorf("scan failed: %w", err)
53-
}
54-
return nil
55-
}
56-
57-
// ExecSafe executes a statement and returns the result summary
58-
func (db *DB) ExecSafe(query string, args ...interface{}) (sql.Result, error) {
59-
result, err := db.Exec(query, args...)
60-
if err != nil {
61-
return nil, fmt.Errorf("exec failed: %w", err)
62-
}
63-
return result, nil
64-
}
65-
66-
// CustomError types for database operations
67-
var (
68-
ErrNotFound = fmt.Errorf("record not found")
69-
ErrDuplicate = fmt.Errorf("duplicate record")
70-
ErrConstraint = fmt.Errorf("constraint violation")
71-
)
72-
73-
// ExecTx executes a statement within a transaction and returns the result
74-
func ExecTx(tx *sql.Tx, query string, args ...interface{}) (sql.Result, error) {
75-
result, err := tx.Exec(query, args...)
76-
if err != nil {
77-
return nil, fmt.Errorf("exec in transaction failed: %w", err)
78-
}
79-
return result, nil
80-
}
81-
82-
// GetRowsAffected is a helper to get rows affected from a result
83-
func GetRowsAffected(result sql.Result) (int64, error) {
84-
affected, err := result.RowsAffected()
85-
if err != nil {
86-
return 0, fmt.Errorf("failed to get rows affected: %w", err)
47+
// WithTimeoutTransaction wraps a function with a transaction that has a timeout
48+
func (db *DB) WithTimeoutTransaction(ctx context.Context, timeout time.Duration, fn TxFn) error {
49+
// Create a context with timeout
50+
ctx, cancel := context.WithTimeout(ctx, timeout)
51+
defer cancel()
52+
53+
// Create a done channel to signal completion
54+
done := make(chan error, 1)
55+
56+
// Run the transaction in a goroutine
57+
go func() {
58+
done <- db.WithTransaction(fn)
59+
}()
60+
61+
// Wait for either context timeout or transaction completion
62+
select {
63+
case <-ctx.Done():
64+
// Context timed out
65+
return fmt.Errorf("transaction timed out after %v: %w", timeout, ctx.Err())
66+
case err := <-done:
67+
// Transaction completed
68+
return err
8769
}
88-
return affected, nil
8970
}
9071

91-
// GetLastInsertID is a helper to get last insert ID from a result
92-
func GetLastInsertID(result sql.Result) (int64, error) {
93-
id, err := result.LastInsertId()
94-
if err != nil {
95-
return 0, fmt.Errorf("failed to get last insert ID: %w", err)
96-
}
97-
return id, nil
72+
// BatchTransaction executes multiple operations in a single transaction
73+
// All operations must succeed or the transaction is rolled back
74+
func (db *DB) BatchTransaction(operations []TxFn) error {
75+
return db.WithTransaction(func(tx *sql.Tx) error {
76+
for i, op := range operations {
77+
if err := op(tx); err != nil {
78+
return fmt.Errorf("operation %d failed: %w", i, err)
79+
}
80+
}
81+
return nil
82+
})
9883
}
9984

85+
// UpdateInTransaction updates a record in a transaction
86+
func (db *DB) UpdateInTransaction(table string, id string, updates map[string]interface{}) error {
87+
return db.WithTransaction(func(tx *sql.Tx) error {
88+
// Build the update statement
89+
query := fmt.Sprintf("UPDATE %s SET ", table)
90+
var params []interface{}
91+
92+
i := 0
93+
for field, value := range updates {
94+
if i > 0 {
95+
query += ", "
96+
}
97+
query += field + " = ?"
98+
params = append(params, value)
99+
i++
100+
}
101+
102+
// Add the WHERE clause and updated_at
103+
query += ", updated_at = ? WHERE id = ?"
104+
params = append(params, time.Now(), id)
105+
106+
// Execute the update
107+
result, err := tx.Exec(query, params...)
108+
if err != nil {
109+
return fmt.Errorf("update failed: %w", err)
110+
}
111+
112+
// Check if any rows were affected
113+
rowsAffected, err := result.RowsAffected()
114+
if err != nil {
115+
return fmt.Errorf("failed to get rows affected: %w", err)
116+
}
117+
118+
if rowsAffected == 0 {
119+
return fmt.Errorf("no rows affected, record with ID %s not found", id)
120+
}
121+
122+
return nil
123+
})
124+
}

main.go

Lines changed: 113 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -82,108 +82,119 @@ func DiscoverTraefikAPI() (string, error) {
8282
}
8383

8484
func main() {
85-
log.Println("Starting Middleware Manager...")
86-
87-
var debug bool
88-
flag.BoolVar(&debug, "debug", false, "Enable debug mode")
89-
flag.Parse()
90-
91-
cfg := loadConfiguration(debug)
92-
93-
if os.Getenv("TRAEFIK_API_URL") == "" {
94-
if discoveredURL, err := DiscoverTraefikAPI(); err == nil && discoveredURL != "" {
95-
log.Printf("Auto-discovered Traefik API URL: %s", discoveredURL)
96-
cfg.TraefikAPIURL = discoveredURL
97-
}
98-
}
99-
100-
db, err := database.InitDB(cfg.DBPath)
101-
if err != nil {
102-
log.Fatalf("Failed to initialize database: %v", err)
103-
}
104-
defer db.Close()
105-
106-
configDir := cfg.ConfigDir
107-
if err := config.EnsureConfigDirectory(configDir); err != nil {
108-
log.Printf("Warning: Failed to create config directory: %v", err)
109-
}
110-
111-
if err := config.SaveTemplateFile(configDir); err != nil {
112-
log.Printf("Warning: Failed to save default middleware templates: %v", err)
113-
}
114-
115-
if err := config.LoadDefaultTemplates(db); err != nil {
116-
log.Printf("Warning: Failed to load default middleware templates: %v", err)
117-
}
118-
119-
if err := config.SaveTemplateServicesFile(configDir); err != nil {
120-
log.Printf("Warning: Failed to save default service templates: %v", err)
121-
}
122-
123-
if err := config.LoadDefaultServiceTemplates(db); err != nil {
124-
log.Printf("Warning: Failed to load default service templates: %v", err)
125-
}
126-
127-
configManager, err := services.NewConfigManager(filepath.Join(configDir, "config.json"))
128-
if err != nil {
129-
log.Fatalf("Failed to initialize config manager: %v", err)
130-
}
131-
132-
configManager.EnsureDefaultDataSources(cfg.PangolinAPIURL, cfg.TraefikAPIURL)
133-
134-
stopChan := make(chan struct{})
135-
136-
resourceWatcher, err := services.NewResourceWatcher(db, configManager)
137-
if err != nil {
138-
log.Fatalf("Failed to create resource watcher: %v", err)
139-
}
140-
go resourceWatcher.Start(cfg.CheckInterval)
141-
142-
configGenerator := services.NewConfigGenerator(db, cfg.TraefikConfDir, configManager)
143-
go configGenerator.Start(cfg.GenerateInterval)
144-
145-
serverConfig := api.ServerConfig{
146-
Port: cfg.Port,
147-
UIPath: cfg.UIPath,
148-
Debug: cfg.Debug,
149-
AllowCORS: cfg.AllowCORS,
150-
CORSOrigin: cfg.CORSOrigin,
151-
}
152-
153-
server := api.NewServer(db.DB, serverConfig, configManager, cfg.TraefikStaticConfigPath, cfg.PluginsJSONURL)
154-
go func() {
155-
if err := server.Start(); err != nil {
156-
log.Printf("Server error: %v", err)
157-
close(stopChan)
158-
}
159-
}()
160-
161-
signalChan := make(chan os.Signal, 1)
162-
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
163-
164-
serviceWatcher, err := services.NewServiceWatcher(db, configManager)
165-
if err != nil {
166-
log.Printf("Warning: Failed to create service watcher: %v", err)
167-
serviceWatcher = nil
168-
} else {
169-
go serviceWatcher.Start(cfg.ServiceInterval)
170-
}
171-
172-
select {
173-
case <-signalChan:
174-
log.Println("Received shutdown signal")
175-
case <-stopChan:
176-
log.Println("Received stop signal from server")
177-
}
178-
179-
log.Println("Shutting down...")
180-
resourceWatcher.Stop()
181-
if serviceWatcher != nil {
182-
serviceWatcher.Stop()
183-
}
184-
configGenerator.Stop()
185-
server.Stop()
186-
log.Println("Middleware Manager stopped")
85+
log.Println("Starting Middleware Manager...")
86+
87+
var debug bool
88+
flag.BoolVar(&debug, "debug", false, "Enable debug mode")
89+
flag.Parse()
90+
91+
cfg := loadConfiguration(debug)
92+
93+
if os.Getenv("TRAEFIK_API_URL") == "" {
94+
if discoveredURL, err := DiscoverTraefikAPI(); err == nil && discoveredURL != "" {
95+
log.Printf("Auto-discovered Traefik API URL: %s", discoveredURL)
96+
cfg.TraefikAPIURL = discoveredURL
97+
}
98+
}
99+
100+
db, err := database.InitDB(cfg.DBPath)
101+
if err != nil {
102+
log.Fatalf("Failed to initialize database: %v", err)
103+
}
104+
defer db.Close()
105+
106+
configDir := cfg.ConfigDir
107+
if err := config.EnsureConfigDirectory(configDir); err != nil {
108+
log.Printf("Warning: Failed to create config directory: %v", err)
109+
}
110+
111+
if err := config.SaveTemplateFile(configDir); err != nil {
112+
log.Printf("Warning: Failed to save default middleware templates: %v", err)
113+
}
114+
115+
if err := config.LoadDefaultTemplates(db); err != nil {
116+
log.Printf("Warning: Failed to load default middleware templates: %v", err)
117+
}
118+
119+
if err := config.SaveTemplateServicesFile(configDir); err != nil {
120+
log.Printf("Warning: Failed to save default service templates: %v", err)
121+
}
122+
123+
if err := config.LoadDefaultServiceTemplates(db); err != nil {
124+
log.Printf("Warning: Failed to load default service templates: %v", err)
125+
}
126+
127+
// Run comprehensive database cleanup on startup
128+
log.Println("Performing full database cleanup...")
129+
cleanupOpts := database.DefaultCleanupOptions()
130+
cleanupOpts.LogLevel = 2 // More verbose logging during startup
131+
132+
if err := db.PerformFullCleanup(cleanupOpts); err != nil {
133+
log.Printf("Warning: Database cleanup encountered issues: %v", err)
134+
} else {
135+
log.Println("Database cleanup completed successfully")
136+
}
137+
138+
configManager, err := services.NewConfigManager(filepath.Join(configDir, "config.json"))
139+
if err != nil {
140+
log.Fatalf("Failed to initialize config manager: %v", err)
141+
}
142+
143+
configManager.EnsureDefaultDataSources(cfg.PangolinAPIURL, cfg.TraefikAPIURL)
144+
145+
stopChan := make(chan struct{})
146+
147+
resourceWatcher, err := services.NewResourceWatcher(db, configManager)
148+
if err != nil {
149+
log.Fatalf("Failed to create resource watcher: %v", err)
150+
}
151+
go resourceWatcher.Start(cfg.CheckInterval)
152+
153+
configGenerator := services.NewConfigGenerator(db, cfg.TraefikConfDir, configManager)
154+
go configGenerator.Start(cfg.GenerateInterval)
155+
156+
serverConfig := api.ServerConfig{
157+
Port: cfg.Port,
158+
UIPath: cfg.UIPath,
159+
Debug: cfg.Debug,
160+
AllowCORS: cfg.AllowCORS,
161+
CORSOrigin: cfg.CORSOrigin,
162+
}
163+
164+
server := api.NewServer(db.DB, serverConfig, configManager, cfg.TraefikStaticConfigPath, cfg.PluginsJSONURL)
165+
go func() {
166+
if err := server.Start(); err != nil {
167+
log.Printf("Server error: %v", err)
168+
close(stopChan)
169+
}
170+
}()
171+
172+
signalChan := make(chan os.Signal, 1)
173+
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
174+
175+
serviceWatcher, err := services.NewServiceWatcher(db, configManager)
176+
if err != nil {
177+
log.Printf("Warning: Failed to create service watcher: %v", err)
178+
serviceWatcher = nil
179+
} else {
180+
go serviceWatcher.Start(cfg.ServiceInterval)
181+
}
182+
183+
select {
184+
case <-signalChan:
185+
log.Println("Received shutdown signal")
186+
case <-stopChan:
187+
log.Println("Received stop signal from server")
188+
}
189+
190+
log.Println("Shutting down...")
191+
resourceWatcher.Stop()
192+
if serviceWatcher != nil {
193+
serviceWatcher.Stop()
194+
}
195+
configGenerator.Stop()
196+
server.Stop()
197+
log.Println("Middleware Manager stopped")
187198
}
188199

189200
func loadConfiguration(debug bool) Configuration {

0 commit comments

Comments
 (0)