Skip to content

Commit cdb3d88

Browse files
authored
VReplication: Show all tables in copy progress output (#18825)
Signed-off-by: Matt Lord <[email protected]>
1 parent 8fa9b87 commit cdb3d88

File tree

9 files changed

+1223
-702
lines changed

9 files changed

+1223
-702
lines changed

go/cmd/vtctldclient/command/vreplication/common/utils.go

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@ limitations under the License.
1717
package common
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"encoding/json"
2322
"errors"
2423
"fmt"
24+
"sort"
2525
"strings"
2626
"time"
2727

2828
"github.com/spf13/cobra"
29+
"golang.org/x/exp/maps"
2930

3031
"vitess.io/vitess/go/cmd/vtctldclient/cli"
3132
"vitess.io/vitess/go/vt/key"
@@ -260,29 +261,54 @@ func GetTabletSelectionPreference(cmd *cobra.Command) tabletmanagerdatapb.Tablet
260261
}
261262

262263
func OutputStatusResponse(resp *vtctldatapb.WorkflowStatusResponse, format string) error {
263-
var output []byte
264-
var err error
265264
if format == "json" {
266-
output, err = cli.MarshalJSONPretty(resp)
265+
output, err := cli.MarshalJSONPretty(resp)
267266
if err != nil {
268267
return err
269268
}
270-
} else {
271-
tout := bytes.Buffer{}
272-
tout.WriteString(fmt.Sprintf("The following vreplication streams exist for workflow %s.%s:\n\n",
273-
BaseOptions.TargetKeyspace, BaseOptions.Workflow))
274-
for _, shardstreams := range resp.ShardStreams {
275-
for _, shardstream := range shardstreams.Streams {
276-
tablet := fmt.Sprintf("%s-%d", shardstream.Tablet.Cell, shardstream.Tablet.Uid)
277-
tout.WriteString(fmt.Sprintf("id=%d on %s/%s: Status: %s. %s.\n",
278-
shardstream.Id, BaseOptions.TargetKeyspace, tablet, shardstream.Status, shardstream.Info))
269+
fmt.Println(string(output))
270+
return nil
271+
}
272+
273+
// Plain text formatted output.
274+
tout := strings.Builder{}
275+
tout.WriteString(fmt.Sprintf("The following vreplication streams exist for workflow %s.%s:\n\n",
276+
BaseOptions.TargetKeyspace, BaseOptions.Workflow))
277+
for _, shardstreams := range resp.ShardStreams {
278+
for _, shardstream := range shardstreams.Streams {
279+
tablet := fmt.Sprintf("%s-%d", shardstream.Tablet.Cell, shardstream.Tablet.Uid)
280+
tout.WriteString(fmt.Sprintf("id=%d on %s/%s: Status: %s. %s.\n",
281+
shardstream.Id, BaseOptions.TargetKeyspace, tablet, shardstream.Status, shardstream.Info))
282+
}
283+
}
284+
if len(resp.TableCopyState) > 0 {
285+
tables := maps.Keys(resp.TableCopyState)
286+
sort.Strings(tables) // Ensure that the output is intuitive and consistent
287+
tout.WriteString("\nTable Copy Status:")
288+
for _, table := range tables {
289+
// Unfortunately we cannot use the prototext marshaler here as it has no option
290+
// to emit unpopulated fields.
291+
tcs := resp.TableCopyState[table]
292+
tout.WriteString("\n\t")
293+
tout.WriteString(table)
294+
tout.WriteString(": ")
295+
tout.WriteString(fmt.Sprintf("RowsCopied:%d, ", tcs.RowsCopied))
296+
tout.WriteString(fmt.Sprintf("RowsTotal:%d, ", tcs.RowsTotal))
297+
tout.WriteString(fmt.Sprintf("RowsPercentage:%.2f, ", tcs.RowsPercentage))
298+
tout.WriteString(fmt.Sprintf("BytesCopied:%d, ", tcs.BytesCopied))
299+
tout.WriteString(fmt.Sprintf("BytesTotal:%d, ", tcs.BytesTotal))
300+
tout.WriteString(fmt.Sprintf("BytesPercentage:%.2f", tcs.BytesPercentage))
301+
// If we're talking to an older server it won't provide this field. We should
302+
// not show a wrong or confusing value in this case so elide it from the output.
303+
if tcs.Phase != vtctldatapb.TableCopyPhase_UNKNOWN {
304+
tout.WriteString(fmt.Sprintf(", Phase:%s", tcs.Phase))
279305
}
280306
}
281-
tout.WriteString("\nTraffic State: ")
282-
tout.WriteString(resp.TrafficState)
283-
output = tout.Bytes()
307+
tout.WriteString("\n")
284308
}
285-
fmt.Println(string(output))
309+
tout.WriteString("\nTraffic State: ")
310+
tout.WriteString(resp.TrafficState)
311+
fmt.Println(tout.String())
286312
return nil
287313
}
288314

go/cmd/vtctldclient/command/vreplication/common/utils_test.go

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ limitations under the License.
1717
package common_test
1818

1919
import (
20+
"bytes"
2021
"context"
22+
"io"
23+
"os"
2124
"testing"
2225
"time"
2326

@@ -33,6 +36,9 @@ import (
3336
"vitess.io/vitess/go/vt/vtctl/vtctldclient"
3437
"vitess.io/vitess/go/vt/vtenv"
3538
"vitess.io/vitess/go/vt/vttablet/tmclient"
39+
40+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
41+
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
3642
)
3743

3844
func TestParseAndValidateCreateOptions(t *testing.T) {
@@ -152,3 +158,268 @@ func SetupLocalVtctldClient(t *testing.T, ctx context.Context, cells ...string)
152158
require.NoError(t, err, "failed to create local vtctld client which uses an internal vtctld server")
153159
common.SetClient(client)
154160
}
161+
162+
func TestOutputStatusResponse(t *testing.T) {
163+
cell := "zone1"
164+
common.BaseOptions.TargetKeyspace = "customer"
165+
common.BaseOptions.Workflow = "commerce2customer"
166+
tests := []struct {
167+
name string
168+
resp *vtctldatapb.WorkflowStatusResponse
169+
want string
170+
format string
171+
}{
172+
{
173+
name: "plain-text format",
174+
resp: &vtctldatapb.WorkflowStatusResponse{
175+
TableCopyState: map[string]*vtctldatapb.WorkflowStatusResponse_TableCopyState{
176+
"table1": {
177+
RowsCopied: 20,
178+
RowsTotal: 20,
179+
RowsPercentage: 100,
180+
BytesCopied: 1000,
181+
BytesTotal: 1000,
182+
BytesPercentage: 100,
183+
Phase: vtctldatapb.TableCopyPhase_COMPLETE,
184+
},
185+
"table2": {
186+
RowsCopied: 10,
187+
RowsTotal: 50,
188+
RowsPercentage: 20,
189+
BytesCopied: 1000,
190+
BytesTotal: 5000,
191+
BytesPercentage: 20,
192+
Phase: vtctldatapb.TableCopyPhase_IN_PROGRESS,
193+
},
194+
"table3": {
195+
RowsCopied: 0,
196+
RowsTotal: 2000,
197+
RowsPercentage: 0,
198+
BytesCopied: 0,
199+
BytesTotal: 200000,
200+
BytesPercentage: 0,
201+
Phase: vtctldatapb.TableCopyPhase_NOT_STARTED,
202+
},
203+
},
204+
ShardStreams: map[string]*vtctldatapb.WorkflowStatusResponse_ShardStreams{
205+
"customer/0": {
206+
Streams: []*vtctldatapb.WorkflowStatusResponse_ShardStreamState{
207+
{
208+
Id: 1,
209+
Tablet: &topodatapb.TabletAlias{
210+
Cell: cell,
211+
Uid: 1,
212+
},
213+
SourceShard: "commerce/0",
214+
Position: "f3918180-b58f-11f0-9085-360472309971:1-29655",
215+
Status: "Copying",
216+
Info: "VStream Lag: -1s; ; Tx time: Thu Oct 30 13:05:02 2025",
217+
},
218+
},
219+
},
220+
},
221+
TrafficState: "Reads Not Switched. Writes Not Switched",
222+
},
223+
want: `The following vreplication streams exist for workflow customer.commerce2customer:
224+
225+
id=1 on customer/zone1-1: Status: Copying. VStream Lag: -1s; ; Tx time: Thu Oct 30 13:05:02 2025.
226+
227+
Table Copy Status:
228+
table1: RowsCopied:20, RowsTotal:20, RowsPercentage:100.00, BytesCopied:1000, BytesTotal:1000, BytesPercentage:100.00, Phase:COMPLETE
229+
table2: RowsCopied:10, RowsTotal:50, RowsPercentage:20.00, BytesCopied:1000, BytesTotal:5000, BytesPercentage:20.00, Phase:IN_PROGRESS
230+
table3: RowsCopied:0, RowsTotal:2000, RowsPercentage:0.00, BytesCopied:0, BytesTotal:200000, BytesPercentage:0.00, Phase:NOT_STARTED
231+
232+
Traffic State: Reads Not Switched. Writes Not Switched
233+
`,
234+
},
235+
{
236+
// An older server won't send the new Phase field so it will have the
237+
// zero value for it in the message. We should then not display it in
238+
// the output.
239+
name: "plain-text format with old server",
240+
resp: &vtctldatapb.WorkflowStatusResponse{
241+
TableCopyState: map[string]*vtctldatapb.WorkflowStatusResponse_TableCopyState{
242+
"table1": {
243+
RowsCopied: 20,
244+
RowsTotal: 20,
245+
RowsPercentage: 100,
246+
BytesCopied: 1000,
247+
BytesTotal: 1000,
248+
BytesPercentage: 100,
249+
Phase: vtctldatapb.TableCopyPhase_UNKNOWN,
250+
},
251+
"table2": {
252+
RowsCopied: 10,
253+
RowsTotal: 50,
254+
RowsPercentage: 20,
255+
BytesCopied: 1000,
256+
BytesTotal: 5000,
257+
BytesPercentage: 20,
258+
Phase: vtctldatapb.TableCopyPhase_UNKNOWN,
259+
},
260+
"table3": {
261+
RowsCopied: 0,
262+
RowsTotal: 2000,
263+
RowsPercentage: 0,
264+
BytesCopied: 0,
265+
BytesTotal: 200000,
266+
BytesPercentage: 0,
267+
Phase: vtctldatapb.TableCopyPhase_UNKNOWN,
268+
},
269+
},
270+
ShardStreams: map[string]*vtctldatapb.WorkflowStatusResponse_ShardStreams{
271+
"customer/0": {
272+
Streams: []*vtctldatapb.WorkflowStatusResponse_ShardStreamState{
273+
{
274+
Id: 1,
275+
Tablet: &topodatapb.TabletAlias{
276+
Cell: cell,
277+
Uid: 1,
278+
},
279+
SourceShard: "commerce/0",
280+
Position: "f3918180-b58f-11f0-9085-360472309971:1-29655",
281+
Status: "Copying",
282+
Info: "VStream Lag: -1s; ; Tx time: Thu Oct 30 13:05:02 2025",
283+
},
284+
},
285+
},
286+
},
287+
TrafficState: "Reads Not Switched. Writes Not Switched",
288+
},
289+
want: `The following vreplication streams exist for workflow customer.commerce2customer:
290+
291+
id=1 on customer/zone1-1: Status: Copying. VStream Lag: -1s; ; Tx time: Thu Oct 30 13:05:02 2025.
292+
293+
Table Copy Status:
294+
table1: RowsCopied:20, RowsTotal:20, RowsPercentage:100.00, BytesCopied:1000, BytesTotal:1000, BytesPercentage:100.00
295+
table2: RowsCopied:10, RowsTotal:50, RowsPercentage:20.00, BytesCopied:1000, BytesTotal:5000, BytesPercentage:20.00
296+
table3: RowsCopied:0, RowsTotal:2000, RowsPercentage:0.00, BytesCopied:0, BytesTotal:200000, BytesPercentage:0.00
297+
298+
Traffic State: Reads Not Switched. Writes Not Switched
299+
`,
300+
},
301+
{
302+
name: "json format",
303+
resp: &vtctldatapb.WorkflowStatusResponse{
304+
TableCopyState: map[string]*vtctldatapb.WorkflowStatusResponse_TableCopyState{
305+
"table1": {
306+
RowsCopied: 20,
307+
RowsTotal: 20,
308+
RowsPercentage: 100,
309+
BytesCopied: 1000,
310+
BytesTotal: 1000,
311+
BytesPercentage: 100,
312+
Phase: vtctldatapb.TableCopyPhase_COMPLETE,
313+
},
314+
"table2": {
315+
RowsCopied: 10,
316+
RowsTotal: 50,
317+
RowsPercentage: 20,
318+
BytesCopied: 1000,
319+
BytesTotal: 5000,
320+
BytesPercentage: 20,
321+
Phase: vtctldatapb.TableCopyPhase_IN_PROGRESS,
322+
},
323+
"table3": {
324+
RowsCopied: 0,
325+
RowsTotal: 2000,
326+
RowsPercentage: 0,
327+
BytesCopied: 0,
328+
BytesTotal: 200000,
329+
BytesPercentage: 0,
330+
Phase: vtctldatapb.TableCopyPhase_NOT_STARTED,
331+
},
332+
},
333+
ShardStreams: map[string]*vtctldatapb.WorkflowStatusResponse_ShardStreams{
334+
"customer/0": {
335+
Streams: []*vtctldatapb.WorkflowStatusResponse_ShardStreamState{
336+
{
337+
Id: 1,
338+
Tablet: &topodatapb.TabletAlias{
339+
Cell: cell,
340+
Uid: 1,
341+
},
342+
SourceShard: "commerce/0",
343+
Position: "f3918180-b58f-11f0-9085-360472309971:1-29655",
344+
Status: "Copying",
345+
Info: "VStream Lag: -1s; ; Tx time: Thu Oct 30 13:05:02 2025",
346+
},
347+
},
348+
},
349+
},
350+
TrafficState: "Reads Not Switched. Writes Not Switched",
351+
},
352+
format: "json",
353+
want: `{
354+
"table_copy_state": {
355+
"table1": {
356+
"rows_copied": "20",
357+
"rows_total": "20",
358+
"rows_percentage": 100,
359+
"bytes_copied": "1000",
360+
"bytes_total": "1000",
361+
"bytes_percentage": 100,
362+
"phase": "COMPLETE"
363+
},
364+
"table2": {
365+
"rows_copied": "10",
366+
"rows_total": "50",
367+
"rows_percentage": 20,
368+
"bytes_copied": "1000",
369+
"bytes_total": "5000",
370+
"bytes_percentage": 20,
371+
"phase": "IN_PROGRESS"
372+
},
373+
"table3": {
374+
"rows_copied": "0",
375+
"rows_total": "2000",
376+
"rows_percentage": 0,
377+
"bytes_copied": "0",
378+
"bytes_total": "200000",
379+
"bytes_percentage": 0,
380+
"phase": "NOT_STARTED"
381+
}
382+
},
383+
"shard_streams": {
384+
"customer/0": {
385+
"streams": [
386+
{
387+
"id": 1,
388+
"tablet": {
389+
"cell": "zone1",
390+
"uid": 1
391+
},
392+
"source_shard": "commerce/0",
393+
"position": "f3918180-b58f-11f0-9085-360472309971:1-29655",
394+
"status": "Copying",
395+
"info": "VStream Lag: -1s; ; Tx time: Thu Oct 30 13:05:02 2025"
396+
}
397+
]
398+
}
399+
},
400+
"traffic_state": "Reads Not Switched. Writes Not Switched"
401+
}
402+
`,
403+
},
404+
}
405+
406+
for _, tt := range tests {
407+
t.Run(tt.name, func(t *testing.T) {
408+
origStdout := os.Stdout
409+
defer func() {
410+
os.Stdout = origStdout
411+
}()
412+
r, w, _ := os.Pipe()
413+
os.Stdout = w
414+
415+
err := common.OutputStatusResponse(tt.resp, tt.format)
416+
w.Close()
417+
require.NoError(t, err)
418+
var buf bytes.Buffer
419+
_, err = io.Copy(&buf, r)
420+
require.NoError(t, err)
421+
output := buf.String()
422+
require.Equal(t, tt.want, output)
423+
})
424+
}
425+
}

0 commit comments

Comments
 (0)