From f8547b39dd2a9e469c9c9a5c449e36c86ea2a50a Mon Sep 17 00:00:00 2001 From: Isaac Diamond Date: Fri, 10 Jan 2025 12:01:19 -0800 Subject: [PATCH] delete consumer group --- cmd/topicctl/subcmd/delete.go | 27 +++++++++++++++++++++++++++ pkg/cli/cli.go | 13 +++++++++++++ pkg/groups/groups.go | 25 +++++++++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/cmd/topicctl/subcmd/delete.go b/cmd/topicctl/subcmd/delete.go index 27fdbd09..4cc0a30f 100644 --- a/cmd/topicctl/subcmd/delete.go +++ b/cmd/topicctl/subcmd/delete.go @@ -44,6 +44,7 @@ func init() { addSharedFlags(deleteCmd, &deleteConfig.shared) deleteCmd.AddCommand( deleteACLCmd(), + deleteGroupCmd(), ) RootCmd.AddCommand(deleteCmd) } @@ -150,3 +151,29 @@ $ topicctl delete acls --resource-type topic --resource-pattern-type literal --r cmd.MarkFlagRequired("resource-type") return cmd } + +func deleteGroupCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "group [group]", + Short: "Delete a given consumer group. Ensure the group is not active before deleting.", + Args: cobra.ExactArgs(1), + Example: `Delete group my-group`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, deleteConfig.dryRun) + if err != nil { + return err + } + defer adminClient.Close() + + group := args[0] + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.DeleteGroup(ctx, group) + }, + } + + return cmd + +} diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index ee20f497..3eee049a 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -648,6 +648,19 @@ func (c *CLIRunner) ResetOffsets( return nil } +// DeleteGroup deletes a single consumer group. +func (c *CLIRunner) DeleteGroup(ctx context.Context, groupID string) error { + c.startSpinner() + err := groups.Delete(ctx, c.adminClient.GetConnector(), groupID) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Success") + return nil +} + // Tail prints out a stream of the latest messages in a topic. func (c *CLIRunner) Tail( ctx context.Context, diff --git a/pkg/groups/groups.go b/pkg/groups/groups.go index 6874cca2..af2bd5c0 100644 --- a/pkg/groups/groups.go +++ b/pkg/groups/groups.go @@ -225,6 +225,31 @@ func ResetOffsets( ) } +// Delete deletes a consumer group based on its groupId. +func Delete(ctx context.Context, connector *admin.Connector, groupID string) error { + describeGroupsRequest := kafka.DescribeGroupsRequest{ + GroupIDs: []string{groupID}, + } + describeGroupsResponse, err := connector.KafkaClient.DescribeGroups(ctx, &describeGroupsRequest) + if err != nil { + return err + } + + if len(describeGroupsResponse.Groups) != 1 { + return errors.New("Unexpected response length from describeGroups") + } + + if describeGroupsResponse.Groups[0].GroupState == "Dead" { + return errors.New("Group state is dead; check that group ID is valid") + } + + req := kafka.DeleteGroupsRequest{ + GroupIDs: []string{groupID}, + } + _, err = connector.KafkaClient.DeleteGroups(ctx, &req) + return err +} + // GetEarliestorLatestOffset gets earliest/latest offset for a given topic partition for resetting offsets of consumer group func GetEarliestOrLatestOffset( ctx context.Context,