Skip to content

Commit 9b6d376

Browse files
Resolve #650 fix can not stop prophet (#652)
1 parent d72f278 commit 9b6d376

File tree

6 files changed

+106
-60
lines changed

6 files changed

+106
-60
lines changed

components/prophet/client.go

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ type asyncClient struct {
9999
containerID uint64
100100
adapter metadata.Adapter
101101
id uint64
102-
contexts sync.Map // id -> request
103102
leaderConn goetty.IOSession
104103

105104
resetReadC chan string
@@ -113,6 +112,11 @@ type asyncClient struct {
113112
sync.RWMutex
114113
state int32
115114
}
115+
116+
contextsMu struct {
117+
sync.RWMutex
118+
contexts map[uint64]*ctx
119+
}
116120
}
117121

118122
// NewClient create a prophet client
@@ -132,6 +136,7 @@ func NewClient(adapter metadata.Adapter, opts ...Option) Client {
132136
c.opts.adjust()
133137
c.stopper = stop.NewStopper("prophet-client", stop.WithLogger(c.opts.logger))
134138
c.leaderConn = createConn(c.opts.logger)
139+
c.contextsMu.contexts = make(map[uint64]*ctx)
135140
c.start()
136141
return c
137142
}
@@ -592,8 +597,10 @@ func (c *asyncClient) do(ctx *ctx) error {
592597
if !added {
593598
ctx.req.ID = c.nextID()
594599
if ctx.sync || ctx.cb != nil {
595-
c.contexts.Store(ctx.req.ID, ctx)
600+
c.contextsMu.Lock()
601+
c.contextsMu.contexts[ctx.req.ID] = ctx
596602
util.DefaultTimeoutWheel().Schedule(c.opts.rpcTimeout, c.timeout, ctx.req.ID)
603+
c.contextsMu.Unlock()
597604
}
598605
added = true
599606
}
@@ -612,9 +619,11 @@ func (c *asyncClient) do(ctx *ctx) error {
612619
}
613620

614621
func (c *asyncClient) timeout(arg interface{}) {
615-
if v, ok := c.contexts.Load(arg); ok {
616-
c.contexts.Delete(arg)
617-
v.(*ctx).done(nil, ErrTimeout)
622+
c.contextsMu.RLock()
623+
defer c.contextsMu.RUnlock()
624+
625+
if ctx, ok := c.contextsMu.contexts[arg.(uint64)]; ok {
626+
ctx.done(nil, ErrTimeout)
618627
}
619628
}
620629

@@ -667,12 +676,12 @@ OUTER:
667676
for {
668677
select {
669678
case <-stopCtx.Done():
670-
c.contexts.Range(func(key, value interface{}) bool {
671-
if value != nil {
672-
value.(*ctx).done(nil, ErrClosed)
673-
}
674-
return true
675-
})
679+
c.contextsMu.Lock()
680+
for k, ctx := range c.contextsMu.contexts {
681+
ctx.done(nil, ErrClosed)
682+
delete(c.contextsMu.contexts, k)
683+
}
684+
c.contextsMu.Unlock()
676685
return
677686
case leader, ok := <-c.resetReadC:
678687
if ok {
@@ -714,20 +723,25 @@ OUTER:
714723
}
715724

716725
func (c *asyncClient) requestDoneWithRetry(resp *rpcpb.Response) {
717-
v, ok := c.contexts.Load(resp.ID)
718-
if ok && v != nil {
719-
v.(*ctx).done(nil, util.ErrNotLeader)
726+
c.contextsMu.Lock()
727+
defer c.contextsMu.Unlock()
728+
729+
if ctx, ok := c.contextsMu.contexts[resp.ID]; ok {
730+
delete(c.contextsMu.contexts, resp.ID)
731+
ctx.done(nil, util.ErrNotLeader)
720732
}
721733
}
722734

723735
func (c *asyncClient) requestDone(resp *rpcpb.Response) {
724-
v, ok := c.contexts.Load(resp.ID)
725-
if ok && v != nil {
726-
c.contexts.Delete(resp.ID)
736+
c.contextsMu.Lock()
737+
defer c.contextsMu.Unlock()
738+
739+
if ctx, ok := c.contextsMu.contexts[resp.ID]; ok {
740+
delete(c.contextsMu.contexts, resp.ID)
727741
if resp.Error != "" {
728-
v.(*ctx).done(nil, errors.New(resp.Error))
742+
ctx.done(nil, errors.New(resp.Error))
729743
} else {
730-
v.(*ctx).done(resp, nil)
744+
ctx.done(resp, nil)
731745
}
732746
}
733747
}

components/prophet/cluster/cluster.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
258258
c.Lock()
259259
close(c.createResourceC)
260260
close(c.changedEvents)
261+
c.createResourceC = nil
262+
c.changedEvents = nil
261263
c.Unlock()
262264
c.logger.Info("background jobs has been stopped")
263265
return
@@ -454,13 +456,15 @@ func (c *RaftCluster) HandleContainerHeartbeat(stats *metapb.ContainerStats) err
454456
newContainer = newContainer.Clone(core.SetLastPersistTime(time.Now()))
455457
}
456458

457-
c.changedEvents <- event.NewContainerEvent(newContainer.Meta)
459+
c.addNotifyLocked(event.NewContainerEvent(newContainer.Meta))
458460
}
459461
if container := c.core.GetContainer(newContainer.Meta.ID()); container != nil {
460462
c.hotStat.UpdateContainerHeartbeatMetrics(container)
461463
}
464+
462465
c.core.PutContainer(newContainer)
463-
c.changedEvents <- event.NewContainerStatsEvent(newContainer.GetContainerStats())
466+
c.addNotifyLocked(event.NewContainerStatsEvent(newContainer.GetContainerStats()))
467+
464468
c.hotStat.Observe(newContainer.Meta.ID(), newContainer.GetContainerStats())
465469
c.hotStat.UpdateTotalLoad(c.core.GetContainers())
466470
c.hotStat.FilterUnhealthyContainer(c)
@@ -665,24 +669,20 @@ func (c *RaftCluster) processResourceHeartbeat(res *core.CachedResource) error {
665669
}
666670
c.RLock()
667671
if saveKV || saveCache || isNew {
668-
if c.changedEvents != nil {
669-
if res.GetLeader().GetID() != 0 {
670-
from := uint64(0)
671-
if origin != nil {
672-
from = origin.GetLeader().GetContainerID()
673-
}
674-
c.logger.Debug("notify resource leader changed",
675-
zap.Uint64("resource", res.Meta.ID()),
676-
zap.Uint64("from", from),
677-
zap.Uint64("to", res.GetLeader().GetContainerID()))
672+
if res.GetLeader().GetID() != 0 {
673+
from := uint64(0)
674+
if origin != nil {
675+
from = origin.GetLeader().GetContainerID()
678676
}
679-
c.changedEvents <- event.NewResourceEvent(res.Meta, res.GetLeader().GetID(), false, false)
677+
c.logger.Debug("notify resource leader changed",
678+
zap.Uint64("resource", res.Meta.ID()),
679+
zap.Uint64("from", from),
680+
zap.Uint64("to", res.GetLeader().GetContainerID()))
680681
}
682+
c.addNotifyLocked(event.NewResourceEvent(res.Meta, res.GetLeader().GetID(), false, false))
681683
}
682684
if saveCache {
683-
if c.changedEvents != nil {
684-
c.changedEvents <- event.NewResourceStatsEvent(res.GetStat())
685-
}
685+
c.addNotifyLocked(event.NewResourceStatsEvent(res.GetStat()))
686686
}
687687
c.RUnlock()
688688

@@ -1300,6 +1300,8 @@ func (c *RaftCluster) AllocID() (uint64, error) {
13001300

13011301
// ChangedEventNotifier changedEventNotifier
13021302
func (c *RaftCluster) ChangedEventNotifier() <-chan rpcpb.EventNotify {
1303+
c.RLock()
1304+
defer c.RUnlock()
13031305
return c.changedEvents
13041306
}
13051307

@@ -1606,3 +1608,9 @@ func (c *RaftCluster) JointConsensusEnabled() bool {
16061608
func (c *RaftCluster) GetResourceFactory() func() metadata.Resource {
16071609
return c.adapter.NewResource
16081610
}
1611+
1612+
func (c *RaftCluster) addNotifyLocked(event rpcpb.EventNotify) {
1613+
if c.changedEvents != nil {
1614+
c.changedEvents <- event
1615+
}
1616+
}

components/prophet/cluster/cluster_worker.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func (c *RaftCluster) HandleRemoveResources(request *rpcpb.Request) (*rpcpb.Remo
333333
c.core.AddRemovedResources(request.RemoveResources.IDs...)
334334
for _, res := range origin {
335335
res.SetState(metapb.ResourceState_Destroyed)
336-
c.changedEvents <- event.NewResourceEvent(res, 0, true, false)
336+
c.addNotifyLocked(event.NewResourceEvent(res, 0, true, false))
337337
}
338338

339339
return &rpcpb.RemoveResourcesRsp{}, nil
@@ -391,15 +391,17 @@ func (c *RaftCluster) HandleGetScheduleGroupRule(request *rpcpb.Request) ([]meta
391391
}
392392

393393
func (c *RaftCluster) triggerNotifyCreateResources() {
394-
select {
395-
case c.createResourceC <- struct{}{}:
396-
default:
394+
if c.createResourceC != nil {
395+
select {
396+
case c.createResourceC <- struct{}{}:
397+
default:
398+
}
397399
}
398400
}
399401

400402
func (c *RaftCluster) doNotifyCreateResources() {
401403
c.core.ForeachWaittingCreateResources(func(res metadata.Resource) {
402-
c.changedEvents <- event.NewResourceEvent(res, 0, false, true)
404+
c.addNotifyLocked(event.NewResourceEvent(res, 0, false, true))
403405
})
404406
}
405407

components/prophet/election/election_leadship.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,13 @@ func (ls *Leadership) ChangeLeaderTo(newLeader string) error {
111111

112112
// Stop stop the current leadship
113113
func (ls *Leadership) Stop() error {
114+
ls.logger.Info("begin to stop")
114115
var err error
115116
lease := ls.GetLease()
116117
if lease != nil {
117118
err = lease.Close(ls.elector.client.Ctx())
118119
}
120+
ls.logger.Info("begin to stop stopper")
119121
ls.stopper.Stop()
120122
return err
121123
}
@@ -162,12 +164,16 @@ func (ls *Leadership) ElectionLoop() {
162164
func (ls *Leadership) doElectionLoop(ctx context.Context) {
163165
ls.ctx = ctx
164166
for {
167+
ls.logger.Info("ready to next loop",
168+
mainLoopFiled)
165169
select {
166170
case <-ctx.Done():
167171
ls.logger.Info("loop exit due to context done",
168172
mainLoopFiled)
169173
return
170174
default:
175+
ls.logger.Info("ready to load current leader",
176+
mainLoopFiled)
171177
currentLeader, rev, err := ls.CurrentLeader()
172178
if err != nil {
173179
ls.logger.Error("fail to load current leader, retry later",
@@ -206,6 +212,8 @@ func (ls *Leadership) doElectionLoop(ctx context.Context) {
206212
}
207213

208214
if ls.allowCampaign {
215+
ls.logger.Info("start checkExpectLeader",
216+
mainLoopFiled)
209217
// check expect leader exists
210218
err := ls.checkExpectLeader()
211219
if err != nil {
@@ -215,14 +223,17 @@ func (ls *Leadership) doElectionLoop(ctx context.Context) {
215223
time.Sleep(200 * time.Millisecond)
216224
continue
217225
}
218-
226+
ls.logger.Info("end checkExpectLeader, and start campaign",
227+
mainLoopFiled)
219228
if err = ls.campaign(); err != nil {
220229
ls.logger.Error("fail to campaign leader",
221230
mainLoopFiled,
222231
zap.Error(err))
223232
time.Sleep(time.Second * time.Duration(ls.elector.options.leaseSec))
224233
continue
225234
}
235+
ls.logger.Info("end campaign",
236+
mainLoopFiled)
226237
}
227238

228239
time.Sleep(loopInterval)
@@ -345,7 +356,7 @@ func (ls *Leadership) campaign() error {
345356
ls.logger.Info("exit due to client context done",
346357
keepaliveField)
347358
return errors.New("etcd client closed")
348-
case <-ctx.Done():
359+
case <-ls.ctx.Done():
349360
ls.logger.Info("exit due to context done",
350361
keepaliveField)
351362
return nil

components/prophet/prophet.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,18 +210,21 @@ func (p *defaultProphet) Stop() {
210210
p.trans.Stop()
211211
p.logger.Info("transport stopped")
212212

213-
p.member.Stop()
214-
p.logger.Info("member stopped")
215-
216213
p.cancel()
217214
p.elector.Client().Close()
218215
p.logger.Info("etcd client stopped")
219216

217+
p.member.Stop()
218+
p.logger.Info("member stopped")
219+
220220
if p.etcd != nil {
221221
p.etcd.Close()
222222
}
223+
p.logger.Info("etcd server stopped")
223224

224225
p.stopJobs()
226+
p.logger.Info("job begin to stopped")
227+
225228
p.stopper.Stop()
226229
p.logger.Info("prophet stopped")
227230
})

components/prophet/prophet_watcher.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package prophet
1515

1616
import (
17+
"context"
1718
"fmt"
1819
"sync"
1920
"sync/atomic"
@@ -23,6 +24,7 @@ import (
2324
"github.com/matrixorigin/matrixcube/components/prophet/cluster"
2425
"github.com/matrixorigin/matrixcube/components/prophet/event"
2526
"github.com/matrixorigin/matrixcube/components/prophet/pb/rpcpb"
27+
"github.com/matrixorigin/matrixcube/util/stop"
2628
"go.uber.org/zap"
2729
)
2830

@@ -49,14 +51,17 @@ type watcherNotifier struct {
4951
logger *zap.Logger
5052
watchers map[uint64]*watcherSession
5153
cluster *cluster.RaftCluster
54+
stopper *stop.Stopper
5255
}
5356

5457
func newWatcherNotifier(cluster *cluster.RaftCluster, logger *zap.Logger) *watcherNotifier {
55-
return &watcherNotifier{
58+
wn := &watcherNotifier{
5659
logger: log.Adjust(logger).Named("watch-notify"),
5760
cluster: cluster,
5861
watchers: make(map[uint64]*watcherSession),
5962
}
63+
wn.stopper = stop.NewStopper("event-notifier", stop.WithLogger(wn.logger))
64+
return wn
6065
}
6166

6267
func (wn *watcherNotifier) handleCreateWatcher(req *rpcpb.Request, resp *rpcpb.Response, session goetty.IOSession) error {
@@ -131,33 +136,36 @@ func (wn *watcherNotifier) doNotify(evt rpcpb.EventNotify) {
131136
}
132137

133138
func (wn *watcherNotifier) start() {
134-
go func() {
135-
defer func() {
136-
if err := recover(); err != nil {
137-
wn.logger.Error("fail to notify event, restart later", zap.Any("error", err))
138-
wn.start()
139-
}
140-
}()
139+
wn.stopper.RunTask(context.Background(), func(ctx context.Context) {
140+
eventC := wn.cluster.ChangedEventNotifier()
141+
if eventC == nil {
142+
wn.logger.Info("watcher notifer exit with nil event channel")
143+
return
144+
}
141145

142146
for {
143-
evt, ok := <-wn.cluster.ChangedEventNotifier()
144-
if !ok {
147+
select {
148+
case <-ctx.Done():
145149
wn.logger.Info("watcher notifer exit")
146150
return
151+
case evt, ok := <-eventC:
152+
if !ok {
153+
wn.logger.Info("watcher notifer exit with channel closed")
154+
return
155+
}
156+
wn.doNotify(evt)
147157
}
148-
149-
wn.doNotify(evt)
150158
}
151-
}()
159+
})
152160
}
153161

154162
func (wn *watcherNotifier) stop() {
155163
wn.Lock()
156-
defer wn.Unlock()
157-
158164
for _, wt := range wn.watchers {
159165
wn.doClearWatcherLocked(wt)
160166
}
161167
wn.watchers = nil
162168
wn.logger.Info("watcher notifier stopped")
169+
wn.Unlock()
170+
wn.stopper.Stop()
163171
}

0 commit comments

Comments
 (0)