Skip to content

Commit 1fda816

Browse files
Fix #238 timeout to execute read/write operations (#239)
1 parent eb3dad2 commit 1fda816

File tree

23 files changed: 227 additions & 118 deletions

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ example-http: http; $(info ======== compiled matrixcube http example:)
2626

2727
.PHONY: test
2828
test: ; $(info ======== test matrixcube)
29-
go test $(RACE) -count=1 -v -timeout 600s $(PKGNAME)/storage
30-
go test $(RACE) -count=1 -v -timeout 600s $(PKGNAME)/raftstore
29+
go test $(RACE) -count=1 -v -timeout 300s $(PKGNAME)/storage
30+
go test $(RACE) -count=1 -v -timeout 300s $(PKGNAME)/raftstore
3131

3232
.PHONY: race-test
3333
race-test: override RACE=-race

components/prophet/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,10 @@ func (c *asyncClient) initLeaderConn(conn goetty.IOSession, timeout time.Duratio
734734
func (c *asyncClient) maybeRegisterContainer() {
735735
if c.containerID > 0 {
736736
req := &rpcpb.Request{}
737+
req.ID = c.nextID()
737738
req.Type = rpcpb.TypeRegisterContainer
738739
req.ContainerID = c.containerID
739-
c.asyncDo(req, nil)
740+
c.doWrite(newAsyncCtx(req, nil))
740741
}
741742
}
742743

components/prophet/join/join.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,13 @@ func PrepareJoinCluster(ctx context.Context, cfg *config.Config) (*clientv3.Clie
128128
defer client.Close()
129129

130130
for {
131+
util.GetLogger().Infof("%s(%s) begin to check etcd members",
132+
cfg.Name,
133+
cfg.DataDir)
131134
checkMembers(client, cfg)
135+
util.GetLogger().Infof("%s(%s) end to check etcd members",
136+
cfg.Name,
137+
cfg.DataDir)
132138

133139
var prophets []string
134140
// - A new Prophet joins an existing cluster.
@@ -138,12 +144,18 @@ func PrepareJoinCluster(ctx context.Context, cfg *config.Config) (*clientv3.Clie
138144
// First adds member through the API
139145
resp, err := util.AddEtcdMember(client, []string{cfg.EmbedEtcd.AdvertisePeerUrls})
140146
if err != nil {
141-
util.GetLogger().Errorf("add member to embed etcd failed with %+v, retry later", err)
147+
util.GetLogger().Errorf("%s(%s) add member to embed etcd failed with %+v, retry later",
148+
cfg.Name,
149+
cfg.DataDir,
150+
err)
142151
time.Sleep(time.Millisecond * 500)
143152
continue
144153
}
145154

146-
util.GetLogger().Infof("%s added into embed etcd cluster with resp %+v", cfg.Name, resp)
155+
util.GetLogger().Infof("%s(%s) added into embed etcd cluster with resp %+v",
156+
cfg.Name,
157+
cfg.DataDir,
158+
resp)
147159

148160
for _, m := range resp.Members {
149161
if m.Name != "" {
@@ -163,6 +175,10 @@ func PrepareJoinCluster(ctx context.Context, cfg *config.Config) (*clientv3.Clie
163175

164176
c, e, err := startEmbedEtcd(ctx, cfg)
165177
if err != nil && strings.Contains(err.Error(), "member count is unequal") {
178+
util.GetLogger().Infof("%s(%s) start embed etcd failed with %+v, retry",
179+
cfg.Name,
180+
cfg.DataDir,
181+
err)
166182
continue
167183
}
168184
if err != nil {
@@ -186,7 +202,10 @@ func PrepareJoinCluster(ctx context.Context, cfg *config.Config) (*clientv3.Clie
186202
util.GetLogger().Fatalf("write data path failed with %+v",
187203
err)
188204
}
189-
205+
util.GetLogger().Errorf("%s(%s) save InitialCluster with %+v",
206+
cfg.Name,
207+
cfg.DataDir,
208+
cfg.EmbedEtcd.InitialCluster)
190209
return c, e, nil
191210
}
192211
}
@@ -196,7 +215,9 @@ OUTER:
196215
for {
197216
listResp, err := util.ListEtcdMembers(client)
198217
if err != nil {
199-
util.GetLogger().Errorf("list embed etcd members failed with %+v, retry later",
218+
util.GetLogger().Errorf("%s(%s) list embed etcd members failed with %+v, retry later",
219+
cfg.Name,
220+
cfg.DataDir,
200221
err)
201222
time.Sleep(time.Second)
202223
continue
@@ -205,7 +226,11 @@ OUTER:
205226
for _, m := range listResp.Members {
206227
if len(m.Name) == 0 {
207228
// A new member added, but not started
208-
util.GetLogger().Warningf("there is a member that has not joined successfully")
229+
util.GetLogger().Warningf("%s(%s) there is a member that has not joined successfully, member %+v, self %+v",
230+
cfg.Name,
231+
cfg.DataDir,
232+
m.PeerURLs,
233+
cfg.EmbedEtcd.PeerUrls)
209234
time.Sleep(time.Second)
210235
continue OUTER
211236
}

components/prophet/prophet_leader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func (p *defaultProphet) createEventNotifer() {
7979
func (p *defaultProphet) stopEventNotifer() {
8080
if p.wn != nil {
8181
p.wn.stop()
82+
p.wn = nil
8283
}
8384
}
8485

components/prophet/prophet_watcher.go

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package prophet
1515

1616
import (
17+
"fmt"
1718
"sync"
1819
"sync/atomic"
1920

@@ -43,13 +44,16 @@ func (wt *watcherSession) notify(evt rpcpb.EventNotify) error {
4344
}
4445

4546
type watcherNotifier struct {
46-
watchers sync.Map
47+
sync.Mutex
48+
49+
watchers map[uint64]*watcherSession
4750
cluster *cluster.RaftCluster
4851
}
4952

5053
func newWatcherNotifier(cluster *cluster.RaftCluster) *watcherNotifier {
5154
return &watcherNotifier{
52-
cluster: cluster,
55+
cluster: cluster,
56+
watchers: make(map[uint64]*watcherSession),
5357
}
5458
}
5559

@@ -84,21 +88,46 @@ func (wn *watcherNotifier) handleCreateWatcher(req *rpcpb.Request, resp *rpcpb.R
8488
resp.Event.InitEvent = rsp
8589
}
8690

87-
wn.watchers.Store(session.ID(), &watcherSession{
88-
flag: req.CreateWatcher.Flag,
89-
session: session,
90-
})
91+
return wn.addWatcher(req.CreateWatcher.Flag, session)
92+
}
93+
94+
return nil
95+
}
96+
97+
func (wn *watcherNotifier) addWatcher(flag uint32, session goetty.IOSession) error {
98+
wn.Lock()
99+
defer wn.Unlock()
100+
101+
if wn.watchers == nil {
102+
return fmt.Errorf("watcher notifier stopped")
91103
}
92104

105+
wn.watchers[session.ID()] = &watcherSession{
106+
flag: flag,
107+
session: session,
108+
}
93109
return nil
94110
}
95111

96-
func (wn *watcherNotifier) clearWatcher(w *watcherSession) {
97-
wn.watchers.Delete(w.session.ID())
112+
func (wn *watcherNotifier) doClearWatcherLocked(w *watcherSession) {
113+
delete(wn.watchers, w.session.ID())
114+
w.session.Close()
98115
util.GetLogger().Infof("watcher %s removed",
99116
w.session.RemoteAddr())
100117
}
101118

119+
func (wn *watcherNotifier) doNotify(evt rpcpb.EventNotify) {
120+
wn.Lock()
121+
defer wn.Unlock()
122+
123+
for _, wt := range wn.watchers {
124+
err := wt.notify(evt)
125+
if err != nil {
126+
wn.doClearWatcherLocked(wt)
127+
}
128+
}
129+
}
130+
102131
func (wn *watcherNotifier) start() {
103132
go func() {
104133
defer func() {
@@ -108,40 +137,25 @@ func (wn *watcherNotifier) start() {
108137
}
109138
}()
110139

111-
var closed []*watcherSession
112140
for {
113141
evt, ok := <-wn.cluster.ChangedEventNotifier()
114142
if !ok {
115143
util.GetLogger().Infof("watcher notifer exited")
116144
return
117145
}
118146

119-
if len(closed) > 0 {
120-
closed = closed[:0]
121-
}
122-
123-
wn.watchers.Range(func(key, value interface{}) bool {
124-
wt := value.(*watcherSession)
125-
err := wt.notify(evt)
126-
if err != nil {
127-
closed = append(closed, wt)
128-
}
129-
return true
130-
})
131-
132-
for _, w := range closed {
133-
wn.clearWatcher(w)
134-
}
147+
wn.doNotify(evt)
135148
}
136149
}()
137150
}
138151

139152
func (wn *watcherNotifier) stop() {
140-
wn.watchers.Range(func(key, value interface{}) bool {
141-
wn.watchers.Delete(key)
142-
value.(*watcherSession).session.Close()
143-
return true
144-
})
153+
wn.Lock()
154+
defer wn.Unlock()
145155

156+
for _, wt := range wn.watchers {
157+
wn.doClearWatcherLocked(wt)
158+
}
159+
wn.watchers = nil
146160
util.GetLogger().Infof("watcher notifier stopped")
147161
}

components/prophet/storage/storage.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ func (s *storage) PutBootstrapped(container metadata.Container, resources ...met
510510
batch.SaveValues = append(batch.SaveValues, string(v))
511511
}
512512

513-
ok, _, err := s.kv.SaveIfNotExists(s.clusterPath, string(format.UInt64ToString(clusterID)), batch)
513+
ok, _, err := s.kv.SaveIfNotExists(s.clusterPath, format.Uint64ToString(clusterID), batch)
514514
return ok, err
515515
}
516516

@@ -547,9 +547,9 @@ func (s *storage) containerWeightPath(id uint64, typ string) string {
547547
}
548548

549549
func (s *storage) jobKey(jobType metapb.JobType) string {
550-
return path.Join(s.jobPath, string(format.UInt64ToString(uint64(jobType))))
550+
return path.Join(s.jobPath, format.Uint64ToString(uint64(jobType)))
551551
}
552552

553553
func (s *storage) jobDataKey(jobType metapb.JobType) string {
554-
return path.Join(s.jobDataPath, string(format.UInt64ToString(uint64(jobType))))
554+
return path.Join(s.jobDataPath, format.Uint64ToString(uint64(jobType)))
555555
}

components/prophet/util/keyutil/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@ func BuildKeyRangeKey(group uint64, startKey, endKey []byte) string {
2929

3030
// GetGroupFromRangeKey return group from range key
3131
func GetGroupFromRangeKey(key string) uint64 {
32-
return format.MustParseStrUInt64(strings.Split(key, "-")[0])
32+
return format.MustParseStringUint64(strings.Split(key, "-")[0])
3333
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ require (
1111
github.com/cockroachdb/pebble v0.0.0-20210503173641-1387689d3d7c
1212
github.com/coreos/go-semver v0.3.0
1313
github.com/docker/go-units v0.4.0
14-
github.com/fagongzi/goetty v1.11.0
14+
github.com/fagongzi/goetty v1.11.1
1515
github.com/fagongzi/log v0.0.0-20191122063922-293b75312445
16-
github.com/fagongzi/util v0.0.0-20210409031311-a10fdf8fbd7a
16+
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d
1717
github.com/go-ole/go-ole v1.2.4 // indirect
1818
github.com/gogo/protobuf v1.3.2
1919
github.com/google/btree v1.0.1

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,13 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
118118
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
119119
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
120120
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
121-
github.com/fagongzi/goetty v1.11.0 h1:Ld65qimH2AnMX0O7KEIssiLGbFNNCRK5+m3pBFOMmCM=
122-
github.com/fagongzi/goetty v1.11.0/go.mod h1:42WH1Shi0NYzyop19JaY5oabuk1OzB9AWNDqbF/NdP4=
121+
github.com/fagongzi/goetty v1.11.1 h1:Y2jnmkpTXdBmpQ7dchfUgPLcZOdQHvfVhOjNapuYNFA=
122+
github.com/fagongzi/goetty v1.11.1/go.mod h1:42WH1Shi0NYzyop19JaY5oabuk1OzB9AWNDqbF/NdP4=
123123
github.com/fagongzi/log v0.0.0-20191122063922-293b75312445 h1:XhTJpo5oMYnjgQPq28pufQ/XDGoQXi1x8qa87A4Fsb8=
124124
github.com/fagongzi/log v0.0.0-20191122063922-293b75312445/go.mod h1:1aUifpZCjAdZK7L3vRRl2cpSktbKasAU5vOSyLSCODs=
125125
github.com/fagongzi/util v0.0.0-20201116094402-221cc40c4593/go.mod h1:jYDIbpaqHXCCQ7QIDXRVfsQYAGKSNNb6N8BPTgdpcdE=
126-
github.com/fagongzi/util v0.0.0-20210409031311-a10fdf8fbd7a h1:TaySCZ6YAnc3dhVc2tzfw6Pu1Nufee98gkFBCI/AxKY=
127-
github.com/fagongzi/util v0.0.0-20210409031311-a10fdf8fbd7a/go.mod h1:5cqSns2zMRcJeVGvAqeTrbXFqh5AqBFr5uVKP9T2kiE=
126+
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d h1:1pILVCatHj3eVo9i52dZyY4BwjTmSIeN+/hoJh8rD0Y=
127+
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d/go.mod h1:5cqSns2zMRcJeVGvAqeTrbXFqh5AqBFr5uVKP9T2kiE=
128128
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
129129
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
130130
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=

raftstore/peer_apply.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ func (pr *peerReplica) doCompactRaftLog(shardID, startIndex, endIndex uint64) er
9393
}
9494

9595
func (pr *peerReplica) doApplyingSnapshotJob() error {
96-
logger.Infof("shard %d begin apply snapshot data", pr.shardID)
96+
logger.Infof("shard %d peer %d begin apply snapshot data",
97+
pr.shardID,
98+
pr.peer.ID)
9799
localState, err := pr.ps.loadShardLocalState(pr.ps.applySnapJob)
98100
if err != nil {
99101
logger.Fatalf("shard %d apply snap load local state failed with %+v",
@@ -132,8 +134,9 @@ func (pr *peerReplica) doApplyingSnapshotJob() error {
132134
pr.store.aware.SnapshotApplied(pr.ps.shard)
133135
}
134136
pr.stopRaftTick = false
135-
logger.Infof("shard %d apply snapshot data complete, %+v",
137+
logger.Infof("shard %d peer %d apply snapshot data complete, %+v",
136138
pr.shardID,
139+
pr.peer.ID,
137140
pr.ps.raftLocalState.HardState)
138141
return nil
139142
}

0 commit comments

Comments (0)