Skip to content

Commit b076903

Browse files
committed
feat(backup_service): support backup encryption
1 parent fef87ae commit b076903

File tree

31 files changed

+927
-223
lines changed

31 files changed

+927
-223
lines changed

.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ S3_BUCKET = "test-bucket"
55
S3_REGION = "us-east-1"
66
YDB_NAME = "local-ydb"
77
ENABLE_NEW_PATHS_FORMAT = true
8+
ENABLE_BACKUPS_ENCRYPTION = true
89
# local-ydb image that was built from main
910
# Image: https://github.com/ydb-platform/ydb/pkgs/container/local-ydb/551703770
1011
# Built from revision 07aeccc41b43c9fc0b7da7680340fbac01b81427

.github/workflows/unit-test.yml

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,16 @@ jobs:
5454
runs-on: ubuntu-latest
5555
strategy:
5656
matrix:
57-
enable_new_paths_format: [ true, false ]
57+
include:
58+
- enable_new_paths_format: false
59+
enable_backups_encryption: false
60+
- enable_new_paths_format: true
61+
enable_backups_encryption: false
62+
- enable_new_paths_format: true
63+
enable_backups_encryption: true
5864
env:
5965
ENABLE_NEW_PATHS_FORMAT: ${{ matrix.enable_new_paths_format }}
66+
ENABLE_BACKUPS_ENCRYPTION: ${{ matrix.enable_backups_encryption }}
6067
steps:
6168
- uses: actions/checkout@v4
6269
- name: supply with s3 access keys
@@ -110,3 +117,20 @@ jobs:
110117
if: ${{ matrix.enable_new_paths_format }}
111118
run: |
112119
docker compose down
120+
- name: docker compose up
121+
if: ${{ matrix.enable_backups_encryption }}
122+
run: |
123+
docker compose up -d
124+
- name: run make_encrypted_backup tests
125+
if: ${{ matrix.enable_backups_encryption }}
126+
run: |
127+
while [ "$(docker inspect -f {{.State.Health.Status}} local-ydbcp)" != "healthy" ]; do
128+
echo "Waiting for container to become healthy..."
129+
sleep 1
130+
done
131+
echo "Starting make_encrypted_backup tests!"
132+
docker exec local-ydbcp sh -c './make_encrypted_backup'
133+
- name: docker compose down
134+
if: ${{ matrix.enable_backups_encryption }}
135+
run: |
136+
docker compose down

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go
2121
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go
2222
RUN go build -o ./orm ./cmd/integration/orm/main.go
2323
RUN go build -o ./test_new_paths_format ./cmd/integration/new_paths_format/main.go
24+
RUN go build -o ./make_encrypted_backup ./cmd/integration/make_encrypted_backup/main.go
2425

2526
# Command to run the executable
2627
CMD ["./main", "--config=local_config.yaml"]
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
"ydbcp/cmd/integration/common"
8+
"ydbcp/internal/types"
9+
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
10+
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/codes"
13+
"google.golang.org/grpc/status"
14+
)
15+
16+
const (
17+
containerID = "abcde"
18+
databaseName = "/local"
19+
ydbcpEndpoint = "0.0.0.0:50051"
20+
databaseEndpoint = "grpcs://local-ydb:2135"
21+
testKmsKeyID = "test-kms-key-id-123"
22+
)
23+
24+
type encryptedBackupScenario struct {
25+
name string
26+
request *pb.MakeBackupRequest
27+
}
28+
29+
type negativeEncryptedBackupScenario struct {
30+
name string
31+
request *pb.MakeBackupRequest
32+
expectedStatus codes.Code
33+
}
34+
35+
func newEncryptedBackupRequest(rootPath string, sourcePaths []string, kmsKeyID string) *pb.MakeBackupRequest {
36+
encryptionSettings := &pb.EncryptionSettings{
37+
Algorithm: pb.EncryptionSettings_AES_256_GCM,
38+
}
39+
40+
if len(kmsKeyID) > 0 {
41+
encryptionSettings.KeyEncryptionKey = &pb.EncryptionSettings_KmsKey_{
42+
KmsKey: &pb.EncryptionSettings_KmsKey{
43+
KeyId: kmsKeyID,
44+
},
45+
}
46+
}
47+
48+
return &pb.MakeBackupRequest{
49+
ContainerId: containerID,
50+
DatabaseName: databaseName,
51+
DatabaseEndpoint: databaseEndpoint,
52+
RootPath: rootPath,
53+
SourcePaths: sourcePaths,
54+
EncryptionSettings: encryptionSettings,
55+
}
56+
}
57+
58+
func runEncryptedBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario encryptedBackupScenario) {
59+
log.Printf("Running scenario: %s", scenario.name)
60+
61+
tbwr, err := backupClient.MakeBackup(ctx, scenario.request)
62+
if err != nil {
63+
log.Panicf("scenario %s: failed to make backup: %v", scenario.name, err)
64+
}
65+
66+
op, err := opClient.GetOperation(
67+
ctx, &pb.GetOperationRequest{
68+
Id: tbwr.Id,
69+
},
70+
)
71+
if err != nil {
72+
log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err)
73+
}
74+
75+
if op.EncryptionSettings == nil {
76+
log.Panicf("scenario %s: encryption settings are nil", scenario.name)
77+
}
78+
79+
if op.EncryptionSettings.GetKmsKey() == nil {
80+
log.Panicf("scenario %s: KMS key is nil", scenario.name)
81+
}
82+
83+
if op.EncryptionSettings.GetKmsKey().GetKeyId() != testKmsKeyID {
84+
log.Panicf("scenario %s: KMS key ID mismatch, expected %s, got %s",
85+
scenario.name, testKmsKeyID, op.EncryptionSettings.GetKmsKey().GetKeyId())
86+
}
87+
88+
if op.EncryptionSettings.GetAlgorithm() != pb.EncryptionSettings_AES_256_GCM {
89+
log.Panicf("scenario %s: encryption algorithm is not AES_256_GCM", scenario.name)
90+
}
91+
92+
// Wait for operation handler
93+
time.Sleep(time.Second * 3)
94+
95+
ops, err := opClient.ListOperations(
96+
ctx, &pb.ListOperationsRequest{
97+
ContainerId: containerID,
98+
DatabaseNameMask: databaseName,
99+
OperationTypes: []string{types.OperationTypeTB.String()},
100+
},
101+
)
102+
if err != nil {
103+
log.Panicf("failed to list operations: %v", err)
104+
}
105+
106+
var tbOperation *pb.Operation
107+
for _, op := range ops.Operations {
108+
if op.GetParentOperationId() == tbwr.Id {
109+
tbOperation = op
110+
break
111+
}
112+
}
113+
114+
if tbOperation == nil {
115+
log.Panicf("scenario %s: TB operation not found", scenario.name)
116+
}
117+
118+
// Wait for backup to complete
119+
done := false
120+
var backup *pb.Backup
121+
for range 30 {
122+
backup, err = backupClient.GetBackup(
123+
ctx,
124+
&pb.GetBackupRequest{Id: tbOperation.BackupId},
125+
)
126+
if err != nil {
127+
log.Panicf("scenario %s: failed to get backup: %v", scenario.name, err)
128+
}
129+
if backup.GetStatus().String() == types.BackupStateAvailable {
130+
done = true
131+
break
132+
}
133+
time.Sleep(time.Second)
134+
}
135+
if !done {
136+
log.Panicf("scenario %s: failed to complete backup in 30 seconds", scenario.name)
137+
}
138+
139+
// Verify the backup has encryption settings
140+
if backup.EncryptionSettings == nil {
141+
log.Panicf("scenario %s: backup should have encryption settings", scenario.name)
142+
}
143+
144+
if backup.EncryptionSettings.GetKmsKey() == nil {
145+
log.Panicf("scenario %s: backup should have KMS key", scenario.name)
146+
}
147+
148+
if backup.EncryptionSettings.GetKmsKey().GetKeyId() != testKmsKeyID {
149+
log.Panicf("scenario %s: KMS key ID mismatch, expected %s, got %s",
150+
scenario.name, testKmsKeyID, backup.EncryptionSettings.GetKmsKey().GetKeyId())
151+
}
152+
153+
if backup.EncryptionSettings.GetAlgorithm() != pb.EncryptionSettings_AES_256_GCM {
154+
log.Panicf("scenario %s: encryption algorithm is not AES_256_GCM", scenario.name)
155+
}
156+
157+
log.Printf("scenario %s: passed", scenario.name)
158+
}
159+
160+
func runNegativeEncryptedBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario negativeEncryptedBackupScenario) {
161+
log.Printf("Running negative scenario: %s", scenario.name)
162+
163+
_, err := backupClient.MakeBackup(ctx, scenario.request)
164+
if err != nil {
165+
st, ok := status.FromError(err)
166+
if !ok {
167+
log.Panicf("scenario %s: MakeBackup failed but couldn't extract status: %v", scenario.name, err)
168+
}
169+
if st.Code() != scenario.expectedStatus {
170+
log.Panicf("scenario %s: expected status code %v, got %v: %v", scenario.name, scenario.expectedStatus, st.Code(), err)
171+
}
172+
log.Printf("scenario %s: passed", scenario.name)
173+
} else {
174+
log.Panicf("scenario %s: MakeBackup should fail with status code %v, but it succeeded", scenario.name, scenario.expectedStatus)
175+
}
176+
}
177+
178+
func main() {
179+
conn := common.CreateGRPCClient(ydbcpEndpoint)
180+
defer func(conn *grpc.ClientConn) {
181+
err := conn.Close()
182+
if err != nil {
183+
log.Panicln("failed to close connection")
184+
}
185+
}(conn)
186+
backupClient := pb.NewBackupServiceClient(conn)
187+
opClient := pb.NewOperationServiceClient(conn)
188+
189+
ctx := context.Background()
190+
191+
// Positive scenarios
192+
positiveScenarios := []encryptedBackupScenario{
193+
{
194+
name: "full encrypted backup",
195+
request: newEncryptedBackupRequest("", nil, testKmsKeyID),
196+
},
197+
{
198+
name: "partial encrypted backup",
199+
request: newEncryptedBackupRequest("", []string{"kv_test"}, testKmsKeyID),
200+
},
201+
{
202+
name: "full encrypted backup with root path",
203+
request: newEncryptedBackupRequest("stocks", nil, testKmsKeyID),
204+
},
205+
{
206+
name: "partial encrypted backup with root path",
207+
request: newEncryptedBackupRequest("stocks", []string{"orders"}, testKmsKeyID),
208+
},
209+
}
210+
211+
for _, scenario := range positiveScenarios {
212+
runEncryptedBackupScenario(ctx, backupClient, opClient, scenario)
213+
time.Sleep(time.Second)
214+
}
215+
216+
// TODO: add tests for restore encrypted backup
217+
218+
// Negative scenarios
219+
negativeScenarios := []negativeEncryptedBackupScenario{
220+
{
221+
name: "encrypted backup with empty kms key id",
222+
request: newEncryptedBackupRequest("", nil, ""),
223+
expectedStatus: codes.InvalidArgument,
224+
},
225+
}
226+
227+
for _, scenario := range negativeScenarios {
228+
runNegativeEncryptedBackupScenario(ctx, backupClient, opClient, scenario)
229+
}
230+
}

cmd/ydbcp/main.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/log"
87
"net/http"
98
_ "net/http/pprof"
109
"os"
@@ -19,6 +18,7 @@ import (
1918
"ydbcp/internal/connectors/db/yql/queries"
2019
"ydbcp/internal/connectors/s3"
2120
"ydbcp/internal/handlers"
21+
"ydbcp/internal/kms"
2222
"ydbcp/internal/metrics"
2323
"ydbcp/internal/processor"
2424
"ydbcp/internal/server"
@@ -32,6 +32,9 @@ import (
3232
"ydbcp/internal/watchers/schedule_watcher"
3333
"ydbcp/internal/watchers/ttl_watcher"
3434
ap "ydbcp/pkg/plugins/auth"
35+
kp "ydbcp/pkg/plugins/kms"
36+
37+
"github.com/ydb-platform/ydb-go-sdk/v3/log"
3538

3639
"github.com/jonboulle/clockwork"
3740

@@ -114,6 +117,24 @@ func main() {
114117
}
115118
}()
116119
xlog.Info(ctx, "Initialized AuthProvider")
120+
121+
var kmsProvider kp.KmsProvider
122+
if len(configInstance.KMS.PluginPath) == 0 {
123+
kmsProvider, err = kms.NewDummyKmsProvider(ctx)
124+
} else {
125+
kmsProvider, err = kms.NewKmsProvider(ctx, configInstance.KMS)
126+
}
127+
if err != nil {
128+
xlog.Error(ctx, "Error init KmsProvider", zap.Error(err))
129+
os.Exit(1)
130+
}
131+
defer func() {
132+
if err := kmsProvider.Close(ctx); err != nil {
133+
xlog.Error(ctx, "Error close kms provider", zap.Error(err))
134+
}
135+
}()
136+
xlog.Info(ctx, "Initialized KmsProvider")
137+
117138
metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock())
118139
xlog.Info(ctx, "Initialized metrics registry")
119140
audit.EventsDestination = configInstance.Audit.EventsDestination
@@ -143,7 +164,9 @@ func main() {
143164
backup.NewBackupService(
144165
dbConnector,
145166
clientConnector,
167+
s3Connector,
146168
authProvider,
169+
kmsProvider,
147170
*configInstance,
148171
).Register(server)
149172
operation.NewOperationService(dbConnector, authProvider).Register(server)
@@ -189,9 +212,11 @@ func main() {
189212
handlers.NewTBWROperationHandler(
190213
dbConnector,
191214
clientConnector,
215+
s3Connector,
192216
queries.NewWriteTableQuery,
193217
clockwork.NewRealClock(),
194218
*configInstance,
219+
kmsProvider,
195220
),
196221
); err != nil {
197222
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
@@ -210,7 +235,7 @@ func main() {
210235
xlog.Info(ctx, "Created TtlWatcher")
211236
}
212237

213-
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock())
238+
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock(), configInstance.FeatureFlags)
214239

215240
schedule_watcher.NewScheduleWatcher(
216241
ctx, &wg, configInstance.OperationProcessor.ProcessorIntervalSeconds, dbConnector,

docker-compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ services:
109109
S3_SECRET_KEY: ${S3_SECRET_KEY}
110110
YDB_NAME: ${YDB_NAME}
111111
ENABLE_NEW_PATHS_FORMAT: ${ENABLE_NEW_PATHS_FORMAT}
112+
ENABLE_BACKUPS_ENCRYPTION: ${ENABLE_BACKUPS_ENCRYPTION}
112113
depends_on:
113114
setup_ydb:
114115
condition: service_completed_successfully

0 commit comments

Comments
 (0)