Skip to content

Commit 6821048

Browse files
committed
improvement: add debug log on collection load
Signed-off-by: xiaofanluan <[email protected]>
1 parent 7078f40 commit 6821048

File tree

4 files changed

+31
-4
lines changed

4 files changed

+31
-4
lines changed

internal/querycoordv2/balance/score_based_balancer.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -524,6 +524,7 @@ func (b *ScoreBasedBalancer) balanceSegments(ctx context.Context, br *balanceRep
524524
}
525525

526526
log.Info("Handle stopping nodes",
527+
zap.Any("replica", replica.GetID()),
527528
zap.Any("stopping nodes", roNodes),
528529
zap.Any("available nodes", rwNodes),
529530
)

internal/querycoordv2/job/job_load.go

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,11 @@ func (job *LoadCollectionJob) Execute() error {
8888

8989
log := log.Ctx(job.ctx).With(zap.Int64("collectionID", req.GetCollectionId()))
9090
meta.GlobalFailedLoadCache.Remove(req.GetCollectionId())
91+
log.Info("start load collection job",
92+
zap.Int("replicaConfigs", len(req.GetReplicas())),
93+
zap.Int("channelNum", len(vchannels)),
94+
zap.Int("partitionNum", len(req.GetPartitionIds())),
95+
)
9196

9297
// 1. create replica if not exist
9398
if _, err := utils.SpawnReplicasWithReplicaConfig(job.ctx, job.meta, meta.SpawnWithReplicaConfigParams{
@@ -151,6 +156,7 @@ func (job *LoadCollectionJob) Execute() error {
151156
}
152157
}
153158
if len(toReleasePartitions) > 0 {
159+
log.Info("release stale partitions", zap.Int64s("toReleasePartitions", toReleasePartitions))
154160
job.targetObserver.ReleasePartition(req.GetCollectionId(), toReleasePartitions...)
155161
if err := job.meta.CollectionManager.RemovePartition(job.ctx, req.GetCollectionId(), toReleasePartitions...); err != nil {
156162
return errors.Wrap(err, "failed to remove partitions")
@@ -177,7 +183,9 @@ func (job *LoadCollectionJob) Execute() error {
177183
}
178184

179185
// 6. register load task into collection observer
180-
job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitions.Collect())
186+
incomingPartitionIDs := incomingPartitions.Collect()
187+
log.Info("trigger collection observer load", zap.Int64s("incomingPartitions", incomingPartitionIDs))
188+
job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitionIDs)
181189

182190
// 7. wait for partition released if any partition is released
183191
if len(toReleasePartitions) > 0 {
@@ -193,5 +201,6 @@ func (job *LoadCollectionJob) Execute() error {
193201
}
194202
log.Info("wait for partition released done", zap.Int64s("toReleasePartitions", toReleasePartitions))
195203
}
204+
log.Info("load collection job finished successfully")
196205
return nil
197206
}

internal/querycoordv2/server.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -678,6 +678,7 @@ func (s *Server) watchNodes(revision int64) {
678678

679679
switch event.EventType {
680680
case sessionutil.SessionAddEvent:
681+
log.Info("adding a node")
681682
s.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
682683
NodeID: nodeID,
683684
Address: addr,

internal/querynodev2/services.go

Lines changed: 19 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -234,9 +234,25 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
234234
return merr.Status(err), nil
235235
}
236236

237-
_, exist := node.delegators.Get(channel.GetChannelName())
237+
delegator, exist := node.delegators.Get(channel.GetChannelName())
238238
if exist {
239-
log.Info("channel already subscribed")
239+
var (
240+
loadedRatio float64
241+
queryViewVersion int64
242+
serviceable bool
243+
)
244+
if view := delegator.GetChannelQueryView(); view != nil {
245+
loadedRatio = view.GetLoadedRatio()
246+
queryViewVersion = view.GetVersion()
247+
serviceable = view.Serviceable()
248+
}
249+
log.Info("channel already subscribed",
250+
zap.Int64("delegatorVersion", delegator.Version()),
251+
zap.Float64("loadedRatio", loadedRatio),
252+
zap.Bool("serviceable", serviceable),
253+
zap.Int64("queryViewVersion", queryViewVersion),
254+
zap.Time("tSafe", tsoutil.PhysicalTime(delegator.GetTSafe())),
255+
)
240256
return merr.Success(), nil
241257
}
242258

@@ -259,7 +275,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
259275
req.GetTargetVersion(),
260276
)
261277

262-
delegator, err := delegator.NewShardDelegator(
278+
delegator, err = delegator.NewShardDelegator(
263279
ctx,
264280
req.GetCollectionID(),
265281
req.GetReplicaID(),

0 commit comments

Comments
 (0)