Skip to content

Commit d5beced

Browse files
[release-21.0] VDiff: fix race when a vdiff resumes on vttablet restart (#17638) (#17694)
Signed-off-by: Rohit Nayak <[email protected]> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Rohit Nayak <[email protected]>
1 parent 1cdad83 commit d5beced

File tree

6 files changed

+58
-29
lines changed

6 files changed

+58
-29
lines changed

go/vt/vttablet/tabletmanager/shard_sync.go

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -85,10 +85,15 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st
8585
// We don't use the watch event except to know that we should
8686
// re-read the shard record, and to know if the watch dies.
8787
log.Info("Change in shard record")
88-
if event.Err != nil {
89-
// The watch failed. Stop it so we start a new one if needed.
90-
log.Errorf("Shard watch failed: %v", event.Err)
91-
shardWatch.stop()
88+
89+
if event != nil {
90+
if event.Err != nil {
91+
// The watch failed. Stop it so we start a new one if needed.
92+
log.Errorf("Shard watch failed: %v", event.Err)
93+
shardWatch.stop()
94+
}
95+
} else {
96+
log.Infof("Got a nil event from the shard watcher for %s. This should not happen.", tm.tabletAlias)
9297
}
9398
case <-ctx.Done():
9499
// Our context was cancelled. Terminate the loop.

go/vt/vttablet/tabletmanager/vdiff/controller.go

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ type controller struct {
8383
TableDiffPhaseTimings *stats.Timings
8484
}
8585

86-
func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient,
86+
func newController(row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient,
8787
ts *topo.Server, vde *Engine, options *tabletmanagerdata.VDiffOptions) (*controller, error) {
8888

8989
log.Infof("VDiff controller initializing for %+v", row)
@@ -104,9 +104,6 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac
104104
TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"),
105105
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),
106106
}
107-
ctx, ct.cancel = context.WithCancel(ctx)
108-
go ct.run(ctx)
109-
110107
return ct, nil
111108
}
112109

go/vt/vttablet/tabletmanager/vdiff/engine.go

Lines changed: 8 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -146,19 +146,21 @@ func (vde *Engine) openLocked(ctx context.Context) error {
146146
vde.resetControllers()
147147
}
148148

149+
globalStats.initControllerStats()
150+
149151
// At this point the tablet has no controllers running. So
150152
// we want to start any VDiffs that have not been explicitly
151153
// stopped or otherwise finished.
152154
rows, err := vde.getVDiffsToRun(ctx)
153155
if err != nil {
154156
return err
155157
}
158+
156159
vde.ctx, vde.cancel = context.WithCancel(ctx)
157160
vde.isOpen = true // now we are open and have things to close
158161
if err := vde.initControllers(rows); err != nil {
159162
return err
160163
}
161-
vde.updateStats()
162164

163165
// At this point we've fully and successfully opened so begin
164166
// retrying error'd VDiffs until the engine is closed.
@@ -212,7 +214,7 @@ func (vde *Engine) retry(ctx context.Context, err error) {
212214
// addController creates a new controller using the given vdiff record and adds it to the engine.
213215
// You must already have the main engine mutex (mu) locked before calling this.
214216
func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletmanagerdata.VDiffOptions) error {
215-
ct, err := newController(vde.ctx, row, vde.dbClientFactoryDba, vde.ts, vde, options)
217+
ct, err := newController(row, vde.dbClientFactoryDba, vde.ts, vde, options)
216218
if err != nil {
217219
return fmt.Errorf("controller could not be initialized for stream %+v on tablet %v",
218220
row, vde.thisTablet.Alias)
@@ -221,6 +223,10 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman
221223
globalStats.mu.Lock()
222224
defer globalStats.mu.Unlock()
223225
globalStats.controllers[ct.id] = ct
226+
227+
controllerCtx, cancel := context.WithCancel(vde.ctx)
228+
ct.cancel = cancel
229+
go ct.run(controllerCtx)
224230
return nil
225231
}
226232

@@ -395,16 +401,4 @@ func (vde *Engine) resetControllers() {
395401
ct.Stop()
396402
}
397403
vde.controllers = make(map[int64]*controller)
398-
vde.updateStats()
399-
}
400-
401-
// updateStats must only be called while holding the engine lock.
402-
func (vre *Engine) updateStats() {
403-
globalStats.mu.Lock()
404-
defer globalStats.mu.Unlock()
405-
406-
globalStats.controllers = make(map[int64]*controller, len(vre.controllers))
407-
for id, ct := range vre.controllers {
408-
globalStats.controllers[id] = ct
409-
}
410404
}

go/vt/vttablet/tabletmanager/vdiff/framework_test.go

Lines changed: 21 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -672,7 +672,27 @@ func (tvde *testVDiffEnv) createController(t *testing.T, id int) *controller {
672672
fmt.Sprintf("%d|%s|%s|%s|%s|%s|%s|%s|", id, uuid.New(), tvde.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS),
673673
)
674674
tvde.dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vdiff where id = %d", id), noResults, nil)
675-
ct, err := newController(context.Background(), controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts)
675+
ct := tvde.newController(t, controllerQR)
676+
ct.sources = map[string]*migrationSource{
677+
tstenv.ShardName: {
678+
vrID: 1,
679+
shardStreamer: &shardStreamer{
680+
tablet: tvde.vde.thisTablet,
681+
shard: tstenv.ShardName,
682+
},
683+
},
684+
}
685+
ct.sourceKeyspace = tstenv.KeyspaceName
686+
687+
return ct
688+
}
689+
690+
func (tvde *testVDiffEnv) newController(t *testing.T, controllerQR *sqltypes.Result) *controller {
691+
ctx := context.Background()
692+
ct, err := newController(controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts)
676693
require.NoError(t, err)
694+
ctx2, cancel := context.WithCancel(ctx)
695+
ct.cancel = cancel
696+
go ct.run(ctx2)
677697
return ct
678698
}

go/vt/vttablet/tabletmanager/vdiff/stats.go

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,11 +44,18 @@ type vdiffStats struct {
4444
RowsDiffedCount *stats.Counter
4545
}
4646

47+
func (vds *vdiffStats) initControllerStats() {
48+
vds.mu.Lock()
49+
defer vds.mu.Unlock()
50+
vds.controllers = make(map[int64]*controller)
51+
}
52+
4753
func (vds *vdiffStats) register() {
4854
globalStats.Count = stats.NewGauge("", "")
4955
globalStats.ErrorCount = stats.NewCounter("", "")
5056
globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table")
5157
globalStats.RowsDiffedCount = stats.NewCounter("", "")
58+
globalStats.initControllerStats()
5259

5360
stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers)
5461

go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package vdiff
1818

1919
import (
20-
"context"
2120
"fmt"
2221
"strings"
2322
"testing"
@@ -49,8 +48,17 @@ func TestBuildPlanSuccess(t *testing.T) {
4948
)
5049

5150
vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil)
52-
ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts)
53-
require.NoError(t, err)
51+
ct := vdenv.newController(t, controllerQR)
52+
ct.sources = map[string]*migrationSource{
53+
tstenv.ShardName: {
54+
vrID: 1,
55+
shardStreamer: &shardStreamer{
56+
tablet: vdenv.vde.thisTablet,
57+
shard: tstenv.ShardName,
58+
},
59+
},
60+
}
61+
ct.sourceKeyspace = tstenv.KeyspaceName
5462

5563
testcases := []struct {
5664
input *binlogdatapb.Rule
@@ -667,9 +675,7 @@ func TestBuildPlanFailure(t *testing.T) {
667675
fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, vdiffenv.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS),
668676
)
669677
vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil)
670-
ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts)
671-
require.NoError(t, err)
672-
678+
ct := vdenv.newController(t, controllerQR)
673679
testcases := []struct {
674680
input *binlogdatapb.Rule
675681
err string

0 commit comments

Comments
 (0)