Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 14 additions & 47 deletions internal/querycoordv2/meta/channel_dist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,45 +365,23 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
candidatesServiceable := candidates.IsServiceable()
channelServiceable := channel.IsServiceable()

candidateIsStreamingNode := m.checkIfStreamingNode(candidates.Node)
channelIsStreamingNode := m.checkIfStreamingNode(channel.Node)
logger.Debug("check whether stream node is serviceable",
zap.Bool("candidatesServiceable", candidatesServiceable),
zap.Bool("channelServiceable", channelServiceable),
zap.Bool("candidateIsStreamingNode", candidateIsStreamingNode),
zap.Bool("channelIsStreamingNode", channelIsStreamingNode))

if channelIsStreamingNode && !candidateIsStreamingNode {
// When upgrading from 2.5 to 2.6, the delegator leader may not be located on a streaming node.
// We always use the streaming node as the delegator leader to avoid losing delete data when loading segments.
logger.Debug("set delegator on stream node to candidate shard leader", zap.Int64("node", channel.Node),
updateNeeded := false
switch {
case !candidatesServiceable && channelServiceable:
// Current candidate is not serviceable but new channel is
updateNeeded = true
logger.Debug("set serviceable delegator to candidate shard leader", zap.Int64("node", channel.Node),
zap.Int64("channel version", channel.Version))
case candidatesServiceable == channelServiceable && channel.Version > candidates.Version:
// Same service status but higher version
updateNeeded = true
logger.Debug("set serviceable delegator with larger version to candidate shard leader", zap.Int64("node", channel.Node),
zap.Int64("channel version", channel.Version), zap.Int64("candidate version", candidates.Version))
}
if updateNeeded {
candidates = channel
} else if !channelIsStreamingNode && candidateIsStreamingNode {
// When downgrading from 2.6 to 2.5, the delegator leader may locate at non-streaming node.
// We always use the non-streaming node as the delegator leader to avoid the delete data lost when loading segment.
logger.Debug("found delegator which is not on stream node", zap.Int64("node", channel.Node),
zap.Int64("channel version", channel.Version))
continue
} else {
updateNeeded := false
switch {
case !candidatesServiceable && channelServiceable:
// Current candidate is not serviceable but new channel is
updateNeeded = true
logger.Debug("set serviceable delegator to candidate shard leader", zap.Int64("node", channel.Node),
zap.Int64("channel version", channel.Version))
case candidatesServiceable == channelServiceable && channel.Version > candidates.Version:
// Same service status but higher version
updateNeeded = true
logger.Debug("set serviceable delegator with larger version to candidate shard leader", zap.Int64("node", channel.Node),
zap.Int64("channel version", channel.Version), zap.Int64("candidate version", candidates.Version))
}
if updateNeeded {
candidates = channel
} else {
logger.Debug("not set any channel to candidates in this round")
}
logger.Debug("not set any channel to candidates in this round")
}
}
}
Expand All @@ -416,17 +394,6 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
return candidates
}

// checkIfStreamingNode reports whether the node identified by nodeID is a streaming node.
//
// A streaming node and its embedded query node use different sessions, so the
// lookup goes through the query node session (m.nodeManager) instead of the
// streaming node session; checking the latter would give a wrong result.
//
// It returns false when the node is unknown to the node manager, and true when
// the node reports IsEmbeddedQueryNodeInStreamingNode or IsInStandalone.
func (m *ChannelDistManager) checkIfStreamingNode(nodeID int64) bool {
	if info := m.nodeManager.Get(nodeID); info != nil {
		return info.IsEmbeddedQueryNodeInStreamingNode() || info.IsInStandalone()
	}
	// Unknown node: treat it as a non-streaming node.
	return false
}

func (m *ChannelDistManager) GetChannelDist(collectionID int64) []*metricsinfo.DmChannel {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
Expand Down
20 changes: 0 additions & 20 deletions internal/querycoordv2/meta/channel_dist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
Expand Down Expand Up @@ -344,25 +343,6 @@ func (suite *ChannelDistManagerSuite) TestGetShardLeader() {
// Test nonexistent channel
leader = dist.GetShardLeader("nonexistent", replica)
suite.Nil(leader)

// Test streaming node
nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
Address: "localhost:1",
Hostname: "localhost",
Labels: map[string]string{sessionutil.LabelStreamingNodeEmbeddedQueryNode: "1"},
}))
channel1Node4 := suite.channels["dmc0"].Clone()
channel1Node4.Node = 4
channel1Node4.Version = 3
channel1Node4.View.Status.Serviceable = false
dist.Update(4, channel1Node4)

leader = dist.GetShardLeader("dmc0", replica)
suite.NotNil(leader)
suite.Equal(int64(4), leader.Node)
suite.Equal(int64(3), leader.Version)
suite.False(leader.IsServiceable())
}

func TestGetChannelDistJSON(t *testing.T) {
Expand Down
43 changes: 20 additions & 23 deletions internal/querycoordv2/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
Expand Down Expand Up @@ -243,11 +241,6 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
return err
}

if err := ex.checkIfShardLeaderIsStreamingNode(view); err != nil {
log.Warn("shard leader is not a streamingnode, skip load segment", zap.Error(err))
return err
}

log = log.With(zap.Int64("shardLeader", view.Node))

// NOTE: for balance segment task, expected load and release execution on the same shard leader
Expand All @@ -270,25 +263,29 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
return nil
}

// checkIfShardLeaderIsStreamingNode checks whether the shard leader is a streaming node.
//
// If this check is enabled when loading segments, every segment must always be
// loaded through a streaming node rather than a 2.5 query node, which makes
// some searches and queries fail while upgrading.
// If it is not enabled, some search and query results will be wrong while upgrading.
// We choose to disable this check for now to keep search and query available while upgrading.
//
// Because L0 management differs between 2.6 and 2.5, once mixcoord is upgraded
// the new mixcoord makes a wrong plan when it balances a segment from one query
// node to another through a 2.5 delegator.
// The 2.5 delegator must be balanced to a 2.6 delegator before the 2.6 mixcoord
// balances any segment.
//
// It returns nil when the streaming service is disabled or when view.Node is one
// of the streaming query nodes; otherwise it returns a service-internal error.
func (ex *Executor) checkIfShardLeaderIsStreamingNode(view *meta.DmChannel) error {
	if !streamingutil.IsStreamingServiceEnabled() {
		// Nothing to verify when the streaming service is off.
		return nil
	}

	leaderID := view.Node
	if ex.nodeMgr.Get(leaderID) == nil {
		return merr.WrapErrServiceInternal(fmt.Sprintf("node %d is not found", leaderID))
	}

	// The leader must be one of the query nodes known to the streaming node manager.
	if streamingQueryNodes := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs(); !streamingQueryNodes.Contain(leaderID) {
		return merr.WrapErrServiceInternal(fmt.Sprintf("channel %s at node %d is not working at streamingnode, skip load segment", view.GetChannelName(), leaderID))
	}
	return nil
}
// func (ex *Executor) checkIfShardLeaderIsStreamingNode(view *meta.DmChannel) error {
// if !streamingutil.IsStreamingServiceEnabled() {
// return nil
// }
//
// node := ex.nodeMgr.Get(view.Node)
// if node == nil {
// return merr.WrapErrServiceInternal(fmt.Sprintf("node %d is not found", view.Node))
// }
// nodes := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
// if !nodes.Contain(view.Node) {
// return merr.WrapErrServiceInternal(fmt.Sprintf("channel %s at node %d is not working at streamingnode, skip load segment", view.GetChannelName(), view.Node))
// }
// return nil
// }

func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
defer ex.removeTask(task, step)
Expand Down
Loading