Skip to content

Commit 0b868c1

Browse files
authored
chore: capture parameter labels in jobsdb update job status stats (#6495)
🔒 Scanned for secrets using gitleaks 8.28.0

# Description

Until now, in `JobsDB#UpdateJobStatus`, all emitted metrics used the provided `parameterFilters` argument as their labels. However, when updating job statuses for a mixed set of parameter filters, we had to pass an empty argument, resulting in metrics without labels.

Now, we capture the parameter filter stat labels (`source_id` & `destination_id`) dynamically within `JobsDB#UpdateJobStatus` whenever the `parameterFilters` argument is empty. To achieve that, the internal method `doUpdateJobStatusInTx` returns additional statistics counters for both the number of jobs and the total payload bytes. Thus, the return value now serves two key purposes:

1. Invalidating the `noResultsCache`.
2. Recording the `jobsdb_updated_jobs` and `jobsdb_updated_bytes` metrics with proper labels.

## Linear Ticket

Resolves PIPE-2524

## Security

- [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 6c0493e commit 0b868c1

File tree

3 files changed

+915
-38
lines changed

3 files changed

+915
-38
lines changed

jobsdb/jobsdb.go

Lines changed: 131 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ type ParameterFilterT struct {
603603
}
604604

605605
func (p ParameterFilterT) String() string {
606-
return "Name=" + p.Name + ",Value=" + p.Value
606+
return p.Name + ":" + p.Value
607607
}
608608

609609
func (p ParameterFilterT) GetName() string {
@@ -2341,7 +2341,74 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
23412341
}, true, nil
23422342
}
23432343

2344-
func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT, statusList []*JobStatusT, tags statTags) (updatedStates map[string]map[string]map[ParameterFilterT]struct{}, err error) {
2344+
// updateJobStatusStats is a map containing statistics of job status updates grouped by: workspace -> state -> set of params (stringified) -> stats
2345+
type updateJobStatusStats map[workspaceIDKey]map[jobStateKey]map[parameterFiltersKey]*UpdateJobStatusStats
2346+
2347+
// workspaceIDKey represents workspace id as key
2348+
type workspaceIDKey string
2349+
2350+
// jobStateKey represents job state as key (failed, succeeded, etc)
2351+
type jobStateKey string
2352+
2353+
// parameterFiltersKey represents a list of job parameter filters (stringified) as key
2354+
type parameterFiltersKey string
2355+
2356+
// Merges metrics from two updateJobStatusStats together
2357+
func (ujss updateJobStatusStats) Merge(other updateJobStatusStats) {
2358+
for ws, states := range other {
2359+
if _, ok := ujss[ws]; !ok {
2360+
ujss[ws] = make(map[jobStateKey]map[parameterFiltersKey]*UpdateJobStatusStats)
2361+
}
2362+
for state, paramsMetrics := range states {
2363+
if _, ok := ujss[ws][state]; !ok {
2364+
ujss[ws][state] = make(map[parameterFiltersKey]*UpdateJobStatusStats)
2365+
}
2366+
for params, metrics := range paramsMetrics {
2367+
existingMetrics, ok := ujss[ws][state][params]
2368+
if !ok {
2369+
existingMetrics = &UpdateJobStatusStats{parameters: metrics.parameters}
2370+
ujss[ws][state][params] = existingMetrics
2371+
}
2372+
existingMetrics.count += metrics.count
2373+
existingMetrics.bytes += metrics.bytes
2374+
}
2375+
}
2376+
}
2377+
}
2378+
2379+
// Aggregates metrics by state across all workspaces
2380+
func (ujss updateJobStatusStats) StatsByState() map[jobStateKey]map[parameterFiltersKey]*UpdateJobStatusStats {
2381+
result := make(map[jobStateKey]map[parameterFiltersKey]*UpdateJobStatusStats)
2382+
for _, states := range ujss {
2383+
for state, paramsMetrics := range states {
2384+
if _, ok := result[state]; !ok {
2385+
result[state] = make(map[parameterFiltersKey]*UpdateJobStatusStats)
2386+
}
2387+
for params, metrics := range paramsMetrics {
2388+
existingMetrics, ok := result[state][params]
2389+
if !ok {
2390+
existingMetrics = &UpdateJobStatusStats{parameters: metrics.parameters}
2391+
result[state][params] = existingMetrics
2392+
}
2393+
existingMetrics.count += metrics.count
2394+
existingMetrics.bytes += metrics.bytes
2395+
}
2396+
}
2397+
}
2398+
return result
2399+
}
2400+
2401+
// Stats for jobs grouped by status and parameters
2402+
type UpdateJobStatusStats struct {
2403+
// job parameters
2404+
parameters ParameterFilterList
2405+
// number of jobs
2406+
count int
2407+
// total size of error responses in bytes
2408+
bytes int
2409+
}
2410+
2411+
func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT, statusList []*JobStatusT, tags statTags) (updatedStates updateJobStatusStats, err error) {
23452412
if len(statusList) == 0 {
23462413
return updatedStates, err
23472414
}
@@ -2350,9 +2417,9 @@ func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT
23502417
"update_job_status_ds_time",
23512418
&tags,
23522419
).RecordDuration()()
2353-
// workspace -> state -> params
2354-
updatedStates = map[string]map[string]map[ParameterFilterT]struct{}{}
2420+
updatedStates = updateJobStatusStats{}
23552421
store := func() error {
2422+
updatedStates = updateJobStatusStats{} // reset in case of retry
23562423
stmt, err := tx.PrepareContext(ctx, pq.CopyIn(ds.JobStatusTable, "job_id", "job_state", "attempt", "exec_time",
23572424
"retry_time", "error_code", "error_response", "parameters"))
23582425
if err != nil {
@@ -2362,18 +2429,29 @@ func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT
23622429
defer func() { _ = stmt.Close() }()
23632430
for _, status := range statusList {
23642431
// Handle the case when google analytics returns gif in response
2365-
if _, ok := updatedStates[status.WorkspaceId]; !ok {
2366-
updatedStates[status.WorkspaceId] = make(map[string]map[ParameterFilterT]struct{})
2432+
if _, ok := updatedStates[workspaceIDKey(status.WorkspaceId)]; !ok {
2433+
updatedStates[workspaceIDKey(status.WorkspaceId)] = make(map[jobStateKey]map[parameterFiltersKey]*UpdateJobStatusStats)
23672434
}
2368-
if _, ok := updatedStates[status.WorkspaceId][status.JobState]; !ok {
2369-
updatedStates[status.WorkspaceId][status.JobState] = make(map[ParameterFilterT]struct{})
2435+
if _, ok := updatedStates[workspaceIDKey(status.WorkspaceId)][jobStateKey(status.JobState)]; !ok {
2436+
updatedStates[workspaceIDKey(status.WorkspaceId)][jobStateKey(status.JobState)] = make(map[parameterFiltersKey]*UpdateJobStatusStats)
23702437
}
2438+
var parameters ParameterFilterList
2439+
var parametersKey parameterFiltersKey
23712440
if status.JobParameters != nil {
23722441
for _, param := range cacheParameterFilters {
23732442
v := gjson.GetBytes(status.JobParameters, param).Str
2374-
updatedStates[status.WorkspaceId][status.JobState][ParameterFilterT{Name: param, Value: v}] = struct{}{}
2443+
parameters = append(parameters, ParameterFilterT{Name: param, Value: v})
23752444
}
2445+
parametersKey = parameterFiltersKey(parameters.String())
2446+
2447+
}
2448+
pm, ok := updatedStates[workspaceIDKey(status.WorkspaceId)][jobStateKey(status.JobState)][parametersKey]
2449+
if !ok {
2450+
pm = &UpdateJobStatusStats{parameters: parameters}
2451+
updatedStates[workspaceIDKey(status.WorkspaceId)][jobStateKey(status.JobState)][parametersKey] = pm
23762452
}
2453+
pm.count++
2454+
pm.bytes += len(status.ErrorResponse)
23772455

23782456
if !utf8.ValidString(string(status.ErrorResponse)) {
23792457
status.ErrorResponse = []byte(`{}`)
@@ -2838,38 +2916,53 @@ func (jd *Handle) internalUpdateJobStatusInTx(ctx context.Context, tx *Tx, dsLis
28382916

28392917
tx.AddSuccessListener(func() {
28402918
// clear cache
2841-
for ds, dsKeys := range updatedStatesByDS {
2842-
if len(dsKeys) == 0 { // if no keys, we need to invalidate all keys
2919+
for ds, dsStats := range updatedStatesByDS {
2920+
if len(dsStats) == 0 { // if no keys, we need to invalidate all keys
28432921
jd.noResultsCache.Invalidate(ds.Index, "", nil, nil, nil)
28442922
}
2845-
for workspace, wsKeys := range dsKeys {
2846-
if len(wsKeys) == 0 { // if no keys, we need to invalidate all keys
2847-
jd.noResultsCache.Invalidate(ds.Index, workspace, nil, nil, nil)
2923+
for workspace, wsStats := range dsStats {
2924+
if len(wsStats) == 0 { // if no keys, we need to invalidate all keys
2925+
jd.noResultsCache.Invalidate(ds.Index, string(workspace), nil, nil, nil)
28482926
}
2849-
for state, parametersMap := range wsKeys {
2850-
stateList := []string{state}
2851-
if len(parametersMap) == 0 { // if no keys, we need to invalidate all keys
2852-
jd.noResultsCache.Invalidate(ds.Index, workspace, customValFilters, stateList, nil)
2853-
}
2854-
parameterFilters := lo.Keys(parametersMap)
2855-
jd.noResultsCache.Invalidate(ds.Index, workspace, customValFilters, stateList, parameterFilters)
2927+
for state, parametersStats := range wsStats {
2928+
stateList := []string{string(state)}
2929+
parameterFilters := lo.UniqBy( // gather unique parameter filters
2930+
lo.FlatMap(
2931+
lo.Values(parametersStats), // from all JobStatusMetrics
2932+
func(ujss *UpdateJobStatusStats, _ int) []ParameterFilterT {
2933+
return ujss.parameters
2934+
},
2935+
),
2936+
func(pf ParameterFilterT) string {
2937+
return pf.String() // uniqueness by string representation
2938+
},
2939+
)
2940+
// invalidate cache for this combination
2941+
jd.noResultsCache.Invalidate(ds.Index, string(workspace), customValFilters, stateList, parameterFilters)
28562942
}
28572943
}
28582944
}
28592945
})
2946+
2947+
// use the aggregated stats from updateJobStatusInTx
28602948
tx.AddSuccessListener(func() {
2861-
statTags := tags.getStatsTags(jd.tablePrefix)
2862-
statusCounters := make(map[string]lo.Tuple2[int, int], 0)
2863-
for i := range statusList {
2864-
t := statusCounters[statusList[i].JobState]
2865-
t.A++ // job count
2866-
t.B += len(statusList[i].ErrorResponse) // bytes count
2867-
statusCounters[statusList[i].JobState] = t
2868-
}
2869-
for state, count := range statusCounters {
2870-
statTags["jobState"] = state
2871-
jd.stats.NewTaggedStat("jobsdb_updated_jobs", stats.CountType, statTags).Count(count.A)
2872-
jd.stats.NewTaggedStat("jobsdb_updated_bytes", stats.CountType, statTags).Count(count.B)
2949+
merged := updateJobStatusStats{}
2950+
for _, dsStats := range updatedStatesByDS {
2951+
merged.Merge(dsStats)
2952+
}
2953+
statsByState := merged.StatsByState()
2954+
for state, parametersMap := range statsByState {
2955+
for _, metrics := range parametersMap {
2956+
statTags := tags.getStatsTags(jd.tablePrefix)
2957+
statTags["jobState"] = string(state)
2958+
if len(parameterFilters) == 0 { // only add parameter filters if not already set
2959+
for _, pf := range metrics.parameters {
2960+
statTags[pf.Name] = pf.Value
2961+
}
2962+
}
2963+
jd.stats.NewTaggedStat("jobsdb_updated_jobs", stats.CountType, statTags).Count(metrics.count)
2964+
jd.stats.NewTaggedStat("jobsdb_updated_bytes", stats.CountType, statTags).Count(metrics.bytes)
2965+
}
28732966
}
28742967
})
28752968

@@ -2881,7 +2974,7 @@ doUpdateJobStatusInTx updates the status of a batch of jobs
28812974
customValFilters[] is passed, so we can efficiently mark empty cache
28822975
Later we can move this to query
28832976
*/
2884-
func (jd *Handle) doUpdateJobStatusInTx(ctx context.Context, tx *Tx, dsList []dataSetT, dsRangeList []dataSetRangeT, statusList []*JobStatusT, tags statTags) (updatedStatesByDS map[dataSetT]map[string]map[string]map[ParameterFilterT]struct{}, err error) {
2977+
func (jd *Handle) doUpdateJobStatusInTx(ctx context.Context, tx *Tx, dsList []dataSetT, dsRangeList []dataSetRangeT, statusList []*JobStatusT, tags statTags) (updatedStatesByDS map[dataSetT]updateJobStatusStats, err error) {
28852978
if len(statusList) == 0 {
28862979
return updatedStatesByDS, err
28872980
}
@@ -2893,7 +2986,7 @@ func (jd *Handle) doUpdateJobStatusInTx(ctx context.Context, tx *Tx, dsList []da
28932986

28942987
// We scan through the list of jobs and map them to DS
28952988
var lastPos int
2896-
updatedStatesByDS = make(map[dataSetT]map[string]map[string]map[ParameterFilterT]struct{})
2989+
updatedStatesByDS = make(map[dataSetT]updateJobStatusStats)
28972990
for _, ds := range dsRangeList {
28982991
minID := ds.minJobID
28992992
maxID := ds.maxJobID
@@ -2914,7 +3007,7 @@ func (jd *Handle) doUpdateJobStatusInTx(ctx context.Context, tx *Tx, dsList []da
29143007
logger.NewIntField("prevPos", int64(i-1)),
29153008
)
29163009
}
2917-
var updatedStates map[string]map[string]map[ParameterFilterT]struct{}
3010+
var updatedStates updateJobStatusStats
29183011
updatedStates, err = jd.updateJobStatusDSInTx(ctx, tx, ds.ds, statusList[lastPos:i], tags)
29193012
if err != nil {
29203013
return updatedStatesByDS, err
@@ -2935,7 +3028,7 @@ func (jd *Handle) doUpdateJobStatusInTx(ctx context.Context, tx *Tx, dsList []da
29353028
logger.NewIntField("prevJobID", statusList[i-1].JobID),
29363029
logger.NewIntField("index", int64(i)),
29373030
)
2938-
var updatedStates map[string]map[string]map[ParameterFilterT]struct{}
3031+
var updatedStates updateJobStatusStats
29393032
updatedStates, err = jd.updateJobStatusDSInTx(ctx, tx, ds.ds, statusList[lastPos:i], tags)
29403033
if err != nil {
29413034
return updatedStatesByDS, err
@@ -2957,7 +3050,7 @@ func (jd *Handle) doUpdateJobStatusInTx(ctx context.Context, tx *Tx, dsList []da
29573050
jd.logger.Debugn("RangeEnd",
29583051
logger.NewIntField("jobID", statusList[lastPos].JobID),
29593052
logger.NewIntField("lenStatusList", int64(len(statusList))))
2960-
var updatedStates map[string]map[string]map[ParameterFilterT]struct{}
3053+
var updatedStates updateJobStatusStats
29613054
updatedStates, err = jd.updateJobStatusDSInTx(ctx, tx, dsList[len(dsList)-1], statusList[lastPos:], tags)
29623055
if err != nil {
29633056
return updatedStatesByDS, err

0 commit comments

Comments
 (0)