Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions pkg/alertmanager/alertmanager_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,56 @@ const (
RingNumTokens = 128
)

// RingOp is the operation used for reading/writing to the alertmanagers.
// It is the original ring operation, with replica set extension enabled for
// backward compatibility.
var RingOp = ring.NewOp(
	[]ring.InstanceState{ring.ACTIVE},
	func(state ring.InstanceState) bool {
		// Only ACTIVE Alertmanagers get requests: when an instance is in any other
		// state the replica set is extended so that another Alertmanager is found.
		return state != ring.ACTIVE
	},
)

// SyncRingOp is the operation used for checking if a user is owned by an alertmanager.
// Both ACTIVE and JOINING instances are accepted states for this operation.
var SyncRingOp = ring.NewOp(
	[]ring.InstanceState{ring.ACTIVE, ring.JOINING},
	func(state ring.InstanceState) bool {
		// Original behavior: extend the replica set whenever an instance is not ACTIVE.
		return state != ring.ACTIVE
	},
)

// RingOpNoExtension is the blast-radius-limited ring operation for reading/writing
// to the alertmanagers: only ACTIVE instances are accepted and replica set
// extension is disabled.
var RingOpNoExtension = ring.NewOp(
	[]ring.InstanceState{ring.ACTIVE},
	func(ring.InstanceState) bool {
		// The replica set is never extended, whatever the instance state, to limit
		// the blast radius during config corruption incidents.
		return false
	},
)

// SyncRingOpNoExtension is the blast-radius-limited ring operation used during sync:
// ACTIVE and JOINING instances are accepted and replica set extension is disabled.
var SyncRingOpNoExtension = ring.NewOp(
	[]ring.InstanceState{ring.ACTIVE, ring.JOINING},
	func(ring.InstanceState) bool {
		// The replica set is never extended during sync, whatever the instance state,
		// to limit the blast radius during config corruption incidents.
		return false
	},
)

// GetRingOp selects the read/write ring operation based on config: it returns
// RingOpNoExtension when disableExtension is true and RingOp otherwise.
func GetRingOp(disableExtension bool) ring.Operation {
	if !disableExtension {
		return RingOp
	}
	return RingOpNoExtension
}

// GetSyncRingOp selects the sync ring operation based on config: it returns
// SyncRingOpNoExtension when disableExtension is true and SyncRingOp otherwise.
func GetSyncRingOp(disableExtension bool) ring.Operation {
	if !disableExtension {
		return SyncRingOp
	}
	return SyncRingOpNoExtension
}

// RingConfig masks the ring lifecycler config which contains
// many options not really required by the alertmanager ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
TokensFilePath string `yaml:"tokens_file_path"`
DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"`
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
TokensFilePath string `yaml:"tokens_file_path"`
DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"`
DisableReplicaSetExtension bool `yaml:"disable_replica_set_extension"`

FinalSleep time.Duration `yaml:"final_sleep"`
WaitInstanceStateTimeout time.Duration `yaml:"wait_instance_state_timeout"`
Expand Down Expand Up @@ -90,6 +117,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")
f.StringVar(&cfg.TokensFilePath, rfprefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
f.BoolVar(&cfg.DetailedMetricsEnabled, rfprefix+"detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant. Disabling them can significantly decrease the number of metrics emitted.")
f.BoolVar(&cfg.DisableReplicaSetExtension, rfprefix+"disable-replica-set-extension", false, "Disable extending the replica set when instances are unhealthy. This limits blast radius during config corruption incidents but reduces availability during normal failures.")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
Expand Down
49 changes: 49 additions & 0 deletions pkg/alertmanager/alertmanager_ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,52 @@ func TestIsHealthyForAlertmanagerOperations(t *testing.T) {
})
}
}

// TestBlastRadiusProtection checks, for the regular ring operations and their
// NoExtension counterparts, both whether an instance is considered healthy and
// whether the instance's state makes the operation extend the replica set.
//
// The health check alone cannot tell the two families of operations apart (they
// are built with the same healthy states), so the extension decision is asserted
// explicitly: it is the only thing the NoExtension variants change.
func TestBlastRadiusProtection(t *testing.T) {
	t.Parallel()

	// A single reference time is used for the heartbeat timestamps and for the
	// health check, so the result does not depend on how long the test takes.
	now := time.Now()
	heartbeat := func(age time.Duration) int64 { return now.Add(-age).Unix() }

	tests := map[string]struct {
		operation      ring.Operation
		instance       *ring.InstanceDesc
		timeout        time.Duration
		expected       bool // expected result of IsHealthy
		expectedExtend bool // expected result of ShouldExtendReplicaSetOnState
	}{
		"RingOp: ACTIVE instance with stale heartbeat is unhealthy and does not extend": {
			operation:      RingOp,
			instance:       &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: heartbeat(90 * time.Second)},
			timeout:        time.Minute,
			expected:       false,
			expectedExtend: false,
		},
		"RingOpNoExtension: ACTIVE instance with stale heartbeat is unhealthy and does not extend": {
			operation:      RingOpNoExtension,
			instance:       &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: heartbeat(90 * time.Second)},
			timeout:        time.Minute,
			expected:       false,
			expectedExtend: false,
		},
		"RingOp: LEAVING instance is unhealthy and extends the replica set": {
			operation:      RingOp,
			instance:       &ring.InstanceDesc{State: ring.LEAVING, Timestamp: heartbeat(30 * time.Second)},
			timeout:        time.Minute,
			expected:       false,
			expectedExtend: true,
		},
		"RingOpNoExtension: LEAVING instance is unhealthy and does not extend the replica set": {
			operation:      RingOpNoExtension,
			instance:       &ring.InstanceDesc{State: ring.LEAVING, Timestamp: heartbeat(30 * time.Second)},
			timeout:        time.Minute,
			expected:       false,
			expectedExtend: false,
		},
		"RingOp: ACTIVE instance with fresh heartbeat is healthy": {
			operation:      RingOp,
			instance:       &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: heartbeat(30 * time.Second)},
			timeout:        time.Minute,
			expected:       true,
			expectedExtend: false,
		},
		"RingOpNoExtension: ACTIVE instance with fresh heartbeat is healthy": {
			operation:      RingOpNoExtension,
			instance:       &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: heartbeat(30 * time.Second)},
			timeout:        time.Minute,
			expected:       true,
			expectedExtend: false,
		},
		"SyncRingOp: JOINING instance is healthy and extends the replica set": {
			operation:      SyncRingOp,
			instance:       &ring.InstanceDesc{State: ring.JOINING, Timestamp: heartbeat(30 * time.Second)},
			timeout:        time.Minute,
			expected:       true,
			expectedExtend: true,
		},
		"SyncRingOpNoExtension: JOINING instance is healthy and does not extend the replica set": {
			operation:      SyncRingOpNoExtension,
			instance:       &ring.InstanceDesc{State: ring.JOINING, Timestamp: heartbeat(30 * time.Second)},
			timeout:        time.Minute,
			expected:       true,
			expectedExtend: false,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			actual := testData.instance.IsHealthy(testData.operation, testData.timeout, now)
			assert.Equal(t, testData.expected, actual)

			extend := testData.operation.ShouldExtendReplicaSetOnState(testData.instance.State)
			assert.Equal(t, testData.expectedExtend, extend)
		})
	}
}
79 changes: 70 additions & 9 deletions pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type Distributor struct {

alertmanagerRing ring.ReadRing
alertmanagerClientsPool ClientsPool

logger log.Logger
ringConfig RingConfig
logger log.Logger
}

// NewDistributor constructs a new Distributor
func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, ringConfig RingConfig, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
if alertmanagerClientsPool == nil {
alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg, logger, reg)
}
Expand All @@ -52,6 +52,7 @@ func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *r
maxRecvMsgSize: maxRecvMsgSize,
alertmanagerRing: alertmanagersRing,
alertmanagerClientsPool: alertmanagerClientsPool,
ringConfig: ringConfig,
}

d.Service = services.NewBasicService(nil, d.running, nil)
Expand Down Expand Up @@ -89,6 +90,9 @@ func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
if strings.HasSuffix(path.Dir(p), "/v2/silence") {
return true, merger.V2SilenceID{}
}
if strings.HasSuffix(p, "/v2/receivers") {
return true, merger.V2Receivers{}
}
return false, nil
}

Expand Down Expand Up @@ -160,7 +164,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
var responses []*httpgrpc.HTTPResponse
var responsesMtx sync.Mutex
grpcHeaders := httpToHttpgrpcHeaders(r.Header)
err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
err = ring.DoBatch(r.Context(), GetRingOp(d.ringConfig.DisableReplicaSetExtension), d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
// Use a background context to make sure all alertmanagers get the request even if we return early.
localCtx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), userID), opentracing.SpanFromContext(r.Context()))
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
Expand Down Expand Up @@ -207,7 +211,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req

func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
key := users.ShardByUser(userID)
replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil)
replicationSet, err := d.alertmanagerRing.Get(key, GetRingOp(d.ringConfig.DisableReplicaSetExtension), nil, nil, nil)
if err != nil {
level.Error(logger).Log("msg", "failed to get replication set from the ring", "err", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -244,16 +248,30 @@ func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Requ
instances[i], instances[j] = instances[j], instances[i]
})
} else {
//Picking 1 instance at Random for Non-Get and Non-Delete Unary Read requests, as shuffling through large number of instances might increase complexity
randN := rand.Intn(len(replicationSet.Instances))
instances = replicationSet.Instances[randN : randN+1]
// Unary write paths (e.g. PutSilence) can be retried, so try every replica in random order.
if d.isUnaryWritePath(r.URL.Path) {
instances = replicationSet.Instances
rand.Shuffle(len(instances), func(i, j int) {
instances[i], instances[j] = instances[j], instances[i]
})
} else {
// All other non-GET, non-DELETE unary requests go to a single, randomly picked instance, as shuffling through a large number of instances might increase complexity.
randN := rand.Intn(len(replicationSet.Instances))
instances = replicationSet.Instances[randN : randN+1]
}
}

var lastErr error
for _, instance := range instances {
resp, err := d.doRequest(ctx, instance, req)
// storing the last error message
if err != nil {
// For PutSilence with non-retryable errors, fail immediately
if d.isUnaryWritePath(r.URL.Path) && !d.isRetryableError(err) {
level.Error(logger).Log("msg", "non-retryable error from alertmanager", "instance", instance.Addr, "err", err)
respondFromError(err, w, logger)
return
}
// storing the last error message
lastErr = err
continue
}
Expand All @@ -267,6 +285,49 @@ func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Requ
}
}

// isRetryableError reports whether a failed request is worth retrying
// (network/availability issues) as opposed to non-retryable (bad request,
// validation errors).
//
// It returns false for a nil error. When the error carries an HTTP response:
//   - 408 (Request Timeout), 429 (Too Many Requests) and any status >= 500 are retryable;
//   - every other 4xx status is not retryable.
//
// Any other error (including one that carries no HTTP response, such as a
// network or context error) is treated as retryable to maximize availability.
//
// The previous implementation also matched the error text against a list of
// network-error substrings, but every outcome of that match returned true, so
// the match has been dropped without changing the result.
func (d *Distributor) isRetryableError(err error) bool {
	if err == nil {
		return false
	}

	// Check if it's an HTTP error with a status code.
	if httpResp, ok := httpgrpc.HTTPResponseFromError(errors.Cause(err)); ok {
		statusCode := int(httpResp.Code)

		switch {
		case statusCode == http.StatusRequestTimeout,
			statusCode == http.StatusTooManyRequests,
			statusCode >= 500:
			return true
		case statusCode >= 400:
			// Remaining 4xx: the request itself is at fault, another replica won't help.
			return false
		}
	}

	// Default to retryable for everything else to maximize availability.
	return true
}

func respondFromError(err error, w http.ResponseWriter, logger log.Logger) {
httpResp, ok := httpgrpc.HTTPResponseFromError(errors.Cause(err))
if !ok {
Expand Down
13 changes: 12 additions & 1 deletion pkg/alertmanager/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,16 @@ func TestDistributor_DistributeRequest(t *testing.T) {
expStatusCode: http.StatusOK,
expectedTotalCalls: 1,
route: "/receivers",
}, {
name: "Read /v2/receivers is sent to 3 AMs",
numAM: 5,
numHappyAM: 5,
replicationFactor: 3,
isRead: true,
expStatusCode: http.StatusOK,
expectedTotalCalls: 3,
route: "/v2/receivers",
responseBody: []byte(`[{"name":"default"},{"name":"slack"}]`),
}, {
name: "Write /receivers not supported",
numAM: 5,
Expand Down Expand Up @@ -352,8 +362,9 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBod

cfg := &MultitenantAlertmanagerConfig{}
flagext.DefaultValues(cfg)
cfg.ShardingRing.DisableReplicaSetExtension = false

d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry())
d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), cfg.ShardingRing, util_log.Logger, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

Expand Down
46 changes: 46 additions & 0 deletions pkg/alertmanager/merger/v2_receivers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package merger

import (
"github.com/go-openapi/swag/jsonutils"
v2_models "github.com/prometheus/alertmanager/api/v2/models"
)

// V2Receivers implements the Merger interface for GET /v2/receivers. The merged
// response is the union of the receivers found in all the responses. If several
// responses contain a receiver with the same name, only one of them is returned
// (they should be identical across replicas).
type V2Receivers struct{}

// MergeResponses decodes each response body as a JSON list of receivers, merges
// all the lists and returns the result encoded as JSON. The decoding error of the
// first body that cannot be parsed is returned as is.
func (V2Receivers) MergeResponses(in [][]byte) ([]byte, error) {
	combined := make([]*v2_models.Receiver, 0)
	for i := range in {
		decoded := make([]*v2_models.Receiver, 0)
		err := jsonutils.ReadJSON(in[i], &decoded)
		if err != nil {
			return nil, err
		}
		combined = append(combined, decoded...)
	}

	return jsonutils.WriteJSON(mergeV2Receivers(combined))
}

// mergeV2Receivers deduplicates receivers by name, keeping the first occurrence
// of each name and preserving the input order. Since receivers should be identical
// across replicas (they come from the same config), which occurrence is kept does
// not matter.
//
// Nil entries and receivers without a name are skipped: they cannot be keyed, and
// a nil entry (e.g. decoded from a JSON null array element) would otherwise cause
// a nil pointer dereference.
//
// The returned slice is never nil, as in the original implementation
// (presumably so that an empty result is encoded as an empty JSON array).
func mergeV2Receivers(in []*v2_models.Receiver) []*v2_models.Receiver {
	seen := make(map[string]struct{}, len(in))
	result := make([]*v2_models.Receiver, 0, len(in))

	for _, receiver := range in {
		if receiver == nil || receiver.Name == nil {
			continue
		}

		name := *receiver.Name
		if _, duplicate := seen[name]; duplicate {
			continue
		}
		seen[name] = struct{}{}
		result = append(result, receiver)
	}

	return result
}
Loading