Commit cced7ae

fix: remove source labels in repo query stats (#6483)
🔒 Scanned for secrets using gitleaks 8.28.0

# Description

- Remove sourceId labels from warehouse repo query stats.

## Linear Ticket

- Resolves INT-4323

## Security

- [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.

---

> [!NOTE]
> Removes sourceId labels from warehouse repo query stats and downgrades a snowpipe streaming polling log line to debug.
>
> - **Warehouse Repo (stats tags)**:
>   - Drop `sourceId` from `TimerStat` tags across repos: `load.go` (`Insert`, `DistinctTableName`), `schema.go` (`Insert`, `GetNamespace`), `source.go` (`Insert`), `staging.go` (`Insert`, `Pending`, `CountPendingForSource`), `staging_snapshot.go` (`Insert`, `GetLatest`), `table_upload.go` (`GetByJobRunTaskRun`), `upload.go` (`CreateWithStagingFiles`, `LastCreatedAt`, `GetLatestUploadInfo`, `RetryFailedBatches`, `GetSyncLatencies`).
>   - Update tests in `warehouse/internal/repo/repo_test.go` to match the removed `sourceId` tags.
> - **Snowpipe Streaming**:
>   - Downgrade the "flushing in progress" polling log from `Infon` to `Debugn` in `snowpipestreaming.go`.
1 parent aa8438c commit cced7ae
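
For context, each warehouse repo wraps its queries in a deferred `TimerStat(name, tags)` call that records query duration with the given tags; this change only drops the high-cardinality `sourceId` key from those tag maps. Below is a minimal, self-contained sketch of that pattern after the change — the `Tags` type, `timerStat` helper, and `insertLoadFiles` function are illustrative stand-ins, not the repo's actual types:

```go
package main

import (
	"fmt"
	"time"
)

// Tags mirrors the shape of a metric tag set (a plain string map in this sketch).
type Tags map[string]string

// timerStat returns a function that, when deferred, reports how long the
// enclosing call took, labelled with the given tags.
func timerStat(name string, tags Tags) func() {
	start := time.Now()
	return func() {
		fmt.Printf("%s %v took %s\n", name, tags, time.Since(start))
	}
}

// insertLoadFiles is a hypothetical repo method: after this change its timer
// carries only destination-level labels, so metric cardinality no longer
// grows with the number of sources.
func insertLoadFiles(destID, destType string) {
	defer timerStat("insert", Tags{
		"destId":   destID,
		"destType": destType,
	})()

	time.Sleep(10 * time.Millisecond) // stand-in for the actual DB insert
}

func main() {
	insertLoadFiles("destination_id", "destination_type")
}
```

Keeping the tag set at destination- and workspace-level labels bounds the number of distinct time series per metric by destinations rather than by sources.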

File tree

9 files changed (+9, -32 lines)

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go

Lines changed: 1 addition & 1 deletion

@@ -676,7 +676,7 @@ func isInProgress(statusRes *model.StatusResponse, info *importInfo, log logger.

     // Case 3: Flushing in progress - continue polling
     if latestInsertedOffset > latestCommittedOffset {
-        log.Infon("Flushing in progress, continuing to poll")
+        log.Debugn("Flushing in progress, continuing to poll")
         return true, nil
     }

warehouse/internal/repo/load.go

Lines changed: 1 addition & 3 deletions

@@ -76,7 +76,6 @@ func (lf *LoadFiles) Delete(ctx context.Context, uploadID int64) error {
 // Insert loadFiles into the database.
 func (lf *LoadFiles) Insert(ctx context.Context, loadFiles []model.LoadFile) error {
     defer lf.TimerStat("insert", stats.Tags{
-        "sourceId": loadFiles[0].SourceID,
         "destId": loadFiles[0].DestinationID,
         "destType": loadFiles[0].DestinationType,
     })()
@@ -265,8 +264,7 @@ func (lf *LoadFiles) DistinctTableName(
     endID int64,
 ) ([]string, error) {
     defer lf.TimerStat("distinct_table_name", stats.Tags{
-        "sourceId": sourceID,
-        "destId": destinationID,
+        "destId": destinationID,
     })()

     rows, err := lf.db.QueryContext(ctx, `

warehouse/internal/repo/repo_test.go

Lines changed: 0 additions & 6 deletions

@@ -61,7 +61,6 @@ func TestStatsEmission(t *testing.T) {
     },
 }))
 require.Greater(t, statsStore.Get("warehouse_repo_query_wh_load_files_insert_duration_seconds", stats.Tags{
-    "sourceId": "source_id",
     "destId": "destination_id",
     "destType": "destination_type",
 }).LastDuration(), time.Duration(0))
@@ -74,15 +73,13 @@
 })
 require.NoError(t, err)
 require.Greater(t, statsStore.Get("warehouse_repo_query_wh_schemas_insert_duration_seconds", stats.Tags{
-    "sourceId": "source_id",
     "destId": "destination_id",
     "destType": "destination_type",
 }).LastDuration(), time.Duration(0))

 _, err = repoStagingFileSchemaSnapshots.Insert(ctx, "source_id", "destination_id", "workspace_id", json.RawMessage(`{}`))
 require.NoError(t, err)
 require.Greater(t, statsStore.Get("warehouse_repo_query_wh_staging_file_schema_snapshots_insert_duration_seconds", stats.Tags{
-    "sourceId": "source_id",
     "destId": "destination_id",
     "workspaceId": "workspace_id",
 }).LastDuration(), time.Duration(0))
@@ -94,7 +91,6 @@
 }).WithSchema(json.RawMessage(`{}`))))
 require.NoError(t, err)
 require.Greater(t, statsStore.Get("warehouse_repo_query_wh_staging_files_insert_duration_seconds", stats.Tags{
-    "sourceId": "source_id",
     "destId": "destination_id",
     "workspaceId": "workspace_id",
 }).LastDuration(), time.Duration(0))
@@ -115,13 +111,11 @@
 })
 require.NoError(t, err)
 require.Greater(t, statsStore.Get("warehouse_repo_query_wh_async_jobs_insert_duration_seconds", stats.Tags{
-    "sourceId": "source_id",
     "destId": "destination_id",
     "workspaceId": "workspace_id",
 }).LastDuration(), time.Duration(0))

 require.Greater(t, statsStore.Get("warehouse_repo_query_wh_uploads_create_with_staging_files_duration_seconds", stats.Tags{
-    "sourceId": "source_id",
     "destId": "destination_id",
     "destType": "destination_type",
     "workspaceId": "workspace_id",

warehouse/internal/repo/schema.go

Lines changed: 1 addition & 3 deletions

@@ -72,7 +72,6 @@ func NewWHSchemas(db *sqlmiddleware.DB, conf *config.Config, log logger.Logger,
 // If Warehouse.enableTableLevelSchema is true in config, it also inserts/updates table-level schemas for each table in the schema.
 func (sh *WHSchema) Insert(ctx context.Context, whSchema *model.WHSchema) error {
     defer sh.TimerStat("insert", stats.Tags{
-        "sourceId": whSchema.SourceID,
         "destId": whSchema.DestinationID,
         "destType": whSchema.DestinationType,
     })()
@@ -478,8 +477,7 @@ func parseWHSchemas(rows *sqlmiddleware.Rows) ([]*model.WHSchema, error) {

 func (sh *WHSchema) GetNamespace(ctx context.Context, sourceID, destID string) (string, error) {
     defer sh.TimerStat("get_namespace", stats.Tags{
-        "sourceId": sourceID,
-        "destId": destID,
+        "destId": destID,
     })()

     query := `SELECT namespace FROM ` + whSchemaTableName + `

warehouse/internal/repo/source.go

Lines changed: 0 additions & 1 deletion

@@ -54,7 +54,6 @@ func (s *Source) Insert(ctx context.Context, sourceJobs []model.SourceJob) ([]in
         return nil, errors.New("empty sourceJobs")
     }
     defer (*repo)(s).TimerStat("insert", stats.Tags{
-        "sourceId": sourceJobs[0].SourceID,
         "destId": sourceJobs[0].DestinationID,
         "workspaceId": sourceJobs[0].WorkspaceID,
     })()

warehouse/internal/repo/staging.go

Lines changed: 2 additions & 6 deletions

@@ -157,7 +157,6 @@ func (sf *StagingFiles) Insert(ctx context.Context, stagingFile *model.StagingFi
     }

     defer sf.TimerStat("insert", stats.Tags{
-        "sourceId": stagingFile.SourceID,
         "destId": stagingFile.DestinationID,
         "workspaceId": stagingFile.WorkspaceID,
     })()
@@ -391,8 +390,7 @@ func (sf *StagingFiles) GetForUploadID(ctx context.Context, uploadID int64) ([]*

 func (sf *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]*model.StagingFile, error) {
     defer sf.TimerStat("pending", stats.Tags{
-        "sourceId": sourceID,
-        "destId": destinationID,
+        "destId": destinationID,
     })()

     var (
@@ -437,9 +435,7 @@ func (sf *StagingFiles) Pending(ctx context.Context, sourceID, destinationID str
 }

 func (sf *StagingFiles) CountPendingForSource(ctx context.Context, sourceID string) (int64, error) {
-    defer sf.TimerStat("count_pending_for_source", stats.Tags{
-        "sourceId": sourceID,
-    })()
+    defer sf.TimerStat("count_pending_for_source", nil)()

     return sf.countPending(ctx, `source_id = $1`, sourceID)
 }

warehouse/internal/repo/staging_snapshot.go

Lines changed: 1 addition & 3 deletions

@@ -44,7 +44,6 @@ func NewStagingFileSchemaSnapshots(db *sqlmiddleware.DB, opts ...Opt) *StagingFi
 // Insert inserts a new schema snapshot into the database and returns its auto-generated ID.
 func (r *StagingFileSchemaSnapshots) Insert(ctx context.Context, sourceID, destinationID, workspaceID string, schemaBytes json.RawMessage) (uuid.UUID, error) {
     defer (*repo)(r).TimerStat("insert", stats.Tags{
-        "sourceId": sourceID,
         "destId": destinationID,
         "workspaceId": workspaceID,
     })()
@@ -75,8 +74,7 @@ func (r *StagingFileSchemaSnapshots) Insert(ctx context.Context, sourceID, desti
 // Returns ErrNoSchemaSnapshot if not found.
 func (r *StagingFileSchemaSnapshots) GetLatest(ctx context.Context, sourceID, destinationID string) (*model.StagingFileSchemaSnapshot, error) {
     defer (*repo)(r).TimerStat("get_latest", stats.Tags{
-        "sourceId": sourceID,
-        "destId": destinationID,
+        "destId": destinationID,
     })()

     query := `

warehouse/internal/repo/table_upload.go

Lines changed: 1 addition & 2 deletions

@@ -413,8 +413,7 @@ func (tu *TableUploads) GetByJobRunTaskRun(
     taskRunID string,
 ) ([]model.TableUpload, error) {
     defer tu.TimerStat("get_by_job_run_task_run", stats.Tags{
-        "sourceId": sourceID,
-        "destId": destinationID,
+        "destId": destinationID,
     })()

     rows, err := tu.db.QueryContext(ctx, `

warehouse/internal/repo/upload.go

Lines changed: 2 additions & 7 deletions

@@ -156,7 +156,6 @@ func (u *Uploads) CreateWithStagingFiles(ctx context.Context, upload model.Uploa
     }

     defer (*repo)(u).TimerStat("create_with_staging_files", stats.Tags{
-        "sourceId": upload.SourceID,
         "destId": upload.DestinationID,
         "destType": upload.DestinationType,
         "workspaceId": upload.WorkspaceID,
@@ -659,8 +658,7 @@ func (u *Uploads) ResetInProgress(ctx context.Context, destType string) error {

 func (u *Uploads) LastCreatedAt(ctx context.Context, sourceID, destinationID string) (time.Time, error) {
     defer (*repo)(u).TimerStat("last_created_at", stats.Tags{
-        "sourceId": sourceID,
-        "destId": destinationID,
+        "destId": destinationID,
     })()

     row := u.db.QueryRowContext(ctx, `
@@ -1068,8 +1066,7 @@ func (u *Uploads) RetryCount(ctx context.Context, opts model.RetryOptions) (int6

 func (u *Uploads) GetLatestUploadInfo(ctx context.Context, sourceID, destinationID string) (*model.LatestUploadInfo, error) {
     defer (*repo)(u).TimerStat("get_latest_upload_info", stats.Tags{
-        "sourceId": sourceID,
-        "destId": destinationID,
+        "destId": destinationID,
     })()

     var latestUploadInfo model.LatestUploadInfo
@@ -1309,7 +1306,6 @@ func (u *Uploads) RetryFailedBatches(
     req model.RetryFailedBatchesRequest,
 ) (int64, error) {
     defer (*repo)(u).TimerStat("retry_failed_batches", stats.Tags{
-        "sourceId": req.SourceID,
         "destId": req.DestinationID,
         "workspaceId": req.WorkspaceID,
         "status": req.Status,
@@ -1479,7 +1475,6 @@ func (u *Uploads) GetSyncLatencies(ctx context.Context, request model.SyncLatenc
     defer (*repo)(u).TimerStat("get_sync_latencies", stats.Tags{
         "destId": request.DestinationID,
         "workspaceId": request.WorkspaceID,
-        "sourceId": request.SourceID,
     })()

     aggregationSQL := getLatencyAggregationSQL(request.AggregationType)

0 commit comments