diff --git a/adxexporter/summaryrule.go b/adxexporter/summaryrule.go
index aabafb297..2b577086b 100644
--- a/adxexporter/summaryrule.go
+++ b/adxexporter/summaryrule.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"strings"
 	"time"
 
 	adxmonv1 "github.com/Azure/adx-mon/api/v1"
@@ -28,8 +29,22 @@ const (
 	stateCompleted = "Completed"
 	stateFailed    = "Failed"
+
+	autoSuspendMinDuration        = 15 * time.Minute
+	autoSuspendIntervalMultiplier = 3
+	defaultSuspendedMessage       = "SummaryRule execution is suspended"
 )
+
+var throttleErrorIndicators = []string{
+	"e_query_throttled",
+	"query throttled",
+	"too many requests",
+	"request is throttled",
+	"rate limit",
+	"status code 429",
+	"http status code 429",
+}
+
 // SummaryRuleReconciler will reconcile SummaryRule objects.
 type SummaryRuleReconciler struct {
 	client.Client
@@ -41,6 +56,8 @@ type SummaryRuleReconciler struct {
 
 	KustoExecutors map[string]KustoExecutor // per-database Kusto clients supporting Query and Mgmt
 	Clock          clock.Clock
+
+	AutoSuspendEvaluator func(*adxmonv1.SummaryRule) bool
 }
 
 // Reconcile processes a single SummaryRule; placeholder implementation for now.
@@ -79,12 +96,17 @@ func (r *SummaryRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return ctrl.Result{}, nil
 	}
 
-	if requeue, handled := r.adoptIfDesired(ctx, &rule); handled {
-		return requeue, nil
+	suspended, suspendMsg := r.evaluateSuspension(&rule)
+	if suspended {
+		r.markSuspendedStatus(&rule, suspendMsg)
+	} else {
+		if requeue, handled := r.adoptIfDesired(ctx, &rule); handled {
+			return requeue, nil
+		}
 	}
 
 	// Minimal submission path: compute window and submit async if it's time
-	if rule.ShouldSubmitRule(r.Clock) {
+	if !suspended && rule.ShouldSubmitRule(r.Clock) {
 		windowStart, windowEnd := rule.NextExecutionWindow(r.Clock)
 
 		// Use inclusive end for the query by subtracting OneTick (100ns) to avoid boundary issues
@@ -109,10 +131,12 @@ func (r *SummaryRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	}
 
 	// Backfill any missing async operations based on last successful execution time
-	rule.BackfillAsyncOperations(r.Clock)
+	if !suspended {
+		rule.BackfillAsyncOperations(r.Clock)
+	}
 
 	// Track/advance outstanding async operations (including backlog windows)
-	r.trackAsyncOperations(ctx, &rule)
+	r.trackAsyncOperations(ctx, &rule, suspended)
 
 	// Persist any changes made to async operation conditions or timestamps
 	if err := r.Status().Update(ctx, &rule); err != nil {
@@ -121,7 +145,7 @@ func (r *SummaryRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	}
 
 	// Emit health / status log (best-effort, after status update to reflect latest view)
-	r.emitHealthLog(&rule)
+	r.emitHealthLog(&rule, suspended)
 	// Requeue based on rule interval and async polling needs
 	return ctrl.Result{RequeueAfter: nextRequeue(&rule)}, nil
 }
@@ -184,6 +208,118 @@ func (r *SummaryRuleReconciler) isDatabaseManaged(rule *adxmonv1.SummaryRule) bo
 	return ok
 }
 
+func (r *SummaryRuleReconciler) evaluateSuspension(rule *adxmonv1.SummaryRule) (bool, string) {
+	if r.AutoSuspendEvaluator != nil {
+		if r.AutoSuspendEvaluator(rule) {
+			return true, ""
+		}
+		return false, ""
+	}
+	return r.shouldAutoSuspend(rule)
+}
+
+func (r *SummaryRuleReconciler) shouldAutoSuspend(rule *adxmonv1.SummaryRule) (bool, string) {
+	clk := r.Clock
+	if clk == nil {
+		clk = clock.RealClock{}
+	}
+
+	threshold := autoSuspendThresholdForRule(rule)
+	if threshold <= 0 {
+		return false, ""
+	}
+
+	failed := meta.FindStatusCondition(rule.Status.Conditions, adxmonv1.ConditionFailed)
+	if failed == nil || failed.Status != metav1.ConditionTrue {
+		return false, ""
+	}
+	if failed.ObservedGeneration != rule.GetGeneration() {
+		return false, ""
+	}
+	if !isThrottleFailure(failed) {
+		return false, ""
+	}
+	if failed.LastTransitionTime.IsZero() {
+		return false, ""
+	}
+
+	completed := meta.FindStatusCondition(rule.Status.Conditions, adxmonv1.ConditionCompleted)
+	if completed != nil && completed.Status == metav1.ConditionTrue {
+		if completed.LastTransitionTime.After(failed.LastTransitionTime.Time) {
+			return false, ""
+		}
+	}
+
+	elapsed := clk.Now().Sub(failed.LastTransitionTime.Time)
+	if elapsed < threshold {
+		return false, ""
+	}
+
+	message := fmt.Sprintf("%s after sustained throttling (elapsed=%s, threshold=%s)", defaultSuspendedMessage, formatDuration(elapsed), formatDuration(threshold))
+	return true, message
+}
+
+func (r *SummaryRuleReconciler) markSuspendedStatus(rule *adxmonv1.SummaryRule, message string) {
+	if message == "" {
+		message = defaultSuspendedMessage
+	}
+	condition := metav1.Condition{
+		Type:               adxmonv1.SummaryRuleOwner,
+		Status:             metav1.ConditionFalse,
+		Reason:             "Suspended",
+		Message:            message,
+		ObservedGeneration: rule.GetGeneration(),
+		LastTransitionTime: metav1.Now(),
+	}
+	existing := meta.FindStatusCondition(rule.Status.Conditions, adxmonv1.SummaryRuleOwner)
+	if existing != nil && existing.Status == condition.Status && existing.Reason == condition.Reason && existing.Message == condition.Message {
+		return
+	}
+	meta.SetStatusCondition(&rule.Status.Conditions, condition)
+}
+
+func autoSuspendThresholdForRule(rule *adxmonv1.SummaryRule) time.Duration {
+	interval := rule.Spec.Interval.Duration
+	if interval <= 0 {
+		return autoSuspendMinDuration
+	}
+	base := time.Duration(autoSuspendIntervalMultiplier) * interval
+	if base < autoSuspendMinDuration {
+		return autoSuspendMinDuration
+	}
+	return base
+}
+
+func isThrottleFailure(cond *metav1.Condition) bool {
+	if cond == nil {
+		return false
+	}
+	if cond.Status != metav1.ConditionTrue {
+		return false
+	}
+	if cond.Message == "" {
+		return false
+	}
+	lower := strings.ToLower(cond.Message)
+	for _, indicator := range throttleErrorIndicators {
+		if strings.Contains(lower, indicator) {
+			return true
+		}
+	}
+	return false
+}
+
+func formatDuration(d time.Duration) string {
+	if d <= 0 {
+		return "0s"
+	}
+	// Prefer second precision to avoid overly long fractional strings.
+	if d < time.Second {
+		return d.String()
+	}
+	return d.Truncate(time.Second).String()
+}
+
 // (criteria match handled inline for clarity)
 
 // adoptIfDesired tries to claim ownership if requested and safe. It returns (result, handled)
@@ -403,17 +539,20 @@ func (r *SummaryRuleReconciler) getOperation(ctx context.Context, database strin
 }
 
 // trackAsyncOperations iterates current async ops and processes completed/retry/backlog entries
-func (r *SummaryRuleReconciler) trackAsyncOperations(ctx context.Context, rule *adxmonv1.SummaryRule) {
+func (r *SummaryRuleReconciler) trackAsyncOperations(ctx context.Context, rule *adxmonv1.SummaryRule, suspended bool) {
 	// Reviewer note addressed: this method previously handled backlog lookup, status polling,
 	// completion, and retry logic inline. It is now a simple iterator delegating responsibilities
 	// to focused helpers for readability & testability.
 	for _, op := range rule.GetAsyncOperations() {
-		r.processAsyncOperation(ctx, rule, op)
+		r.processAsyncOperation(ctx, rule, op, suspended)
 	}
 }
 
 // processAsyncOperation routes an async operation to the correct handler based on its state.
-func (r *SummaryRuleReconciler) processAsyncOperation(ctx context.Context, rule *adxmonv1.SummaryRule, op adxmonv1.AsyncOperation) {
+func (r *SummaryRuleReconciler) processAsyncOperation(ctx context.Context, rule *adxmonv1.SummaryRule, op adxmonv1.AsyncOperation, suspended bool) {
+	if suspended && op.OperationId == "" {
+		return
+	}
 	if op.OperationId == "" { // backlog entry (submission previously failed)
 		r.processBacklogOperation(ctx, rule, op)
 		return
@@ -428,6 +567,10 @@ func (r *SummaryRuleReconciler) processAsyncOperation(ctx context.Context, rule
 	case status.State == stateCompleted || status.State == stateFailed:
 		r.handleCompletedOperation(ctx, rule, *status)
 	case status.ShouldRetry != 0:
+		if suspended {
+			logger.Infof("Skipping retry for async operation %s on suspended rule %s.%s", status.OperationId, rule.Spec.Database, rule.Name)
+			return
+		}
 		r.handleRetryOperation(ctx, rule, op, *status)
 	}
 }
@@ -521,7 +664,7 @@ func (r *SummaryRuleReconciler) updateLastExecutionTimeIfForward(rule *adxmonv1.
 // criteria matched, no stale async operations, no detected interval gap beyond one interval, and
 // (if due) submission succeeded or (if not due) simply waiting.
 // This avoids expensive downstream Kusto correlation for basic liveness checks.
-func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
+func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule, suspended bool) {
 	// Only log for rules the exporter owns or is attempting to adopt; skip others to reduce noise.
 	owned := crdownership.IsOwnedBy(rule, adxmonv1.SummaryRuleOwnerADXExporter)
 	wantsExporter := crdownership.WantsOwner(rule, adxmonv1.SummaryRuleOwnerADXExporter)
@@ -591,11 +734,14 @@ func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
 		}
 	}
 
-	// Determine due & submitted from conditions: we treat due purely as whether it *would* be submitted now
-	due := rule.ShouldSubmitRule(r.Clock)
+	// Determine due & submitted from conditions: suspended rules are never due
+	due := false
+	if !suspended {
+		due = rule.ShouldSubmitRule(r.Clock)
+	}
 	submitted := false
 	// We approximate submission success by checking if due and we advanced LastSuccessfulExecutionTime within this reconcile.
-	if due && lastExec != nil {
+	if !suspended && due && lastExec != nil {
 		// If lastExec within <= interval of now (with delay considered) we consider that just submitted.
 		if effectiveNow.Sub(*lastExec) < (interval / 2) { // heuristic; avoids false positive if reconcile jitter small
 			submitted = true
@@ -605,23 +751,28 @@ func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
 	// Working state & reason
 	working := true
 	degraded := ""
-	switch {
-	case !r.isDatabaseManaged(rule):
-		working, degraded = false, "unmanaged_database"
-	case func() bool {
-		ok, _, _, _ := EvaluateExecutionCriteria(rule.Spec.Criteria, rule.Spec.CriteriaExpression, r.ClusterLabels)
-		return !ok
-	}():
-		working, degraded = false, "criteria_mismatch"
-	case !owned && wantsExporter:
-		working, degraded = false, "ownership_pending"
-	case due && !submitted:
-		// If it was due but not (apparently) submitted this cycle
-		working, degraded = false, "submission_missed"
-	case staleAsync:
-		working, degraded = false, "async_stale"
-	case gapDetected:
-		working, degraded = false, "backlog_gap"
+	if suspended {
+		working = false
+		degraded = "suspended"
+	} else {
+		switch {
+		case !r.isDatabaseManaged(rule):
+			working, degraded = false, "unmanaged_database"
+		case func() bool {
+			ok, _, _, _ := EvaluateExecutionCriteria(rule.Spec.Criteria, rule.Spec.CriteriaExpression, r.ClusterLabels)
+			return !ok
+		}():
+			working, degraded = false, "criteria_mismatch"
+		case !owned && wantsExporter:
+			working, degraded = false, "ownership_pending"
+		case due && !submitted:
+			// If it was due but not (apparently) submitted this cycle
+			working, degraded = false, "submission_missed"
+		case staleAsync:
+			working, degraded = false, "async_stale"
+		case gapDetected:
+			working, degraded = false, "backlog_gap"
+		}
 	}
 
 	l := logger.Logger()
@@ -641,6 +792,7 @@ func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
 	kv = append(kv,
 		"due", due,
 		"submitted", submitted,
+		"suspended", suspended,
 		"async_inflight", inflight,
 		"async_backlog", backlog,
 		"stale_async", staleAsync,
diff --git a/adxexporter/summaryrule_test.go b/adxexporter/summaryrule_test.go
index 958d8bbea..1e1526841 100644
--- a/adxexporter/summaryrule_test.go
+++ b/adxexporter/summaryrule_test.go
@@ -600,3 +600,283 @@ func TestSummaryRule_getOperation_Parsing(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, st)
 }
+
+func TestSummaryRule_SuspendSkipsSubmission(t *testing.T) {
+	ensureTestVFlagSetT(t)
+	now := time.Now()
+	rule := &adxmonv1.SummaryRule{
+		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "sr-suspend", Annotations: map[string]string{adxmonv1.SummaryRuleOwnerAnnotation: adxmonv1.SummaryRuleOwnerADXExporter}},
+		Spec: adxmonv1.SummaryRuleSpec{
+			Database: "testdb",
+			Table:    "T",
+			Interval: metav1.Duration{Duration: time.Hour},
+			Body:     "Body",
+		},
+	}
+	c := newFakeClientWithRule(t, rule)
+	mock := NewMockKustoExecutor(t, "testdb", "https://test")
+	r := newBaseReconciler(t, c, mock, now)
+	r.AutoSuspendEvaluator = func(*adxmonv1.SummaryRule) bool { return true }
+
+	_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: client.ObjectKeyFromObject(rule)})
+	require.NoError(t, err)
+
+	var updated adxmonv1.SummaryRule
+	require.NoError(t, c.Get(context.Background(), client.ObjectKeyFromObject(rule), &updated))
+	require.Len(t, updated.GetAsyncOperations(), 0)
+	cond := updated.GetCondition()
+	require.NotNil(t, cond)
+	require.Equal(t, metav1.ConditionFalse, cond.Status)
+	require.Equal(t, "Suspended", cond.Reason)
+	require.Equal(t, "SummaryRule execution is suspended", cond.Message)
+
+	require.Empty(t, mock.GetQueries())
+}
+
+func TestSummaryRule_SuspendKeepsBacklogIdle(t *testing.T) {
+	ensureTestVFlagSetT(t)
+	now := time.Now().UTC()
+	rule := &adxmonv1.SummaryRule{
+		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "sr-suspend-backlog", Annotations: map[string]string{adxmonv1.SummaryRuleOwnerAnnotation: adxmonv1.SummaryRuleOwnerADXExporter}},
+		Spec: adxmonv1.SummaryRuleSpec{
+			Database: "testdb",
+			Table:    "T",
+			Interval: metav1.Duration{Duration: time.Hour},
+			Body:     "Body",
+		},
+	}
+	start := now.Add(-time.Hour)
+	endInclusive := start.Add(time.Hour).Add(-kustoutil.OneTick)
+	rule.SetAsyncOperation(adxmonv1.AsyncOperation{OperationId: "", StartTime: start.Format(time.RFC3339Nano), EndTime: endInclusive.Format(time.RFC3339Nano)})
+
+	c := newFakeClientWithRule(t, rule)
+	mock := NewMockKustoExecutor(t, "testdb", "https://test")
+	r := newBaseReconciler(t, c, mock, now)
+	r.AutoSuspendEvaluator = func(*adxmonv1.SummaryRule) bool { return true }
+
+	_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: client.ObjectKeyFromObject(rule)})
+	require.NoError(t, err)
+
+	var updated adxmonv1.SummaryRule
+	require.NoError(t, c.Get(context.Background(), client.ObjectKeyFromObject(rule), &updated))
+	ops := updated.GetAsyncOperations()
+	require.Len(t, ops, 1)
+	require.Equal(t, "", ops[0].OperationId)
+
+	require.Empty(t, mock.GetQueries())
+}
+
+func TestSummaryRule_SuspendTracksInflightOperations(t *testing.T) {
+	ensureTestVFlagSetT(t)
+	now := time.Now().UTC()
+	rule := &adxmonv1.SummaryRule{
+		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "sr-suspend-track", Annotations: map[string]string{adxmonv1.SummaryRuleOwnerAnnotation: adxmonv1.SummaryRuleOwnerADXExporter}},
+		Spec: adxmonv1.SummaryRuleSpec{
+			Database: "testdb",
+			Table:    "T",
+			Interval: metav1.Duration{Duration: time.Hour},
+			Body:     "Body",
+		},
+	}
+	rule.SetAsyncOperation(adxmonv1.AsyncOperation{OperationId: "op-1", StartTime: now.Add(-time.Hour).Format(time.RFC3339Nano), EndTime: now.Format(time.RFC3339Nano)})
+
+	c := newFakeClientWithRule(t, rule)
+	mock := NewMockKustoExecutor(t, "testdb", "https://test")
+	cols := table.Columns{{Name: "LastUpdatedOn", Type: types.DateTime}, {Name: "OperationId", Type: types.String}, {Name: "State", Type: types.String}, {Name: "ShouldRetry", Type: types.Real}, {Name: "Status", Type: types.String}}
+	rows := []value.Values{{
+		value.DateTime{Value: now, Valid: true},
+		value.String{Value: "op-1", Valid: true},
+		value.String{Value: "Completed", Valid: true},
+		value.Real{Value: 0, Valid: true},
+		value.String{Value: "", Valid: true},
+	}}
+	mock.results = append(mock.results, createRowIteratorFromMockRows(t, cols, rows))
+	r := newBaseReconciler(t, c, mock, now)
+	r.AutoSuspendEvaluator = func(*adxmonv1.SummaryRule) bool { return true }
+
+	_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: client.ObjectKeyFromObject(rule)})
+	require.NoError(t, err)
+
+	var updated adxmonv1.SummaryRule
+	require.NoError(t, c.Get(context.Background(), client.ObjectKeyFromObject(rule), &updated))
+	require.Len(t, updated.GetAsyncOperations(), 0)
+
+	queries := mock.GetQueries()
+	require.NotEmpty(t, queries)
+	require.Contains(t, queries[0], ".show operations")
+}
+
+func TestSummaryRule_SuspendSkipsRetryResubmission(t *testing.T) {
+	ensureTestVFlagSetT(t)
+	now := time.Now().UTC()
+	rule := &adxmonv1.SummaryRule{
+		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "sr-suspend-retry", Annotations: map[string]string{adxmonv1.SummaryRuleOwnerAnnotation: adxmonv1.SummaryRuleOwnerADXExporter}},
+		Spec: adxmonv1.SummaryRuleSpec{
+			Database: "testdb",
+			Table:    "T",
+			Interval: metav1.Duration{Duration: time.Hour},
+			Body:     "Body",
+		},
+	}
+	rule.SetAsyncOperation(adxmonv1.AsyncOperation{OperationId: "op-1", StartTime: now.Add(-time.Hour).Format(time.RFC3339Nano), EndTime: now.Format(time.RFC3339Nano)})
+
+	c := newFakeClientWithRule(t, rule)
+	mock := NewMockKustoExecutor(t, "testdb", "https://test")
+	cols := table.Columns{{Name: "LastUpdatedOn", Type: types.DateTime}, {Name: "OperationId", Type: types.String}, {Name: "State", Type: types.String}, {Name: "ShouldRetry", Type: types.Real}, {Name: "Status", Type: types.String}}
+	rows := []value.Values{{
+		value.DateTime{Value: now, Valid: true},
+		value.String{Value: "op-1", Valid: true},
+		value.String{Value: "InProgress", Valid: true},
+		value.Real{Value: 1, Valid: true},
+		value.String{Value: "retry advised", Valid: true},
+	}}
+	mock.results = append(mock.results, createRowIteratorFromMockRows(t, cols, rows))
+
+	r := newBaseReconciler(t, c, mock, now)
+	r.AutoSuspendEvaluator = func(*adxmonv1.SummaryRule) bool { return true }
+
+	_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: client.ObjectKeyFromObject(rule)})
+	require.NoError(t, err)
+
+	queries := mock.GetQueries()
+	require.Len(t, queries, 1)
+	require.Contains(t, queries[0], ".show operations")
+
+	var updated adxmonv1.SummaryRule
+	require.NoError(t, c.Get(context.Background(), client.ObjectKeyFromObject(rule), &updated))
+	ops := updated.GetAsyncOperations()
+	require.Len(t, ops, 1)
+	require.Equal(t, "op-1", ops[0].OperationId)
+}
+
+func TestSummaryRule_ShouldAutoSuspend(t *testing.T) {
+	ensureTestVFlagSetT(t)
+	now := time.Now().UTC()
+	gen := int64(3)
+	reconciler := &SummaryRuleReconciler{Clock: klock.NewFakeClock(now)}
+
+	makeRule := func(interval time.Duration, failureAge time.Duration, message string, observed int64, completed *metav1.Condition) *adxmonv1.SummaryRule {
+		condFailed := metav1.Condition{
+			Type:               adxmonv1.ConditionFailed,
+			Status:             metav1.ConditionTrue,
+			Reason:             "ExecutionFailed",
+			Message:            message,
+			ObservedGeneration: observed,
+			LastTransitionTime: metav1.NewTime(now.Add(-failureAge)),
+		}
+		rule := &adxmonv1.SummaryRule{
+			ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "sr-auto", Generation: gen},
+			Spec: adxmonv1.SummaryRuleSpec{
+				Database: "testdb",
+				Table:    "T",
+				Interval: metav1.Duration{Duration: interval},
+				Body:     "Body",
+			},
+			Status: adxmonv1.SummaryRuleStatus{Conditions: []metav1.Condition{condFailed}},
+		}
+		if completed != nil {
+			completed.ObservedGeneration = observed
+			rule.Status.Conditions = append(rule.Status.Conditions, *completed)
+		}
+		return rule
+	}
+
+	cases := []struct {
+		name   string
+		rule   *adxmonv1.SummaryRule
+		expect bool
+	}{
+		{
+			name:   "sustained throttling triggers suspension",
+			rule:   makeRule(time.Minute, 30*time.Minute, "Partial query failure: Query throttled (E_QUERY_THROTTLED)", gen, nil),
+			expect: true,
+		},
+		{
+			name:   "below threshold does not suspend",
+			rule:   makeRule(time.Minute, 5*time.Minute, "Partial query failure: Query throttled (E_QUERY_THROTTLED)", gen, nil),
+			expect: false,
+		},
+		{
+			name:   "non throttle failure ignored",
+			rule:   makeRule(time.Minute, 30*time.Minute, "Partial query failure: Query timeout", gen, nil),
+			expect: false,
+		},
+		{
+			name:   "stale generation ignored",
+			rule:   makeRule(time.Minute, 30*time.Minute, "Partial query failure: Query throttled (E_QUERY_THROTTLED)", gen-1, nil),
+			expect: false,
+		},
+		{
+			name: "recent completion resets",
+			rule: makeRule(time.Minute, 30*time.Minute, "Partial query failure: Query throttled (E_QUERY_THROTTLED)", gen, &metav1.Condition{
+				Type:               adxmonv1.ConditionCompleted,
+				Status:             metav1.ConditionTrue,
+				Reason:             "ExecutionSuccessful",
+				Message:            "Most recent submission succeeded",
+				LastTransitionTime: metav1.NewTime(now.Add(-10 * time.Minute)),
+			}),
+			expect: false,
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			suspend, _ := reconciler.shouldAutoSuspend(tc.rule)
+			require.Equal(t, tc.expect, suspend)
+		})
+	}
+}
+
+func TestSummaryRule_AutoSuspendUpdatesCondition(t *testing.T) {
+	ensureTestVFlagSetT(t)
+	now := time.Now().UTC()
+	gen := int64(4)
+	rule := &adxmonv1.SummaryRule{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace:   "default",
+			Name:        "sr-auto-condition",
+			Generation:  gen,
+			Annotations: map[string]string{adxmonv1.SummaryRuleOwnerAnnotation: adxmonv1.SummaryRuleOwnerADXExporter},
+		},
+		Spec: adxmonv1.SummaryRuleSpec{
+			Database: "testdb",
+			Table:    "T",
+			Interval: metav1.Duration{Duration: time.Minute},
+			Body:     "Body",
+		},
+		Status: adxmonv1.SummaryRuleStatus{Conditions: []metav1.Condition{
+			{
+				Type:               adxmonv1.ConditionFailed,
+				Status:             metav1.ConditionTrue,
+				Reason:             "ExecutionFailed",
+				Message:            "Partial query failure: Query throttled (E_QUERY_THROTTLED)",
+				ObservedGeneration: gen,
+				LastTransitionTime: metav1.NewTime(now.Add(-30 * time.Minute)),
+			},
+			{
+				Type:               adxmonv1.ConditionCompleted,
+				Status:             metav1.ConditionFalse,
+				Reason:             "ExecutionFailed",
+				Message:            "Partial query failure: Query throttled (E_QUERY_THROTTLED)",
+				ObservedGeneration: gen,
+				LastTransitionTime: metav1.NewTime(now.Add(-30 * time.Minute)),
+			},
+		}},
+	}
+	c := newFakeClientWithRule(t, rule)
+	mock := NewMockKustoExecutor(t, "testdb", "https://test")
+	r := newBaseReconciler(t, c, mock, now)
+
+	_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: client.ObjectKeyFromObject(rule)})
+	require.NoError(t, err)
+
+	var updated adxmonv1.SummaryRule
+	require.NoError(t, c.Get(context.Background(), client.ObjectKeyFromObject(rule), &updated))
+	ownerCond := updated.GetCondition()
+	require.NotNil(t, ownerCond)
+	require.Equal(t, metav1.ConditionFalse, ownerCond.Status)
+	require.Equal(t, "Suspended", ownerCond.Reason)
+	require.Contains(t, ownerCond.Message, "sustained throttling")
+	require.Empty(t, mock.GetQueries())
+}
diff --git a/docs/crds.md b/docs/crds.md
index fa90e8ba6..bc48f1891 100644
--- a/docs/crds.md
+++ b/docs/crds.md
@@ -262,6 +262,8 @@ SummaryRules are managed by the Ingestor's `SummaryRuleTask`, which runs periodi
 
 5. **Resilience**: Handles ADX cluster restarts, network issues, and operation retries. Operations older than 25 hours are automatically cleaned up if they fall out of ADX's 24-hour operations window.
 
+6. **Auto-Suspension Guardrails**: The ADX Exporter automatically suspends a SummaryRule when it sees sustained throttling from ADX. The guardrail triggers once the failure condition shows throttling for at least `max(3 * spec.interval, 15m)` without a successful completion. Suspended rules surface a `SummaryRuleOwner` condition with reason `Suspended` and a message that includes the elapsed throttling duration and the threshold. The controller does not automatically resume execution; clearing suspension requires operator intervention (for example, editing the rule to produce a new generation or manually updating the status condition after addressing the underlying throttling).
+
 **Execution Triggers**: Rules are submitted when:
 - Rule is being deleted
 - Rule was updated (new generation)
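
Illustrative only, not part of the diff above: a minimal sketch of how a caller in the same package might wire the new `AutoSuspendEvaluator` hook. Per `evaluateSuspension`, a nil hook falls back to the built-in `shouldAutoSuspend` throttling heuristic, while a non-nil hook replaces it entirely (the tests use this to force suspension). The opt-out annotation name below is hypothetical and does not exist in this change.

```go
package adxexporter

import (
	adxmonv1 "github.com/Azure/adx-mon/api/v1"
)

// withAutoSuspendOptOut is an illustrative sketch: it layers a hypothetical
// per-rule opt-out annotation on top of the built-in throttling heuristic.
// It must be called before the reconciler starts processing rules.
func withAutoSuspendOptOut(r *SummaryRuleReconciler) *SummaryRuleReconciler {
	r.AutoSuspendEvaluator = func(rule *adxmonv1.SummaryRule) bool {
		// "adx-mon/disable-auto-suspend" is a hypothetical annotation, named
		// here only for illustration.
		if rule.GetAnnotations()["adx-mon/disable-auto-suspend"] == "true" {
			return false
		}
		// Otherwise defer to the same heuristic evaluateSuspension would use
		// when no custom evaluator is configured.
		suspend, _ := r.shouldAutoSuspend(rule)
		return suspend
	}
	return r
}
```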