Skip to content

Commit 8eb630a

Browse files
committed
refactor: Extract assign policy from balancer and add StoppingBalancer
- Create new assign package with unified AssignPolicy interface that abstracts segment and channel assignment logic
- Implement three assign policies: RoundRobin (for fast loading), RowCount-based, and Score-based assignment strategies
- Add AssignPolicyFactory for creating and caching policy instances
- Introduce StoppingBalancer to centralize stopping node balance logic that was previously duplicated across multiple balancers
- Add BalancerFactory for dynamic balancer creation and caching based on configuration
- Refactor existing balancers (ScoreBased, RowCountBased, ChannelLevel) to use the new assign policy abstraction
- Move priority_queue and streaming_query_node_channel_helper to assign package
- Add comprehensive unit tests for all new components
- Add new config parameter for StoppingBalanceAssignPolicy

Signed-off-by: Wei Liu <[email protected]>
1 parent 4f080bd commit 8eb630a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+5802
-2573
lines changed

configs/milvus.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,7 @@ queryCoord:
432432
checkNodeSessionInterval: 60 # the interval(in seconds) of check querynode cluster session
433433
gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
434434
enableStoppingBalance: true # whether enable stopping balance
435+
stoppingBalanceAssignPolicy: ScoreBased # assign policy for stopping balance, options: RoundRobin, RowCount, ScoreBased
435436
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
436437
collectionObserverInterval: 200 # the interval of collection observer
437438
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
reviewers:
2+
- weiliu1031
3+
4+
approvers:
5+
- maintainers
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package assign
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
24+
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
25+
)
26+
27+
// SegmentAssignPlan represents a plan to assign a segment to a node
28+
type SegmentAssignPlan struct {
29+
Segment *meta.Segment
30+
Replica *meta.Replica
31+
From int64 // -1 if empty
32+
To int64
33+
FromScore int64
34+
ToScore int64
35+
SegmentScore int64
36+
LoadPriority commonpb.LoadPriority
37+
}
38+
39+
func (segPlan *SegmentAssignPlan) String() string {
40+
return fmt.Sprintf("SegmentPlan:[collectionID: %d, replicaID: %d, segmentID: %d, from: %d, to: %d, fromScore: %d, toScore: %d, segmentScore: %d]\n",
41+
segPlan.Segment.CollectionID, segPlan.Replica.GetID(), segPlan.Segment.ID, segPlan.From, segPlan.To, segPlan.FromScore, segPlan.ToScore, segPlan.SegmentScore)
42+
}
43+
44+
// ChannelAssignPlan represents a plan to assign a channel to a node
45+
type ChannelAssignPlan struct {
46+
Channel *meta.DmChannel
47+
Replica *meta.Replica
48+
From int64
49+
To int64
50+
FromScore int64
51+
ToScore int64
52+
ChannelScore int64
53+
}
54+
55+
func (chanPlan *ChannelAssignPlan) String() string {
56+
return fmt.Sprintf("ChannelPlan:[collectionID: %d, channel: %s, replicaID: %d, from: %d, to: %d]\n",
57+
chanPlan.Channel.CollectionID, chanPlan.Channel.ChannelName, chanPlan.Replica.GetID(), chanPlan.From, chanPlan.To)
58+
}
59+
60+
// AssignPolicy defines the unified policy for assigning both segments and channels to nodes
61+
// This interface abstracts the common logic of resource assignment across different balancers
62+
type AssignPolicy interface {
63+
// AssignSegment assigns segments to nodes based on the policy
64+
// Returns a list of segment assignment plans
65+
AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan
66+
67+
// AssignChannel assigns channels to nodes based on the policy
68+
// Returns a list of channel assignment plans
69+
AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan
70+
}
71+
72+
// AssignPolicyConfig groups the settings shared by assignment policies.
type AssignPolicyConfig struct {
	// BatchSize caps how many resources are assigned in a single batch.
	BatchSize int

	// EnableBenefitCheck turns on benefit evaluation before an assignment is made.
	EnableBenefitCheck bool
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package assign
18+
19+
import (
20+
"context"
21+
"math"
22+
23+
"github.com/blang/semver/v4"
24+
"github.com/samber/lo"
25+
26+
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
27+
"github.com/milvus-io/milvus/internal/querycoordv2/params"
28+
"github.com/milvus-io/milvus/internal/querycoordv2/session"
29+
)
30+
31+
// nodeFilter narrows a list of candidate nodes down to the ones that are
// eligible for assignment.
type nodeFilter interface {
	// FilterNodes returns the subset of nodes considered valid. forceAssign is
	// passed through so implementations can relax their checks.
	FilterNodes(ctx context.Context, nodes []int64, forceAssign bool) []int64
}
36+
37+
// segmentScoreCalculator calculates the score of a segment
38+
type segmentScoreCalculator interface {
39+
// CalculateScore returns the score of a segment (e.g., row count)
40+
CalculateScore(segment *meta.Segment) int64
41+
}
42+
43+
// channelScoreCalculator calculates the score of a channel
44+
type channelScoreCalculator interface {
45+
// CalculateScore returns the score of a channel
46+
CalculateScore(channel *meta.DmChannel) int64
47+
}
48+
49+
// nodeScoreCalculator reports how loaded a node currently is.
type nodeScoreCalculator interface {
	// CalculateScore returns the node's current load score with respect to
	// the given collection.
	CalculateScore(ctx context.Context, nodeID int64, collectionID int64) float64
}
54+
55+
// assignmentStrategy defines how to select the best node for resource assignment
56+
type assignmentStrategy interface {
57+
// SelectNode selects the best node from candidates
58+
// Returns the selected node ID and updated node info
59+
SelectNode(candidates []*NodeItem, resourceScore int64) (selectedNode int64, updatedNodeInfo []*NodeItem)
60+
}
61+
62+
// benefitEvaluator judges whether an assignment is worth making.
type benefitEvaluator interface {
	// HasEnoughBenefit reports whether assigning a resource from the source
	// node to the target node is beneficial enough.
	//
	//   - sourceScore:   current score of the source node
	//   - targetScore:   current score of the target node
	//   - resourceScore: score of the resource to be assigned
	HasEnoughBenefit(sourceScore float64, targetScore float64, resourceScore int64) bool
}
70+
71+
// ============================================================================
72+
// Common Component Implementations
73+
// These are reusable implementations that can be shared across different policies
74+
// ============================================================================
75+
76+
// commonSegmentNodeFilter is a reusable node filter for segment assignment
77+
// It filters out nodes that are not in normal state
78+
type commonSegmentNodeFilter struct {
79+
nodeManager *session.NodeManager
80+
}
81+
82+
// newCommonSegmentNodeFilter creates a new common segment node filter
83+
func newCommonSegmentNodeFilter(nodeManager *session.NodeManager) nodeFilter {
84+
return &commonSegmentNodeFilter{nodeManager: nodeManager}
85+
}
86+
87+
// FilterNodes filters the input nodes and returns nodes in normal state
88+
func (f *commonSegmentNodeFilter) FilterNodes(ctx context.Context, nodes []int64, forceAssign bool) []int64 {
89+
if forceAssign {
90+
return nodes
91+
}
92+
return lo.Filter(nodes, func(node int64, _ int) bool {
93+
info := f.nodeManager.Get(node)
94+
return info != nil && info.GetState() == session.NodeStateNormal
95+
})
96+
}
97+
98+
// commonChannelNodeFilter is a reusable node filter for channel assignment
99+
// It filters out SQN nodes (if enabled), nodes with version < 2.4, and non-normal nodes
100+
type commonChannelNodeFilter struct {
101+
nodeManager *session.NodeManager
102+
}
103+
104+
// newCommonChannelNodeFilter creates a new common channel node filter
105+
func newCommonChannelNodeFilter(nodeManager *session.NodeManager) nodeFilter {
106+
return &commonChannelNodeFilter{nodeManager: nodeManager}
107+
}
108+
109+
// FilterNodes filters nodes for channel assignment considering SQN, version, and state
110+
func (f *commonChannelNodeFilter) FilterNodes(ctx context.Context, nodes []int64, forceAssign bool) []int64 {
111+
// Filter SQN if streaming service is enabled
112+
nodes = filterSQNIfStreamingServiceEnabled(nodes)
113+
114+
if forceAssign {
115+
return nodes
116+
}
117+
118+
// Version range filter: require version > 2.3.x
119+
versionRangeFilter := semver.MustParseRange(">2.3.x")
120+
return lo.Filter(nodes, func(node int64, _ int) bool {
121+
info := f.nodeManager.Get(node)
122+
// Balance channel to qn with version < 2.4 is not allowed since l0 segment supported
123+
// If watch channel on qn with version < 2.4, it may cause delete data loss
124+
return info != nil && info.GetState() == session.NodeStateNormal && versionRangeFilter(info.Version())
125+
})
126+
}
127+
128+
// commonScoreBasedBenefitEvaluator is the shared, stateless benefit evaluator
// used by score-based policies.
type commonScoreBasedBenefitEvaluator struct{}
131+
132+
// HasEnoughBenefit checks if the assignment provides enough benefit
133+
// It considers:
134+
// 1. Score unbalance toleration factor
135+
// 2. Reverse unbalance toleration factor (if assignment would reverse the balance)
136+
func (e *commonScoreBasedBenefitEvaluator) HasEnoughBenefit(sourceScore float64, targetScore float64, resourceScore int64) bool {
137+
// Check if the score diff between source and target is below tolerance
138+
oldPriorityDiff := math.Abs(sourceScore - targetScore)
139+
if oldPriorityDiff < targetScore*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
140+
return false
141+
}
142+
143+
// Check if assignment would reverse the balance
144+
newSourceScore := sourceScore - float64(resourceScore)
145+
newTargetScore := targetScore + float64(resourceScore)
146+
if newTargetScore > newSourceScore {
147+
// If score diff is reversed, check if the new diff is acceptable
148+
newScoreDiff := math.Abs(newSourceScore - newTargetScore)
149+
if newScoreDiff*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() >= oldPriorityDiff {
150+
return false
151+
}
152+
}
153+
154+
return true
155+
}
156+
157+
// HasEnoughBenefitForNodes is a helper method for NodeItem-based evaluation
158+
func (e *commonScoreBasedBenefitEvaluator) HasEnoughBenefitForNodes(sourceNode *NodeItem, targetNode *NodeItem, scoreChanges float64) bool {
159+
return e.HasEnoughBenefit(
160+
float64(sourceNode.getPriority()),
161+
float64(targetNode.getPriority()),
162+
int64(scoreChanges),
163+
)
164+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package assign
18+
19+
import (
20+
"sync"
21+
22+
"go.uber.org/zap"
23+
24+
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
25+
"github.com/milvus-io/milvus/internal/querycoordv2/session"
26+
"github.com/milvus-io/milvus/internal/querycoordv2/task"
27+
"github.com/milvus-io/milvus/pkg/v2/log"
28+
)
29+
30+
// Policy type identifiers accepted by AssignPolicyFactory.GetPolicy.
const (
	// PolicyTypeRoundRobin selects plain round-robin assignment.
	PolicyTypeRoundRobin = "round_robin"

	// PolicyTypeRowCount selects priority-queue assignment driven by row
	// count / channel count.
	PolicyTypeRowCount = "row_count"

	// PolicyTypeScoreBased selects comprehensive score-based assignment with
	// benefit evaluation.
	PolicyTypeScoreBased = "score_based"
)
38+
39+
// AssignPolicyFactory is responsible for creating and caching assign policy instances.
40+
// It supports dynamic policy switching based on configuration changes.
41+
type AssignPolicyFactory struct {
42+
policyMap map[string]AssignPolicy
43+
policyLock sync.RWMutex
44+
45+
// Dependencies for creating policies
46+
scheduler task.Scheduler
47+
nodeManager *session.NodeManager
48+
dist *meta.DistributionManager
49+
meta *meta.Meta
50+
targetMgr meta.TargetManagerInterface
51+
}
52+
53+
// Global factory instance
54+
var (
55+
globalPolicyFactory *AssignPolicyFactory
56+
policyFactoryOnce sync.Once
57+
)
58+
59+
// InitGlobalAssignPolicyFactory initializes the global assign policy factory singleton.
60+
// This should be called once during server startup.
61+
func InitGlobalAssignPolicyFactory(
62+
scheduler task.Scheduler,
63+
nodeManager *session.NodeManager,
64+
dist *meta.DistributionManager,
65+
meta *meta.Meta,
66+
targetMgr meta.TargetManagerInterface,
67+
) {
68+
policyFactoryOnce.Do(func() {
69+
globalPolicyFactory = NewAssignPolicyFactory(scheduler, nodeManager, dist, meta, targetMgr)
70+
log.Info("Global assign policy factory initialized")
71+
})
72+
}
73+
74+
// GetGlobalAssignPolicyFactory returns the global assign policy factory instance.
75+
// Returns nil if InitGlobalAssignPolicyFactory has not been called.
76+
func GetGlobalAssignPolicyFactory() *AssignPolicyFactory {
77+
return globalPolicyFactory
78+
}
79+
80+
// ResetGlobalAssignPolicyFactoryForTest resets the global factory for testing purposes.
81+
// This should only be used in tests.
82+
func ResetGlobalAssignPolicyFactoryForTest() {
83+
globalPolicyFactory = nil
84+
policyFactoryOnce = sync.Once{}
85+
}
86+
87+
// NewAssignPolicyFactory creates a new AssignPolicyFactory instance.
88+
func NewAssignPolicyFactory(
89+
scheduler task.Scheduler,
90+
nodeManager *session.NodeManager,
91+
dist *meta.DistributionManager,
92+
meta *meta.Meta,
93+
targetMgr meta.TargetManagerInterface,
94+
) *AssignPolicyFactory {
95+
return &AssignPolicyFactory{
96+
policyMap: make(map[string]AssignPolicy),
97+
scheduler: scheduler,
98+
nodeManager: nodeManager,
99+
dist: dist,
100+
meta: meta,
101+
targetMgr: targetMgr,
102+
}
103+
}
104+
105+
// GetPolicy returns an assign policy instance based on the specified policy type.
106+
// It caches policy instances and reuses them when the policy type hasn't changed.
107+
func (f *AssignPolicyFactory) GetPolicy(policyType string) AssignPolicy {
108+
f.policyLock.Lock()
109+
defer f.policyLock.Unlock()
110+
111+
policy, ok := f.policyMap[policyType]
112+
if ok {
113+
return policy
114+
}
115+
116+
log.Info("Creating new assign policy", zap.String("type", policyType))
117+
118+
switch policyType {
119+
case PolicyTypeRoundRobin:
120+
policy = newRoundRobinAssignPolicy(f.nodeManager, f.scheduler, f.targetMgr)
121+
case PolicyTypeRowCount:
122+
policy = newRowCountBasedAssignPolicy(f.nodeManager, f.scheduler, f.dist)
123+
case PolicyTypeScoreBased:
124+
policy = newScoreBasedAssignPolicy(f.nodeManager, f.scheduler, f.dist, f.meta)
125+
default:
126+
log.Info("Unknown assign policy type, using default",
127+
zap.String("requested", policyType),
128+
zap.String("default", PolicyTypeScoreBased))
129+
policy = newScoreBasedAssignPolicy(f.nodeManager, f.scheduler, f.dist, f.meta)
130+
}
131+
132+
f.policyMap[policyType] = policy
133+
return policy
134+
}

0 commit comments

Comments
 (0)