Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
- '24.8'
- '25.3'
- '25.8'
- '25.11'
steps:
- name: Checkout project
uses: actions/checkout@v4
Expand Down Expand Up @@ -219,7 +220,7 @@ jobs:
- '24.8'
- '25.3'
- '25.8'
- '25.10'
- '25.11'
steps:
- name: Checkout project
uses: actions/checkout@v4
Expand Down
47 changes: 28 additions & 19 deletions pkg/backup/backuper.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
return fmt.Sprintf("Disk('%s','%s')", b.cfg.ClickHouse.EmbeddedBackupDisk, backupName), nil
}
if b.cfg.General.RemoteStorage == "s3" {
s3Endpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationS3())
if err != nil {
return "", err
}
s3Endpoint := b.buildEmbeddedLocationS3(ctx)
if b.cfg.S3.AccessKey != "" {
return fmt.Sprintf("S3('%s/%s/','%s','%s')", s3Endpoint, backupName, b.cfg.S3.AccessKey, b.cfg.S3.SecretKey), nil
}
Expand All @@ -267,10 +264,7 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
return "", errors.WithStack(errors.New("provide s3->access_key and s3->secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`"))
}
if b.cfg.General.RemoteStorage == "gcs" {
gcsEndpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationGCS())
if err != nil {
return "", err
}
gcsEndpoint := b.buildEmbeddedLocationGCS(ctx)
if b.cfg.GCS.EmbeddedAccessKey != "" {
return fmt.Sprintf("S3('%s/%s/','%s','%s')", gcsEndpoint, backupName, b.cfg.GCS.EmbeddedAccessKey, b.cfg.GCS.EmbeddedSecretKey), nil
}
Expand All @@ -280,45 +274,50 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
return "", fmt.Errorf("provide gcs->embedded_access_key and gcs->embedded_secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
}
if b.cfg.General.RemoteStorage == "azblob" {
azblobEndpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationAZBLOB())
azblobEndpoint := b.buildEmbeddedLocationAZBLOB()
azblobPath, err := b.ch.ApplyMacros(ctx, b.cfg.AzureBlob.ObjectDiskPath)
if err != nil {
return "", err
}
if b.cfg.AzureBlob.Container != "" {
return fmt.Sprintf("AzureBlobStorage('%s','%s','%s/%s/')", azblobEndpoint, b.cfg.AzureBlob.Container, b.cfg.AzureBlob.ObjectDiskPath, backupName), nil
return fmt.Sprintf("AzureBlobStorage('%s','%s','%s/%s/')", azblobEndpoint, b.cfg.AzureBlob.Container, azblobPath, backupName), nil
}
return "", fmt.Errorf("provide azblob->container and azblob->account_name, azblob->account_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
}
return "", fmt.Errorf("empty clickhouse->embedded_backup_disk and invalid general->remote_storage: %s", b.cfg.General.RemoteStorage)
}


func (b *Backuper) buildEmbeddedLocationS3() string {
func (b *Backuper) buildEmbeddedLocationS3(ctx context.Context) string {
s3backupURL := url.URL{}
s3backupURL.Scheme = "https"
s3Path, err := b.ch.ApplyMacros(ctx, b.cfg.S3.ObjectDiskPath)
if err != nil {
log.Error().Stack().Err(err).Send()
return ""
}
if strings.HasPrefix(b.cfg.S3.Endpoint, "http") {
newUrl, _ := s3backupURL.Parse(b.cfg.S3.Endpoint)
s3backupURL = *newUrl
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, s3Path)
} else {
s3backupURL.Host = b.cfg.S3.Endpoint
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, s3Path)
}
if b.cfg.S3.DisableSSL {
s3backupURL.Scheme = "http"
}
if s3backupURL.Host == "" && b.cfg.S3.Region != "" && b.cfg.S3.ForcePathStyle {
s3backupURL.Host = "s3." + b.cfg.S3.Region + ".amazonaws.com"
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, s3Path)
}
if s3backupURL.Host == "" && b.cfg.S3.Bucket != "" && !b.cfg.S3.ForcePathStyle {
s3backupURL.Host = b.cfg.S3.Bucket + "." + "s3." + b.cfg.S3.Region + ".amazonaws.com"
s3backupURL.Path = b.cfg.S3.ObjectDiskPath
s3backupURL.Path = s3Path
}
return s3backupURL.String()
}

func (b *Backuper) buildEmbeddedLocationGCS() string {
func (b *Backuper) buildEmbeddedLocationGCS(ctx context.Context) string {
gcsBackupURL := url.URL{}
gcsBackupURL.Scheme = "https"
if b.cfg.GCS.ForceHttp {
Expand All @@ -328,14 +327,24 @@ func (b *Backuper) buildEmbeddedLocationGCS() string {
if !strings.HasPrefix(b.cfg.GCS.Endpoint, "http") {
gcsBackupURL.Host = b.cfg.GCS.Endpoint
} else {
newUrl, _ := gcsBackupURL.Parse(b.cfg.GCS.Endpoint)
newUrl, err := gcsBackupURL.Parse(b.cfg.GCS.Endpoint)
if err != nil {
log.Error().Err(err).Stack().Send()
return ""
}
gcsBackupURL = *newUrl
}
}
if gcsBackupURL.Host == "" {
gcsBackupURL.Host = "storage.googleapis.com"
}
gcsBackupURL.Path = path.Join(b.cfg.GCS.Bucket, b.cfg.GCS.ObjectDiskPath)
gcsPath, err := b.ch.ApplyMacros(ctx, b.cfg.GCS.ObjectDiskPath)
if err != nil {
log.Error().Err(err).Stack().Send()
return ""
}

gcsBackupURL.Path = path.Join(b.cfg.GCS.Bucket, gcsPath)
return gcsBackupURL.String()
}

Expand Down
120 changes: 107 additions & 13 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1898,9 +1898,32 @@ func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, ba
}
// https://github.com/Altinity/clickhouse-backup/issues/937
if len(b.cfg.General.RestoreTableMapping) > 0 {
if targetTable, isMapped := b.cfg.General.RestoreTableMapping[table.Table]; isMapped {
dstTableName = targetTable
tablesForRestore[i].Table = targetTable
// Check full qualified name first (db.table), then table name only
fullName := table.Database + "." + table.Table
if targetValue, isMapped := b.cfg.General.RestoreTableMapping[fullName]; isMapped {
// Target may contain database (e.g., target_db.new_table)
if strings.Contains(targetValue, ".") {
parts := strings.SplitN(targetValue, ".", 2)
dstDatabase = parts[0]
dstTableName = parts[1]
tablesForRestore[i].Database = parts[0]
tablesForRestore[i].Table = parts[1]
} else {
dstTableName = targetValue
tablesForRestore[i].Table = targetValue
}
} else if targetTable, isMapped := b.cfg.General.RestoreTableMapping[table.Table]; isMapped {
// Handle target with database prefix
if strings.Contains(targetTable, ".") {
parts := strings.SplitN(targetTable, ".", 2)
dstDatabase = parts[0]
dstTableName = parts[1]
tablesForRestore[i].Database = parts[0]
tablesForRestore[i].Table = parts[1]
} else {
dstTableName = targetTable
tablesForRestore[i].Table = targetTable
}
}
}
logger := log.With().Str("table", fmt.Sprintf("%s.%s", dstDatabase, dstTableName)).Logger()
Expand Down Expand Up @@ -2255,8 +2278,24 @@ func (b *Backuper) checkMissingTables(tablesForRestore ListOfTables, chTables []
}
}
if len(b.cfg.General.RestoreTableMapping) > 0 {
if targetTable, isMapped := b.cfg.General.RestoreTableMapping[table.Table]; isMapped {
dstTable = targetTable
// Check full qualified name first (db.table), then table name only
fullName := table.Database + "." + table.Table
if targetValue, isMapped := b.cfg.General.RestoreTableMapping[fullName]; isMapped {
if strings.Contains(targetValue, ".") {
parts := strings.SplitN(targetValue, ".", 2)
dstDatabase = parts[0]
dstTable = parts[1]
} else {
dstTable = targetValue
}
} else if targetTable, isMapped := b.cfg.General.RestoreTableMapping[table.Table]; isMapped {
if strings.Contains(targetTable, ".") {
parts := strings.SplitN(targetTable, ".", 2)
dstDatabase = parts[0]
dstTable = parts[1]
} else {
dstTable = targetTable
}
}
}
found := false
Expand All @@ -2267,7 +2306,7 @@ func (b *Backuper) checkMissingTables(tablesForRestore ListOfTables, chTables []
}
}
if !found {
missingTables = append(missingTables, fmt.Sprintf("'%s.%s'", dstDatabase, table.Table))
missingTables = append(missingTables, fmt.Sprintf("'%s.%s'", dstDatabase, dstTable))
}
}
return missingTables
Expand All @@ -2290,25 +2329,80 @@ func (b *Backuper) changeTablePatternFromRestoreMapping(tablePattern, objType st
case "database":
mapping = b.cfg.General.RestoreDatabaseMapping
case "table":
mapping = b.cfg.General.RestoreDatabaseMapping
mapping = b.cfg.General.RestoreTableMapping
default:
return ""
return tablePattern
}
isDatabase := objType == "database"
for sourceObj, targetObj := range mapping {
if tablePattern != "" {
sourceObjRE := regexp.MustCompile(fmt.Sprintf("(^%s.*)|(,%s.*)", sourceObj, sourceObj))
var sourceObjRE *regexp.Regexp
if isDatabase {
sourceObjRE = regexp.MustCompile(fmt.Sprintf("(^%s\\.[^,]*)|(,%s\\.[^,]*)", sourceObj, sourceObj))
} else {
// Check if sourceObj is a full qualified name (db.table)
if strings.Contains(sourceObj, ".") {
// Full qualified mapping: source_db.table -> target_db.new_table
escapedSource := regexp.QuoteMeta(sourceObj)
sourceObjRE = regexp.MustCompile(fmt.Sprintf("(^%s)|(,%s)", escapedSource, escapedSource))
} else {
sourceObjRE = regexp.MustCompile(fmt.Sprintf("(^([^\\.]+)\\.%s)|(,([^\\.]+)\\.%s)", sourceObj, sourceObj))
}
}

if sourceObjRE.MatchString(tablePattern) {
matches := sourceObjRE.FindAllStringSubmatch(tablePattern, -1)
substitution := targetObj + ".*"
if strings.HasPrefix(matches[0][1], ",") {
var substitution string
if isDatabase {
substitution = targetObj + ".*"
} else {
// Check if sourceObj is full qualified
if strings.Contains(sourceObj, ".") {
// Use targetObj as-is (may contain database)
substitution = targetObj
} else {
// matches[0][2] has database name when first alternative matches (^...)
// matches[0][4] has database name when second alternative matches (,...)
dbName := matches[0][2]
if dbName == "" && len(matches[0]) > 4 {
dbName = matches[0][4]
}
// Check if targetObj contains database
if strings.Contains(targetObj, ".") {
substitution = targetObj
} else {
substitution = dbName + "." + targetObj
}
}
}
if strings.HasPrefix(matches[0][0], ",") {
substitution = "," + substitution
}

tablePattern = sourceObjRE.ReplaceAllString(tablePattern, substitution)
} else {
tablePattern += "," + targetObj + ".*"
if isDatabase {
tablePattern += "," + targetObj + ".*"
} else {
// Check if targetObj contains database
if strings.Contains(targetObj, ".") {
tablePattern += "," + targetObj
} else {
tablePattern += ",*." + targetObj
}
}
}
} else {
tablePattern += targetObj + ".*"
if isDatabase {
tablePattern += targetObj + ".*"
} else {
// Check if targetObj contains database
if strings.Contains(targetObj, ".") {
tablePattern += targetObj
} else {
tablePattern += "*." + targetObj
}
}
}
}
return tablePattern
Expand Down
Loading