diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 9dcf10bf70b4d..d3f02b2b43faa 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -524,6 +524,7 @@ func (b *ScoreBasedBalancer) balanceSegments(ctx context.Context, br *balanceRep } log.Info("Handle stopping nodes", + zap.Any("replica", replica.GetID()), zap.Any("stopping nodes", roNodes), zap.Any("available nodes", rwNodes), ) diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 7a588542fb195..ff0bf3892a9eb 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -88,6 +88,11 @@ func (job *LoadCollectionJob) Execute() error { log := log.Ctx(job.ctx).With(zap.Int64("collectionID", req.GetCollectionId())) meta.GlobalFailedLoadCache.Remove(req.GetCollectionId()) + log.Info("start load collection job", + zap.Int("replicaConfigs", len(req.GetReplicas())), + zap.Int("channelNum", len(vchannels)), + zap.Int("partitionNum", len(req.GetPartitionIds())), + ) // 1. create replica if not exist if _, err := utils.SpawnReplicasWithReplicaConfig(job.ctx, job.meta, meta.SpawnWithReplicaConfigParams{ @@ -151,6 +156,7 @@ func (job *LoadCollectionJob) Execute() error { } } if len(toReleasePartitions) > 0 { + log.Info("release stale partitions", zap.Int64s("toReleasePartitions", toReleasePartitions)) job.targetObserver.ReleasePartition(req.GetCollectionId(), toReleasePartitions...) if err := job.meta.CollectionManager.RemovePartition(job.ctx, req.GetCollectionId(), toReleasePartitions...); err != nil { return errors.Wrap(err, "failed to remove partitions") @@ -177,7 +183,9 @@ func (job *LoadCollectionJob) Execute() error { } // 6. 
register load task into collection observer - job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitions.Collect()) + incomingPartitionIDs := incomingPartitions.Collect() + log.Info("trigger collection observer load", zap.Int64s("incomingPartitions", incomingPartitionIDs)) + job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitionIDs) // 7. wait for partition released if any partition is released if len(toReleasePartitions) > 0 { @@ -193,5 +201,6 @@ func (job *LoadCollectionJob) Execute() error { } log.Info("wait for partition released done", zap.Int64s("toReleasePartitions", toReleasePartitions)) } + log.Info("load collection job finished successfully") return nil } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 51567ac5d4c3d..b57d3c7487491 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -678,6 +678,7 @@ func (s *Server) watchNodes(revision int64) { switch event.EventType { case sessionutil.SessionAddEvent: + log.Info("adding a node") s.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ NodeID: nodeID, Address: addr, diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 53ba81fa7e8eb..0c1c11badf73f 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -234,9 +234,25 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm return merr.Status(err), nil } - _, exist := node.delegators.Get(channel.GetChannelName()) + current, exist := node.delegators.Get(channel.GetChannelName()) if exist { - log.Info("channel already subscribed") + var ( + loadedRatio float64 + queryViewVersion int64 + serviceable bool + ) + if view := current.GetChannelQueryView(); view != nil { + loadedRatio = view.GetLoadedRatio() + queryViewVersion = view.GetVersion() + serviceable = view.Serviceable() + } + log.Info("channel already subscribed", + 
zap.Int64("delegatorVersion", current.Version()), + zap.Float64("loadedRatio", loadedRatio), + zap.Bool("serviceable", serviceable), + zap.Int64("queryViewVersion", queryViewVersion), + zap.Time("tSafe", tsoutil.PhysicalTime(current.GetTSafe())), + ) return merr.Success(), nil }