Skip to content

Commit 05e619b

Browse files
julianbrostyhabteab
andcommitted
Trigger time based escalations
Co-authored-by: Yonas Habteab <[email protected]>
1 parent 3cdb662 commit 05e619b

File tree

4 files changed

+185
-72
lines changed

4 files changed

+185
-72
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/icinga/icinga-notifications
22

3-
go 1.20
3+
go 1.21
44

55
require (
66
github.com/creasty/defaults v1.7.0

internal/incident/incident.go

Lines changed: 159 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ type Incident struct {
3535

3636
incidentRowID int64
3737

38+
// timer calls RetriggerEscalations the next time any escalation could be reached on the incident.
39+
//
40+
// For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident
41+
// is less than an hour old, timer will fire 1h after incident start, if the incident is between 1h and 2h
42+
// old, timer will fire after 2h, and if the incident is already older than 2h, no future escalations can
43+
// be reached solely based on the incident aging, so no more timer is necessary and timer stores nil.
44+
timer *time.Timer
45+
3846
db *icingadb.DB
3947
logger *zap.SugaredLogger
4048
runtimeConfig *config.RuntimeConfig
@@ -147,7 +155,12 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo
147155
}
148156

149157
// Re-evaluate escalations based on the newly evaluated rules.
150-
if _, err := i.evaluateEscalations(ctx, tx, ev, causedBy); err != nil {
158+
escalations, err := i.evaluateEscalations(ev.Time)
159+
if err != nil {
160+
return err
161+
}
162+
163+
if err := i.triggerEscalations(ctx, tx, ev, causedBy, escalations); err != nil {
151164
return err
152165
}
153166

@@ -165,6 +178,68 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo
165178
return i.notifyContacts(ctx, ev, notifications)
166179
}
167180

181+
// RetriggerEscalations tries to re-evaluate the escalations and notify contacts.
182+
func (i *Incident) RetriggerEscalations(ev *event.Event) {
183+
i.Lock()
184+
defer i.Unlock()
185+
186+
i.runtimeConfig.RLock()
187+
defer i.runtimeConfig.RUnlock()
188+
189+
if !i.RecoveredAt.IsZero() {
190+
// Incident is recovered in the meantime.
191+
return
192+
}
193+
194+
if !time.Now().After(ev.Time) {
195+
i.logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev))
196+
return
197+
}
198+
199+
escalations, err := i.evaluateEscalations(ev.Time)
200+
if err != nil {
201+
i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err))
202+
return
203+
}
204+
205+
if len(escalations) == 0 {
206+
i.logger.Debug("Reevaluated escalations, no new escalations triggered")
207+
return
208+
}
209+
210+
var notifications []*NotificationEntry
211+
ctx := context.Background()
212+
err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error {
213+
err := ev.Sync(ctx, tx, i.db, i.Object.ID)
214+
if err != nil {
215+
return err
216+
}
217+
218+
if err = i.triggerEscalations(ctx, tx, ev, types.Int{}, escalations); err != nil {
219+
return err
220+
}
221+
222+
channels := make(contactChannels)
223+
for _, escalation := range escalations {
224+
channels.loadEscalationRecipientsChannel(escalation, i, ev.Time)
225+
}
226+
227+
notifications, err = i.addPendingNotifications(ctx, tx, ev, channels, types.Int{})
228+
229+
return err
230+
})
231+
if err != nil {
232+
i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err))
233+
} else {
234+
if err = i.notifyContacts(ctx, ev, notifications); err != nil {
235+
i.logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err))
236+
return
237+
}
238+
239+
i.logger.Info("Successfully reevaluated time-based escalations")
240+
}
241+
}
242+
168243
func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) (types.Int, error) {
169244
var causedByHistoryId types.Int
170245
oldSeverity := i.Severity
@@ -214,6 +289,10 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
214289

215290
return types.Int{}, errors.New("can't insert incident closed history to the database")
216291
}
292+
293+
if i.timer != nil {
294+
i.timer.Stop()
295+
}
217296
}
218297

219298
i.Severity = newSeverity
@@ -312,14 +391,25 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64
312391
return causedBy, nil
313392
}
314393

315-
// evaluateEscalations evaluates this incidents rule escalations if they aren't already.
316-
// Returns whether a new escalation triggered or an error on database failure.
317-
func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) (bool, error) {
394+
// evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already.
395+
// Returns the newly evaluated escalations to be triggered or an error on database failure.
396+
func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, error) {
318397
if i.EscalationState == nil {
319398
i.EscalationState = make(map[int64]*EscalationState)
320399
}
321400

322-
newEscalationMatched := false
401+
// Escalations are reevaluated now, reset any existing timer, if there might be future time-based escalations,
402+
// this function will start a new timer.
403+
if i.timer != nil {
404+
i.logger.Info("Stopping reevaluate timer due to escalation evaluation")
405+
i.timer.Stop()
406+
i.timer = nil
407+
}
408+
409+
filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity}
410+
411+
var escalations []*rule.Escalation
412+
retryAfter := rule.RetryNever
323413

324414
for rID := range i.Rules {
325415
r := i.runtimeConfig.Rules[rID]
@@ -336,67 +426,93 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve
336426
if escalation.Condition == nil {
337427
matched = true
338428
} else {
339-
cond := &rule.EscalationFilter{
340-
IncidentAge: time.Since(i.StartedAt),
341-
IncidentSeverity: i.Severity,
342-
}
343-
344429
var err error
345-
matched, err = escalation.Condition.Eval(cond)
430+
matched, err = escalation.Condition.Eval(filterContext)
346431
if err != nil {
347432
i.logger.Warnw(
348433
"Failed to evaluate escalation condition", zap.String("rule", r.Name),
349434
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
350435
)
351436

352437
matched = false
438+
} else if !matched {
439+
incidentAgeFilter := filterContext.ReevaluateAfter(escalation.Condition)
440+
retryAfter = min(retryAfter, incidentAgeFilter)
353441
}
354442
}
355443

356444
if matched {
357-
newEscalationMatched = true
445+
escalations = append(escalations, escalation)
446+
}
447+
}
448+
}
449+
}
358450

359-
state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())}
360-
i.EscalationState[escalation.ID] = state
451+
if retryAfter != rule.RetryNever {
452+
// The retryAfter duration is relative to the incident duration represented by the escalation filter,
453+
// i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter would
454+
// contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of the incident
455+
// start time here.
456+
nextEvalAt := eventTime.Add(retryAfter)
361457

362-
i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())
458+
i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt))
459+
i.timer = time.AfterFunc(retryAfter, func() {
460+
i.logger.Info("Reevaluating escalations")
363461

364-
if err := i.AddEscalationTriggered(ctx, tx, state); err != nil {
365-
i.logger.Errorw(
366-
"Failed to upsert escalation state", zap.String("rule", r.Name),
367-
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
368-
)
462+
i.RetriggerEscalations(&event.Event{
463+
Type: event.TypeInternal,
464+
Time: nextEvalAt,
465+
Message: fmt.Sprintf("Incident reached age %v", retryAfter),
466+
})
467+
})
468+
}
369469

370-
return false, errors.New("failed to upsert escalation state")
371-
}
470+
return escalations, nil
471+
}
372472

373-
history := &HistoryRow{
374-
Time: state.TriggeredAt,
375-
EventID: utils.ToDBInt(ev.ID),
376-
RuleEscalationID: utils.ToDBInt(state.RuleEscalationID),
377-
RuleID: utils.ToDBInt(r.ID),
378-
Type: EscalationTriggered,
379-
CausedByIncidentHistoryID: causedBy,
380-
}
473+
// triggerEscalations triggers the given escalations and generates incident history items for each of them.
474+
// Returns an error on database failure.
475+
func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int, escalations []*rule.Escalation) error {
476+
for _, escalation := range escalations {
477+
r := i.runtimeConfig.Rules[escalation.RuleID]
478+
i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())
381479

382-
if _, err := i.AddHistory(ctx, tx, history, false); err != nil {
383-
i.logger.Errorw(
384-
"Failed to insert escalation triggered incident history", zap.String("rule", r.Name),
385-
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
386-
)
480+
state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())}
481+
i.EscalationState[escalation.ID] = state
387482

388-
return false, errors.New("failed to insert escalation triggered incident history")
389-
}
483+
if err := i.AddEscalationTriggered(ctx, tx, state); err != nil {
484+
i.logger.Errorw(
485+
"Failed to upsert escalation state", zap.String("rule", r.Name),
486+
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
487+
)
390488

391-
if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil {
392-
return false, err
393-
}
394-
}
395-
}
489+
return errors.New("failed to upsert escalation state")
490+
}
491+
492+
history := &HistoryRow{
493+
Time: state.TriggeredAt,
494+
EventID: utils.ToDBInt(ev.ID),
495+
RuleEscalationID: utils.ToDBInt(state.RuleEscalationID),
496+
RuleID: utils.ToDBInt(r.ID),
497+
Type: EscalationTriggered,
498+
CausedByIncidentHistoryID: causedBy,
499+
}
500+
501+
if _, err := i.AddHistory(ctx, tx, history, false); err != nil {
502+
i.logger.Errorw(
503+
"Failed to insert escalation triggered incident history", zap.String("rule", r.Name),
504+
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
505+
)
506+
507+
return errors.New("failed to insert escalation triggered incident history")
508+
}
509+
510+
if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil {
511+
return err
396512
}
397513
}
398514

399-
return newEscalationMatched, nil
515+
return nil
400516
}
401517

402518
// notifyContacts executes all the given pending notifications of the current incident.

internal/incident/incidents.go

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,34 +45,11 @@ func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Log
4545
continue
4646
}
4747

48-
evaluateRulesAndEscalations := func(ctx context.Context) error {
49-
ev := &event.Event{Time: time.Now(), Type: event.TypeEscalation}
50-
if !incident.evaluateEscalations() {
51-
return nil
52-
}
53-
54-
tx, err := db.BeginTxx(ctx, nil)
55-
if err != nil {
56-
return err
57-
}
58-
defer func() { _ = tx.Rollback() }()
59-
60-
err = incident.notifyContacts(ctx, tx, ev, types.Int{})
61-
if err != nil {
62-
return err
63-
}
64-
65-
if err = tx.Commit(); err != nil {
66-
incident.logger.Errorw("Failed to commit database transaction", zap.Error(err))
67-
return err
68-
}
69-
70-
return nil
71-
}
72-
73-
if evaluateRulesAndEscalations(ctx) != nil {
74-
continue
75-
}
48+
incident.RetriggerEscalations(&event.Event{
49+
Time: time.Now(),
50+
Type: event.TypeInternal,
51+
Message: "Incident reevaluation at daemon startup",
52+
})
7653
}
7754

7855
return nil

internal/utils/utils.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,26 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str
3030
)
3131
}
3232

33+
// RunInTx allows running a function in a database transaction without requiring manual transaction handling.
34+
//
35+
// A new transaction is started on db which is then passed to fn. After fn returns, the transaction is
36+
// committed unless an error was returned. If fn returns an error, that error is returned, otherwise an
37+
// error is returned if a database operation fails.
38+
func RunInTx(ctx context.Context, db *icingadb.DB, fn func(tx *sqlx.Tx) error) error {
39+
tx, err := db.BeginTxx(ctx, nil)
40+
if err != nil {
41+
return err
42+
}
43+
defer func() { _ = tx.Rollback() }()
44+
45+
err = fn(tx)
46+
if err != nil {
47+
return err
48+
}
49+
50+
return tx.Commit()
51+
}
52+
3353
// InsertAndFetchId executes the given query and fetches the last inserted ID.
3454
func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) {
3555
var lastInsertId int64

0 commit comments

Comments
 (0)