212 changes: 182 additions & 30 deletions adxexporter/summaryrule.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"strings"
"time"

adxmonv1 "github.com/Azure/adx-mon/api/v1"
@@ -28,8 +29,22 @@ const (

stateCompleted = "Completed"
stateFailed = "Failed"

autoSuspendMinDuration = 15 * time.Minute
autoSuspendIntervalMultiplier = 3
defaultSuspendedMessage = "SummaryRule execution is suspended"
)

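// throttleErrorIndicators lists lowercase substrings that identify throttling
// or rate-limit failures in a Failed condition message; isThrottleFailure
// matches them case-insensitively.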
var throttleErrorIndicators = []string{
"e_query_throttled",
"query throttled",
"too many requests",
"request is throttled",
"rate limit",
"status code 429",
"http status code 429",
}

// SummaryRuleReconciler will reconcile SummaryRule objects.
type SummaryRuleReconciler struct {
client.Client
@@ -41,6 +56,8 @@ type SummaryRuleReconciler struct {

KustoExecutors map[string]KustoExecutor // per-database Kusto clients supporting Query and Mgmt
Clock clock.Clock

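// AutoSuspendEvaluator, when non-nil, replaces the built-in throttle-based
// suspension heuristic (shouldAutoSuspend). A minimal sketch of a
// deterministic override, e.g. for a test (hypothetical wiring, not part of
// this change):
//
//	r := &SummaryRuleReconciler{
//		AutoSuspendEvaluator: func(*adxmonv1.SummaryRule) bool { return true },
//	}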
AutoSuspendEvaluator func(*adxmonv1.SummaryRule) bool
}

// Reconcile processes a single SummaryRule; placeholder implementation for now.
@@ -79,12 +96,17 @@ func (r *SummaryRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

-if requeue, handled := r.adoptIfDesired(ctx, &rule); handled {
-return requeue, nil
+suspended, suspendMsg := r.evaluateSuspension(&rule)
+if suspended {
+r.markSuspendedStatus(&rule, suspendMsg)
+} else {
+if requeue, handled := r.adoptIfDesired(ctx, &rule); handled {
+return requeue, nil
+}
}

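// While suspended, the reconcile skips adoption (above), window submission,
// and backfill (below); in-flight async operations are still tracked and the
// health log reports the rule as degraded with reason "suspended".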
// Minimal submission path: compute window and submit async if it's time
-if rule.ShouldSubmitRule(r.Clock) {
+if !suspended && rule.ShouldSubmitRule(r.Clock) {
windowStart, windowEnd := rule.NextExecutionWindow(r.Clock)

// Use inclusive end for the query by subtracting OneTick (100ns) to avoid boundary issues
@@ -109,10 +131,12 @@ func (r *SummaryRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// Backfill any missing async operations based on last successful execution time
-rule.BackfillAsyncOperations(r.Clock)
+if !suspended {
+rule.BackfillAsyncOperations(r.Clock)
+}

// Track/advance outstanding async operations (including backlog windows)
-r.trackAsyncOperations(ctx, &rule)
+r.trackAsyncOperations(ctx, &rule, suspended)

// Persist any changes made to async operation conditions or timestamps
if err := r.Status().Update(ctx, &rule); err != nil {
@@ -121,7 +145,7 @@ func (r *SummaryRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// Emit health / status log (best-effort, after status update to reflect latest view)
-r.emitHealthLog(&rule)
+r.emitHealthLog(&rule, suspended)

// Requeue based on rule interval and async polling needs
return ctrl.Result{RequeueAfter: nextRequeue(&rule)}, nil
@@ -184,6 +208,118 @@ func (r *SummaryRuleReconciler) isDatabaseManaged(rule *adxmonv1.SummaryRule) bo
return ok
}

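// evaluateSuspension reports whether the rule should be treated as suspended
// for this reconcile, plus an optional message. An injected
// AutoSuspendEvaluator takes precedence (it supplies no message, so the
// default suspended message is used); otherwise the throttle heuristic in
// shouldAutoSuspend decides.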
func (r *SummaryRuleReconciler) evaluateSuspension(rule *adxmonv1.SummaryRule) (bool, string) {
if r.AutoSuspendEvaluator != nil {
if r.AutoSuspendEvaluator(rule) {
return true, ""
}
return false, ""
}
return r.shouldAutoSuspend(rule)
}

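// shouldAutoSuspend implements the default heuristic: suspend only when the
// Failed condition is true for the current generation, its message indicates
// throttling, no Completed condition has transitioned more recently, and the
// failure has persisted for at least autoSuspendThresholdForRule(rule).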
func (r *SummaryRuleReconciler) shouldAutoSuspend(rule *adxmonv1.SummaryRule) (bool, string) {
clk := r.Clock
if clk == nil {
clk = clock.RealClock{}
}

threshold := autoSuspendThresholdForRule(rule)
if threshold <= 0 {
return false, ""
}

failed := meta.FindStatusCondition(rule.Status.Conditions, adxmonv1.ConditionFailed)
if failed == nil || failed.Status != metav1.ConditionTrue {
return false, ""
}
if failed.ObservedGeneration != rule.GetGeneration() {
return false, ""
}
if !isThrottleFailure(failed) {
return false, ""
}
if failed.LastTransitionTime.IsZero() {
return false, ""
}

completed := meta.FindStatusCondition(rule.Status.Conditions, adxmonv1.ConditionCompleted)
if completed != nil && completed.Status == metav1.ConditionTrue {
if completed.LastTransitionTime.After(failed.LastTransitionTime.Time) {
return false, ""
}
}

elapsed := clk.Now().Sub(failed.LastTransitionTime.Time)
if elapsed < threshold {
return false, ""
}

message := fmt.Sprintf("%s after sustained throttling (elapsed=%s, threshold=%s)", defaultSuspendedMessage, formatDuration(elapsed), formatDuration(threshold))
return true, message
}

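// markSuspendedStatus records a Suspended owner condition on the rule,
// skipping the write when an identical condition (status, reason, message)
// is already present.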
func (r *SummaryRuleReconciler) markSuspendedStatus(rule *adxmonv1.SummaryRule, message string) {
if message == "" {
message = defaultSuspendedMessage
}
condition := metav1.Condition{
Type: adxmonv1.SummaryRuleOwner,
Status: metav1.ConditionFalse,
Reason: "Suspended",
Message: message,
ObservedGeneration: rule.GetGeneration(),
LastTransitionTime: metav1.Now(),
}
existing := meta.FindStatusCondition(rule.Status.Conditions, adxmonv1.SummaryRuleOwner)
if existing != nil && existing.Status == condition.Status && existing.Reason == condition.Reason && existing.Message == condition.Message {
return
}
meta.SetStatusCondition(&rule.Status.Conditions, condition)
}

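// autoSuspendThresholdForRule derives how long a throttle failure must
// persist before auto-suspension: three times the rule interval, floored at
// autoSuspendMinDuration. For example, a 1m interval yields the 15m floor,
// while a 20m interval yields 60m.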
func autoSuspendThresholdForRule(rule *adxmonv1.SummaryRule) time.Duration {
interval := rule.Spec.Interval.Duration
if interval <= 0 {
return autoSuspendMinDuration
}
base := time.Duration(autoSuspendIntervalMultiplier) * interval
if base < autoSuspendMinDuration {
return autoSuspendMinDuration
}
return base
}

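// isThrottleFailure reports whether the condition is a true Failed condition
// whose non-empty message contains any known throttling indicator.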
func isThrottleFailure(cond *metav1.Condition) bool {
if cond == nil {
return false
}
if cond.Status != metav1.ConditionTrue {
return false
}
if cond.Message == "" {
return false
}
lower := strings.ToLower(cond.Message)
for _, indicator := range throttleErrorIndicators {
if strings.Contains(lower, indicator) {
return true
}
}
return false
}

func formatDuration(d time.Duration) string {
if d <= 0 {
return "0s"
}
// Prefer second precision to avoid overly long fractional strings.
if d < time.Second {
return d.String()
}
return d.Truncate(time.Second).String()
}

// (criteria match handled inline for clarity)

// adoptIfDesired tries to claim ownership if requested and safe. It returns (result, handled)
@@ -403,17 +539,20 @@ func (r *SummaryRuleReconciler) getOperation(ctx context.Context, database strin
}

// trackAsyncOperations iterates current async ops and processes completed/retry/backlog entries
-func (r *SummaryRuleReconciler) trackAsyncOperations(ctx context.Context, rule *adxmonv1.SummaryRule) {
+func (r *SummaryRuleReconciler) trackAsyncOperations(ctx context.Context, rule *adxmonv1.SummaryRule, suspended bool) {
// Reviewer note addressed: this method previously handled backlog lookup, status polling,
// completion, and retry logic inline. It is now a simple iterator delegating responsibilities
// to focused helpers for readability & testability.
for _, op := range rule.GetAsyncOperations() {
-r.processAsyncOperation(ctx, rule, op)
+r.processAsyncOperation(ctx, rule, op, suspended)
}
}

// processAsyncOperation routes an async operation to the correct handler based on its state.
-func (r *SummaryRuleReconciler) processAsyncOperation(ctx context.Context, rule *adxmonv1.SummaryRule, op adxmonv1.AsyncOperation) {
+func (r *SummaryRuleReconciler) processAsyncOperation(ctx context.Context, rule *adxmonv1.SummaryRule, op adxmonv1.AsyncOperation, suspended bool) {
+if suspended && op.OperationId == "" {
+return
+}
if op.OperationId == "" { // backlog entry (submission previously failed)
r.processBacklogOperation(ctx, rule, op)
return
@@ -428,6 +567,10 @@ func (r *SummaryRuleReconciler) processAsyncOperation(ctx context.Context, rule
case status.State == stateCompleted || status.State == stateFailed:
r.handleCompletedOperation(ctx, rule, *status)
case status.ShouldRetry != 0:
+if suspended {
+logger.Infof("Skipping retry for async operation %s on suspended rule %s.%s", status.OperationId, rule.Spec.Database, rule.Name)
+return
+}
r.handleRetryOperation(ctx, rule, op, *status)
}
}
@@ -521,7 +664,7 @@ func (r *SummaryRuleReconciler) updateLastExecutionTimeIfForward(rule *adxmonv1.
// criteria matched, no stale async operations, no detected interval gap beyond one interval, and
// (if due) submission succeeded or (if not due) simply waiting.
// This avoids expensive downstream Kusto correlation for basic liveness checks.
-func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
+func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule, suspended bool) {
// Only log for rules the exporter owns or is attempting to adopt; skip others to reduce noise.
owned := crdownership.IsOwnedBy(rule, adxmonv1.SummaryRuleOwnerADXExporter)
wantsExporter := crdownership.WantsOwner(rule, adxmonv1.SummaryRuleOwnerADXExporter)
@@ -591,11 +734,14 @@ func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
}
}

-// Determine due & submitted from conditions: we treat due purely as whether it *would* be submitted now
-due := rule.ShouldSubmitRule(r.Clock)
+// Determine due & submitted from conditions: suspended rules are never due
+due := false
+if !suspended {
+due = rule.ShouldSubmitRule(r.Clock)
+}
submitted := false
// We approximate submission success by checking whether the rule was due and LastSuccessfulExecutionTime advanced within this reconcile.
-if due && lastExec != nil {
+if !suspended && due && lastExec != nil {
// If lastExec is within half an interval of now (delay considered), treat it as just submitted.
if effectiveNow.Sub(*lastExec) < (interval / 2) { // heuristic; avoids false positives when reconcile jitter is small
submitted = true
@@ -605,23 +751,28 @@ func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
// Working state & reason
working := true
degraded := ""
-switch {
-case !r.isDatabaseManaged(rule):
-working, degraded = false, "unmanaged_database"
-case func() bool {
-ok, _, _, _ := EvaluateExecutionCriteria(rule.Spec.Criteria, rule.Spec.CriteriaExpression, r.ClusterLabels)
-return !ok
-}():
-working, degraded = false, "criteria_mismatch"
-case !owned && wantsExporter:
-working, degraded = false, "ownership_pending"
-case due && !submitted:
-// If it was due but not (apparently) submitted this cycle
-working, degraded = false, "submission_missed"
-case staleAsync:
-working, degraded = false, "async_stale"
-case gapDetected:
-working, degraded = false, "backlog_gap"
+if suspended {
+working = false
+degraded = "suspended"
+} else {
+switch {
+case !r.isDatabaseManaged(rule):
+working, degraded = false, "unmanaged_database"
+case func() bool {
+ok, _, _, _ := EvaluateExecutionCriteria(rule.Spec.Criteria, rule.Spec.CriteriaExpression, r.ClusterLabels)
+return !ok
+}():
+working, degraded = false, "criteria_mismatch"
+case !owned && wantsExporter:
+working, degraded = false, "ownership_pending"
+case due && !submitted:
+// If it was due but not (apparently) submitted this cycle
+working, degraded = false, "submission_missed"
+case staleAsync:
+working, degraded = false, "async_stale"
+case gapDetected:
+working, degraded = false, "backlog_gap"
+}
}

l := logger.Logger()
@@ -641,6 +792,7 @@ func (r *SummaryRuleReconciler) emitHealthLog(rule *adxmonv1.SummaryRule) {
kv = append(kv,
"due", due,
"submitted", submitted,
+"suspended", suspended,
"async_inflight", inflight,
"async_backlog", backlog,
"stale_async", staleAsync,