Skip to content

Commit c6b8caa

Browse files
authored
chore(indexworker): use structured logging (#2269)
Uses structured logging in the indexworker to make it easier to query events.
1 parent 9f39cad commit c6b8caa

File tree

1 file changed

+120
-33
lines changed

1 file changed

+120
-33
lines changed

internal/indexworker/indexworker.go

Lines changed: 120 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ import (
2020
var ErrAdvisoryLockAlreadyAcquired = errors.New("advisory lock already acquired by another process")
2121
var ErrExtensionNotFound = errors.New("extension not found")
2222

23+
type Outcome string
24+
25+
const (
26+
OutcomeSuccess Outcome = "success"
27+
OutcomeFailure Outcome = "failure"
28+
OutcomeSkipped Outcome = "skipped"
29+
)
30+
2331
// CreateIndexes ensures that the necessary indexes on the users table exist.
2432
// If the indexes already exist and are valid, it skips creation.
2533
// It uses a Postgres advisory lock to prevent concurrent index creation
@@ -67,45 +75,58 @@ func CreateIndexes(ctx context.Context, config *conf.GlobalConfiguration, le *lo
6775
lockQuery := fmt.Sprintf("SELECT pg_try_advisory_lock(hashtext('%s')::bigint)", lockName)
6876

6977
if err := db.RawQuery(lockQuery).First(&lockAcquired); err != nil {
70-
le.Errorf("Failed to attempt advisory lock acquisition: %+v", err)
78+
le.WithFields(logrus.Fields{
79+
"outcome": OutcomeFailure,
80+
"code": "advisory_lock_acquisition_failed",
81+
}).WithError(err).Error("Failed to attempt advisory lock acquisition")
7182
return err
7283
}
7384

7485
if !lockAcquired {
75-
le.Infof("Another process is currently creating indexes. Skipping index creation.")
86+
le.WithFields(logrus.Fields{
87+
"outcome": OutcomeSkipped,
88+
"code": "advisory_lock_already_acquired",
89+
}).Info("Another process is currently holding the advisory lock, skipping index creation")
7690
return ErrAdvisoryLockAlreadyAcquired
7791
}
7892

79-
le.Infof("Successfully acquired advisory lock for index creation.")
93+
le.Debug("Successfully acquired advisory lock for index creation")
8094

8195
// Ensure lock is released on function exit
8296
defer func() {
8397
unlockQuery := fmt.Sprintf("SELECT pg_advisory_unlock(hashtext('%s')::bigint)", lockName)
8498
var unlocked bool
8599
if err := db.RawQuery(unlockQuery).First(&unlocked); err != nil {
86100
if ctx.Err() != nil {
87-
le.Infof("Context cancelled. Advisory lock will be released upon session termination.")
101+
le.Debug("Context cancelled, advisory lock will be released upon session termination")
88102
} else {
89-
le.Errorf("Failed to release advisory lock: %+v", err)
103+
le.WithError(err).Error("Failed to release advisory lock")
90104
}
91105
} else if unlocked {
92-
le.Infof("Successfully released advisory lock.")
106+
le.Debug("Successfully released advisory lock")
93107
} else {
94-
le.Warnf("Advisory lock was not held when attempting to release.")
108+
le.Debug("Advisory lock was not held when attempting to release")
95109
}
96110
}()
97111

98112
// Ensure either auth_trgm or pg_trgm extension is installed
99113
extName, err := ensureTrgmExtension(db, config.DB.Namespace, le)
100114
if err != nil {
101-
le.Errorf("Failed to ensure trgm extension is available: %+v", err)
115+
le.WithFields(logrus.Fields{
116+
"outcome": OutcomeFailure,
117+
"code": "trgm_extension_unavailable",
118+
}).WithError(err).Error("Failed to ensure trgm extension is available")
102119
return err
103120
}
104121

105122
// Look up which schema the trgm extension is installed in
106123
trgmSchema, err := getTrgmExtensionSchema(db, extName)
107124
if err != nil {
108-
le.Errorf("Failed to find %s extension schema: %+v", extName, err)
125+
le.WithFields(logrus.Fields{
126+
"outcome": OutcomeFailure,
127+
"code": "extension_schema_not_found",
128+
"extension": extName,
129+
}).WithError(err).Error("Failed to find extension schema")
109130
return ErrExtensionNotFound
110131
}
111132

@@ -118,64 +139,104 @@ func CreateIndexes(ctx context.Context, config *conf.GlobalConfiguration, le *lo
118139
// Check existing indexes and their statuses. If all exist and are valid, skip creation.
119140
existingIndexes, err := getIndexStatuses(db, config.DB.Namespace, indexNames)
120141
if err != nil {
121-
le.Warnf("Failed to check existing indexes: %+v. Proceeding with index creation.", err)
142+
le.WithError(err).Warn("Failed to check existing index statuses, proceeding with index creation")
122143
} else {
123144
if len(existingIndexes) == len(indexes) {
124145
allHealthy := true
125146
for _, idx := range existingIndexes {
126147
if !idx.IsValid || !idx.IsReady {
127-
le.Infof("Index %s exists but is not healthy (valid: %v, ready: %v)", idx.IndexName, idx.IsValid, idx.IsReady)
148+
le.WithFields(logrus.Fields{
149+
"code": "index_unhealthy",
150+
"index_name": idx.IndexName,
151+
"index_valid": idx.IsValid,
152+
"index_ready": idx.IsReady,
153+
}).Info("Index exists but is not healthy")
128154
allHealthy = false
129155
break
130156
}
131157
}
132158

133159
if allHealthy {
134-
le.Infof("All %d indexes on auth.users already exist and are ready. Skipping index creation.", len(indexes))
160+
le.WithFields(logrus.Fields{
161+
"outcome": OutcomeSkipped,
162+
"code": "indexes_already_exist",
163+
"index_count": len(indexes),
164+
}).Debug("All indexes on auth.users already exist and are ready, skipping index creation")
135165
return nil
136166
}
137167
} else {
138-
le.Infof("Found %d of %d expected indexes. Proceeding with index creation.", len(existingIndexes), len(indexes))
168+
le.WithFields(logrus.Fields{
169+
"code": "indexes_missing",
170+
"existing_count": len(existingIndexes),
171+
"expected_count": len(indexes),
172+
}).Info("Found fewer indexes than expected, proceeding with index creation")
139173
}
140174
}
141175

142176
userCount, err := getApproximateUserCount(db, config.DB.Namespace)
143177
if err != nil {
144-
le.Warnf("Failed to get approximate user count: %+v. Proceeding with index creation.", err)
178+
le.WithError(err).Warn("Failed to get approximate user count, proceeding with index creation")
145179
}
146-
le.Infof("User count: %d. Starting index creation...", userCount)
180+
le.WithFields(logrus.Fields{
181+
"code": "index_creation_starting",
182+
"user_count": userCount,
183+
}).Info("Starting index creation")
147184

148185
// First, clean up any invalid indexes from previous interrupted attempts
149186
dropInvalidIndexes(db, le, config.DB.Namespace, indexNames)
150187

151188
// Create indexes one by one
152189
var failedIndexes []string
190+
var succeededIndexes []string
153191
totalStartTime := time.Now()
154192

155193
for _, idx := range indexes {
156194
startTime := time.Now()
157-
le.Infof("Creating index: %s", idx.name)
195+
le.WithFields(logrus.Fields{
196+
"code": "index_creating",
197+
"index_name": idx.name,
198+
}).Info("Creating index")
158199

159200
if err := db.RawQuery(idx.query).Exec(); err != nil {
160201
duration := time.Since(startTime).Milliseconds()
161-
162-
le.Errorf("Failed to create index %s after %d ms: %v", idx.name, duration, err)
202+
le.WithFields(logrus.Fields{
203+
"code": "index_creation_failed",
204+
"index_name": idx.name,
205+
"duration_ms": duration,
206+
}).WithError(err).Error("Failed to create index")
163207
failedIndexes = append(failedIndexes, idx.name)
164208
} else {
165209
duration := time.Since(startTime).Milliseconds()
166-
le.Infof("Successfully created index %s in %d ms", idx.name, duration)
210+
le.WithFields(logrus.Fields{
211+
"code": "index_created",
212+
"index_name": idx.name,
213+
"duration_ms": duration,
214+
}).Info("Successfully created index")
215+
succeededIndexes = append(succeededIndexes, idx.name)
167216
}
168217
}
169218

170219
totalDuration := time.Since(totalStartTime).Milliseconds()
171220

172221
if len(failedIndexes) > 0 {
173-
le.Warnf("Index creation completed in %d ms with some failures: %v", totalDuration, failedIndexes)
222+
le.WithFields(logrus.Fields{
223+
"outcome": OutcomeFailure,
224+
"code": "index_creation_partial_failure",
225+
"duration_ms": totalDuration,
226+
"failed_indexes": failedIndexes,
227+
"succeeded_indexes": succeededIndexes,
228+
}).Error("Index creation completed with some failures")
229+
174230
return fmt.Errorf("failed to create indexes: %v", failedIndexes)
175-
} else {
176-
le.Infof("All indexes created successfully in %d ms", totalDuration)
177231
}
178232

233+
le.WithFields(logrus.Fields{
234+
"outcome": OutcomeSuccess,
235+
"code": "index_creation_completed",
236+
"duration_ms": totalDuration,
237+
"succeeded_indexes": succeededIndexes,
238+
}).Info("All indexes created successfully")
239+
179240
return nil
180241
}
181242

@@ -249,19 +310,30 @@ func ensureTrgmExtension(db *pop.Connection, authSchema string, le *logrus.Entry
249310

250311
if authTrgmStatus.Available {
251312
if !authTrgmStatus.Installed {
252-
le.Infof("auth_trgm extension is available but not installed. Installing...")
313+
le.Debug("auth_trgm extension is available but not installed, installing")
314+
253315
if err := installExtension(db, "auth_trgm", authSchema); err != nil {
254-
le.Errorf("Failed to install auth_trgm extension: %v", err)
316+
le.WithFields(logrus.Fields{
317+
"outcome": OutcomeFailure,
318+
"code": "extension_install_failed",
319+
"extension": "auth_trgm",
320+
}).WithError(err).Error("Failed to install auth_trgm extension")
321+
255322
return "", fmt.Errorf("auth_trgm extension is available but failed to install: %w", err)
256323
}
257-
le.Infof("Successfully installed auth_trgm extension")
324+
325+
le.WithFields(logrus.Fields{
326+
"code": "extension_installed",
327+
"extension": "auth_trgm",
328+
}).Info("Successfully installed auth_trgm extension")
258329
} else {
259-
le.Infof("auth_trgm extension is already installed")
330+
le.Debug("auth_trgm extension is already installed")
260331
}
332+
261333
return "auth_trgm", nil
262334
}
263335

264-
le.Infof("auth_trgm extension is not available, checking pg_trgm...")
336+
le.Debug("auth_trgm extension is not available, checking pg_trgm")
265337

266338
pgTrgmStatus, err := getExtensionStatus(db, "pg_trgm")
267339
if err != nil {
@@ -273,14 +345,23 @@ func ensureTrgmExtension(db *pop.Connection, authSchema string, le *logrus.Entry
273345
}
274346

275347
if !pgTrgmStatus.Installed {
276-
le.Infof("pg_trgm extension is available but not installed. Installing...")
348+
le.Debug("pg_trgm extension is available but not installed, installing")
349+
277350
if err := installExtension(db, "pg_trgm", "pg_catalog"); err != nil {
278-
le.Errorf("Failed to install pg_trgm extension: %v", err)
351+
le.WithFields(logrus.Fields{
352+
"code": "extension_install_failed",
353+
"extension": "pg_trgm",
354+
}).WithError(err).Error("Failed to install pg_trgm extension")
355+
279356
return "", fmt.Errorf("pg_trgm extension is available but failed to install: %w", err)
280357
}
281-
le.Infof("Successfully installed pg_trgm extension")
358+
359+
le.WithFields(logrus.Fields{
360+
"code": "extension_installed",
361+
"extension": "pg_trgm",
362+
}).Info("Successfully installed pg_trgm extension")
282363
} else {
283-
le.Infof("pg_trgm extension is already installed")
364+
le.Debug("pg_trgm extension is already installed")
284365
}
285366

286367
return "pg_trgm", nil
@@ -397,10 +478,16 @@ func dropInvalidIndexes(db *pop.Connection, le *logrus.Entry, namespace string,
397478
var invalidIndexes []invalidIndex
398479
if err := db.RawQuery(cleanupQuery).All(&invalidIndexes); err == nil && len(invalidIndexes) > 0 {
399480
for _, idx := range invalidIndexes {
400-
le.Warnf("Dropping invalid index from previous interrupted run: %s", idx.IndexName)
481+
le.WithFields(logrus.Fields{
482+
"code": "dropping_invalid_index",
483+
"index_name": idx.IndexName,
484+
}).Info("Dropping invalid index from previous interrupted run")
401485
dropQuery := fmt.Sprintf("DROP INDEX CONCURRENTLY IF EXISTS %q.%s", namespace, idx.IndexName)
402486
if err := db.RawQuery(dropQuery).Exec(); err != nil {
403-
le.Errorf("Failed to drop invalid index %s: %v", idx.IndexName, err)
487+
le.WithFields(logrus.Fields{
488+
"code": "drop_invalid_index_failed",
489+
"index_name": idx.IndexName,
490+
}).WithError(err).Error("Failed to drop invalid index")
404491
}
405492
}
406493
}

0 commit comments

Comments
 (0)