Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ S3_BUCKET = "test-bucket"
S3_REGION = "us-east-1"
YDB_NAME = "local-ydb"
ENABLE_NEW_PATHS_FORMAT = true
ENABLE_BACKUPS_ENCRYPTION = true
# local-ydb image that was built from main
# Image: https://github.com/ydb-platform/ydb/pkgs/container/local-ydb/551703770
# Built from revision 07aeccc41b43c9fc0b7da7680340fbac01b81427
Expand Down
26 changes: 25 additions & 1 deletion .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,16 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
enable_new_paths_format: [ true, false ]
include:
- enable_new_paths_format: false
enable_backups_encryption: false
- enable_new_paths_format: true
enable_backups_encryption: false
- enable_new_paths_format: true
enable_backups_encryption: true
env:
ENABLE_NEW_PATHS_FORMAT: ${{ matrix.enable_new_paths_format }}
ENABLE_BACKUPS_ENCRYPTION: ${{ matrix.enable_backups_encryption }}
steps:
- uses: actions/checkout@v4
- name: supply with s3 access keys
Expand Down Expand Up @@ -110,3 +117,20 @@ jobs:
if: ${{ matrix.enable_new_paths_format }}
run: |
docker compose down
- name: docker compose up
if: ${{ matrix.enable_backups_encryption }}
run: |
docker compose up -d
- name: run make_encrypted_backup tests
if: ${{ matrix.enable_backups_encryption }}
run: |
while [ "$(docker inspect -f {{.State.Health.Status}} local-ydbcp)" != "healthy" ]; do
echo "Waiting for container to become healthy..."
sleep 1
done
echo "Starting make_encrypted_backup tests!"
docker exec local-ydbcp sh -c './make_encrypted_backup'
- name: docker compose down
if: ${{ matrix.enable_backups_encryption }}
run: |
docker compose down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go
RUN go build -o ./orm ./cmd/integration/orm/main.go
RUN go build -o ./test_new_paths_format ./cmd/integration/new_paths_format/main.go
RUN go build -o ./make_encrypted_backup ./cmd/integration/make_encrypted_backup/main.go

# Command to run the executable
CMD ["./main", "--config=local_config.yaml"]
Expand Down
260 changes: 260 additions & 0 deletions cmd/integration/make_encrypted_backup/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package main

import (
"context"
"log"
"time"
"ydbcp/cmd/integration/common"
"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
containerID = "abcde"
databaseName = "/local"
ydbcpEndpoint = "0.0.0.0:50051"
databaseEndpoint = "grpcs://local-ydb:2135"
testKmsKeyID = "test-kms-key-id-123"
)

type encryptedBackupScenario struct {
name string
request *pb.MakeBackupRequest
}

type negativeEncryptedBackupScenario struct {
name string
request *pb.MakeBackupRequest
expectedStatus codes.Code
}

func newEncryptedBackupRequest(rootPath string, sourcePaths []string, kmsKeyID string) *pb.MakeBackupRequest {
encryptionSettings := &pb.EncryptionSettings{
Algorithm: pb.EncryptionSettings_AES_256_GCM,
}

if len(kmsKeyID) > 0 {
encryptionSettings.KeyEncryptionKey = &pb.EncryptionSettings_KmsKey_{
KmsKey: &pb.EncryptionSettings_KmsKey{
KeyId: kmsKeyID,
},
}
}

return &pb.MakeBackupRequest{
ContainerId: containerID,
DatabaseName: databaseName,
DatabaseEndpoint: databaseEndpoint,
RootPath: rootPath,
SourcePaths: sourcePaths,
EncryptionSettings: encryptionSettings,
}
}

func runEncryptedBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario encryptedBackupScenario) {
log.Printf("Running scenario: %s", scenario.name)

tbwr, err := backupClient.MakeBackup(ctx, scenario.request)
if err != nil {
log.Panicf("scenario %s: failed to make backup: %v", scenario.name, err)
}

op, err := opClient.GetOperation(
ctx, &pb.GetOperationRequest{
Id: tbwr.Id,
},
)
if err != nil {
log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err)
}

if op.EncryptionSettings == nil {
log.Panicf("scenario %s: encryption settings are nil", scenario.name)
}

if op.EncryptionSettings.GetKmsKey() == nil {
log.Panicf("scenario %s: KMS key is nil", scenario.name)
}

if op.EncryptionSettings.GetKmsKey().GetKeyId() != testKmsKeyID {
log.Panicf("scenario %s: KMS key ID mismatch, expected %s, got %s",
scenario.name, testKmsKeyID, op.EncryptionSettings.GetKmsKey().GetKeyId())
}

if op.EncryptionSettings.GetAlgorithm() != pb.EncryptionSettings_AES_256_GCM {
log.Panicf("scenario %s: encryption algorithm is not AES_256_GCM", scenario.name)
}

// Wait for operation handler
time.Sleep(time.Second * 3)

ops, err := opClient.ListOperations(
ctx, &pb.ListOperationsRequest{
ContainerId: containerID,
DatabaseNameMask: databaseName,
OperationTypes: []string{types.OperationTypeTB.String()},
},
)
if err != nil {
log.Panicf("failed to list operations: %v", err)
}

var tbOperation *pb.Operation
for _, op := range ops.Operations {
if op.GetParentOperationId() == tbwr.Id {
tbOperation = op
break
}
}

if tbOperation == nil {
log.Panicf("scenario %s: TB operation not found", scenario.name)
}

// Wait for backup to complete
done := false
var backup *pb.Backup
for range 30 {
backup, err = backupClient.GetBackup(
ctx,
&pb.GetBackupRequest{Id: tbOperation.BackupId},
)
if err != nil {
log.Panicf("scenario %s: failed to get backup: %v", scenario.name, err)
}
if backup.GetStatus().String() == types.BackupStateAvailable {
done = true
break
}
time.Sleep(time.Second)
}
if !done {
log.Panicf("scenario %s: failed to complete backup in 30 seconds", scenario.name)
}

// Verify the backup has encryption settings
if backup.EncryptionSettings == nil {
log.Panicf("scenario %s: backup should have encryption settings", scenario.name)
}

if backup.EncryptionSettings.GetKmsKey() == nil {
log.Panicf("scenario %s: backup should have KMS key", scenario.name)
}

if backup.EncryptionSettings.GetKmsKey().GetKeyId() != testKmsKeyID {
log.Panicf("scenario %s: KMS key ID mismatch, expected %s, got %s",
scenario.name, testKmsKeyID, backup.EncryptionSettings.GetKmsKey().GetKeyId())
}

if backup.EncryptionSettings.GetAlgorithm() != pb.EncryptionSettings_AES_256_GCM {
log.Panicf("scenario %s: encryption algorithm is not AES_256_GCM", scenario.name)
}

restoreRequest := &pb.MakeRestoreRequest{
ContainerId: containerID,
BackupId: backup.Id,
DatabaseName: databaseName,
DatabaseEndpoint: databaseEndpoint,
DestinationPath: "/restored_backup_" + backup.Id,
SourcePaths: scenario.request.SourcePaths,
}

restoreOperation, err := backupClient.MakeRestore(ctx, restoreRequest)
if err != nil {
log.Panicf("scenario %s: failed to make restore: %v", scenario.name, err)
}

// Wait for restore operation to complete
done = false
for range 30 {
op, err := opClient.GetOperation(
ctx,
&pb.GetOperationRequest{Id: restoreOperation.Id},
)
if err != nil {
log.Panicf("scenario %s: failed to get restore operation: %v", scenario.name, err)
}
if op.GetStatus().String() == types.OperationStateDone.String() {
done = true
break
}
time.Sleep(time.Second)
}
if !done {
log.Panicf("scenario %s: failed to complete restore in 30 seconds", scenario.name)
}

log.Printf("scenario %s: passed", scenario.name)
}

func runNegativeEncryptedBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario negativeEncryptedBackupScenario) {
log.Printf("Running negative scenario: %s", scenario.name)

_, err := backupClient.MakeBackup(ctx, scenario.request)
if err != nil {
st, ok := status.FromError(err)
if !ok {
log.Panicf("scenario %s: MakeBackup failed but couldn't extract status: %v", scenario.name, err)
}
if st.Code() != scenario.expectedStatus {
log.Panicf("scenario %s: expected status code %v, got %v: %v", scenario.name, scenario.expectedStatus, st.Code(), err)
}
log.Printf("scenario %s: passed", scenario.name)
} else {
log.Panicf("scenario %s: MakeBackup should fail with status code %v, but it succeeded", scenario.name, scenario.expectedStatus)
}
}

func main() {
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
log.Panicln("failed to close connection")
}
}(conn)
backupClient := pb.NewBackupServiceClient(conn)
opClient := pb.NewOperationServiceClient(conn)

ctx := context.Background()

positiveScenarios := []encryptedBackupScenario{
{
name: "full encrypted backup",
request: newEncryptedBackupRequest("", nil, testKmsKeyID),
},
{
name: "partial encrypted backup",
request: newEncryptedBackupRequest("", []string{"kv_test"}, testKmsKeyID),
},
{
name: "full encrypted backup with root path",
request: newEncryptedBackupRequest("stocks", nil, testKmsKeyID),
},
{
name: "partial encrypted backup with root path",
request: newEncryptedBackupRequest("stocks", []string{"orders"}, testKmsKeyID),
},
}

for _, scenario := range positiveScenarios {
runEncryptedBackupScenario(ctx, backupClient, opClient, scenario)
time.Sleep(time.Second)
}

negativeScenarios := []negativeEncryptedBackupScenario{
{
name: "encrypted backup with empty kms key id",
request: newEncryptedBackupRequest("", nil, ""),
expectedStatus: codes.InvalidArgument,
},
}

for _, scenario := range negativeScenarios {
runNegativeEncryptedBackupScenario(ctx, backupClient, opClient, scenario)
}
}
29 changes: 27 additions & 2 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"flag"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/log"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -19,6 +18,7 @@ import (
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/connectors/s3"
"ydbcp/internal/handlers"
"ydbcp/internal/kms"
"ydbcp/internal/metrics"
"ydbcp/internal/processor"
"ydbcp/internal/server"
Expand All @@ -32,6 +32,9 @@ import (
"ydbcp/internal/watchers/schedule_watcher"
"ydbcp/internal/watchers/ttl_watcher"
ap "ydbcp/pkg/plugins/auth"
kp "ydbcp/pkg/plugins/kms"

"github.com/ydb-platform/ydb-go-sdk/v3/log"

"github.com/jonboulle/clockwork"

Expand Down Expand Up @@ -114,6 +117,24 @@ func main() {
}
}()
xlog.Info(ctx, "Initialized AuthProvider")

var kmsProvider kp.KmsProvider
if len(configInstance.KMS.PluginPath) == 0 {
kmsProvider, err = kms.NewDummyKmsProvider(ctx)
} else {
kmsProvider, err = kms.NewKmsProvider(ctx, configInstance.KMS)
}
if err != nil {
xlog.Error(ctx, "Error init KmsProvider", zap.Error(err))
os.Exit(1)
}
defer func() {
if err := kmsProvider.Close(ctx); err != nil {
xlog.Error(ctx, "Error close kms provider", zap.Error(err))
}
}()
xlog.Info(ctx, "Initialized KmsProvider")

metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock())
xlog.Info(ctx, "Initialized metrics registry")
audit.EventsDestination = configInstance.Audit.EventsDestination
Expand Down Expand Up @@ -143,7 +164,9 @@ func main() {
backup.NewBackupService(
dbConnector,
clientConnector,
s3Connector,
authProvider,
kmsProvider,
*configInstance,
).Register(server)
operation.NewOperationService(dbConnector, authProvider).Register(server)
Expand Down Expand Up @@ -189,9 +212,11 @@ func main() {
handlers.NewTBWROperationHandler(
dbConnector,
clientConnector,
s3Connector,
queries.NewWriteTableQuery,
clockwork.NewRealClock(),
*configInstance,
kmsProvider,
),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
Expand All @@ -210,7 +235,7 @@ func main() {
xlog.Info(ctx, "Created TtlWatcher")
}

backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock())
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock(), configInstance.FeatureFlags)

schedule_watcher.NewScheduleWatcher(
ctx, &wg, configInstance.OperationProcessor.ProcessorIntervalSeconds, dbConnector,
Expand Down
Loading
Loading