Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (b *ScoreBasedBalancer) balanceSegments(ctx context.Context, br *balanceRep
}

log.Info("Handle stopping nodes",
zap.Any("replica", replica.GetID()),
zap.Any("stopping nodes", roNodes),
zap.Any("available nodes", rwNodes),
)
Expand Down
11 changes: 10 additions & 1 deletion internal/querycoordv2/job/job_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func (job *LoadCollectionJob) Execute() error {

log := log.Ctx(job.ctx).With(zap.Int64("collectionID", req.GetCollectionId()))
meta.GlobalFailedLoadCache.Remove(req.GetCollectionId())
log.Info("start load collection job",
zap.Int("replicaConfigs", len(req.GetReplicas())),
zap.Int("channelNum", len(vchannels)),
zap.Int("partitionNum", len(req.GetPartitionIds())),
)

// 1. create replica if not exist
if _, err := utils.SpawnReplicasWithReplicaConfig(job.ctx, job.meta, meta.SpawnWithReplicaConfigParams{
Expand Down Expand Up @@ -151,6 +156,7 @@ func (job *LoadCollectionJob) Execute() error {
}
}
if len(toReleasePartitions) > 0 {
log.Info("release stale partitions", zap.Int64s("toReleasePartitions", toReleasePartitions))
job.targetObserver.ReleasePartition(req.GetCollectionId(), toReleasePartitions...)
if err := job.meta.CollectionManager.RemovePartition(job.ctx, req.GetCollectionId(), toReleasePartitions...); err != nil {
return errors.Wrap(err, "failed to remove partitions")
Expand All @@ -177,7 +183,9 @@ func (job *LoadCollectionJob) Execute() error {
}

// 6. register load task into collection observer
job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitions.Collect())
incomingPartitionIDs := incomingPartitions.Collect()
log.Info("trigger collection observer load", zap.Int64s("incomingPartitions", incomingPartitionIDs))
job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitionIDs)

// 7. wait for partition released if any partition is released
if len(toReleasePartitions) > 0 {
Expand All @@ -193,5 +201,6 @@ func (job *LoadCollectionJob) Execute() error {
}
log.Info("wait for partition released done", zap.Int64s("toReleasePartitions", toReleasePartitions))
}
log.Info("load collection job finished successfully")
return nil
}
1 change: 1 addition & 0 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ func (s *Server) watchNodes(revision int64) {

switch event.EventType {
case sessionutil.SessionAddEvent:
log.Info("adding a node")
s.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodeID,
Address: addr,
Expand Down
20 changes: 18 additions & 2 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,25 @@
return merr.Status(err), nil
}

_, exist := node.delegators.Get(channel.GetChannelName())
current, exist := node.delegators.Get(channel.GetChannelName())
if exist {
log.Info("channel already subscribed")
var (
loadedRatio float64
queryViewVersion int64
serviceable bool
)
if view := current.GetChannelQueryView(); view != nil {
loadedRatio = view.GetLoadedRatio()
queryViewVersion = view.GetVersion()
serviceable = view.Serviceable()
}
log.Info("channel already subscribed",
zap.Int64("delegatorVersion", delegator.Version()),

Check failure on line 250 in internal/querynodev2/services.go

View workflow job for this annotation

GitHub Actions / Code Checker MacOS

undefined: delegator.Version
zap.Float64("loadedRatio", loadedRatio),
zap.Bool("serviceable", serviceable),
zap.Int64("queryViewVersion", queryViewVersion),
zap.Time("tSafe", tsoutil.PhysicalTime(delegator.GetTSafe())),

Check failure on line 254 in internal/querynodev2/services.go

View workflow job for this annotation

GitHub Actions / Code Checker MacOS

undefined: delegator.GetTSafe
)
return merr.Success(), nil
}

Expand Down
Loading