Skip to content

Commit e64142a

Browse files
authored
fix: support ON CLUSTER clause in CREATE TABLE for clickhouse (#237)
* fix: support ON CLUSTER clause in CREATE TABLE for clickhouse * fix: SQL spacing and column comparison issues in migrator tests * fix: translate comments to English in migrator.go * Refine cluster option handling and update tests
1 parent 73932e7 commit e64142a

File tree

2 files changed

+132
-31
lines changed

2 files changed

+132
-31
lines changed

migrator.go

Lines changed: 64 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
"regexp"
89
"strconv"
910
"strings"
1011

@@ -26,6 +27,49 @@ type Migrator struct {
2627
Dialector
2728
}
2829

30+
// isolateClusterOption splits ON CLUSTER clause from raw table options
31+
func isolateClusterOption(tableOpts string) (clusterOpts, cleanedOpts string) {
32+
cleanedOpts = tableOpts
33+
if tableOpts == "" {
34+
return
35+
}
36+
re := regexp.MustCompile(`ON CLUSTER (?:'([^']+)'|([^\s]+))`)
37+
clusterMatch := re.FindString(tableOpts)
38+
if clusterMatch == "" {
39+
return
40+
}
41+
clusterOpts = formatClusterClause(clusterMatch)
42+
cleanedOpts = strings.TrimSpace(strings.Replace(tableOpts, clusterMatch, "", 1))
43+
return
44+
}
45+
46+
// formatClusterClause ensures ON CLUSTER clause begins with a single space and has no trailing space
47+
func formatClusterClause(cluster string) string {
48+
clause := strings.TrimSpace(cluster)
49+
if clause == "" {
50+
return ""
51+
}
52+
if !strings.HasPrefix(clause, " ") {
53+
clause = " " + clause
54+
}
55+
return clause
56+
}
57+
58+
// extractClusterOption extracts ON CLUSTER clause from table options
59+
func (m Migrator) extractClusterOption() string {
60+
// Extract ON CLUSTER from gorm:table_options
61+
if tableOption, ok := m.DB.Get("gorm:table_options"); ok {
62+
if clusterOpts, _ := isolateClusterOption(fmt.Sprint(tableOption)); clusterOpts != "" {
63+
return clusterOpts
64+
}
65+
}
66+
// Also support legacy gorm:table_cluster_options (for backward compatibility)
67+
if clusterOption, ok := m.DB.Get("gorm:table_cluster_options"); ok {
68+
return formatClusterClause(fmt.Sprint(clusterOption))
69+
}
70+
return ""
71+
}
72+
2973
// Database
3074

3175
func (m Migrator) CurrentDatabase() (name string) {
@@ -85,7 +129,7 @@ func (m Migrator) CreateTable(models ...interface{}) error {
85129
tx := m.DB.Session(new(gorm.Session))
86130
if err := m.RunWithValue(model, func(stmt *gorm.Statement) (err error) {
87131
var (
88-
createTableSQL = "CREATE TABLE ?%s(%s %s %s) %s"
132+
createTableSQL = "CREATE TABLE ?%s (%s %s %s) %s"
89133
args = []interface{}{clause.Table{Name: stmt.Table}}
90134
)
91135

@@ -156,13 +200,20 @@ func (m Migrator) CreateTable(models ...interface{}) error {
156200

157201
// Step 4. Finally assemble CREATE TABLE ... SQL string
158202
engineOpts := m.Dialector.DefaultTableEngineOpts
159-
if tableOption, ok := m.DB.Get("gorm:table_options"); ok {
160-
engineOpts = fmt.Sprint(tableOption)
203+
tableOption, hasTableOption := m.DB.Get("gorm:table_options")
204+
clusterOpts := ""
205+
if hasTableOption {
206+
tableOpts := fmt.Sprint(tableOption)
207+
var cleanedOpts string
208+
clusterOpts, cleanedOpts = isolateClusterOption(tableOpts)
209+
engineOpts = cleanedOpts
161210
}
162211

163-
clusterOpts := ""
212+
// Also support legacy gorm:table_cluster_options (for backward compatibility)
164213
if clusterOption, ok := m.DB.Get("gorm:table_cluster_options"); ok {
165-
clusterOpts = " " + fmt.Sprint(clusterOption) + " "
214+
if clusterOpts == "" {
215+
clusterOpts = " " + fmt.Sprint(clusterOption) + " "
216+
}
166217
}
167218

168219
createTableSQL = fmt.Sprintf(createTableSQL, clusterOpts, columnStr, constrStr, indexStr, engineOpts)
@@ -218,11 +269,8 @@ func (m Migrator) GetTables() (tableList []string, err error) {
218269
func (m Migrator) AddColumn(value interface{}, field string) error {
219270
return m.RunWithValue(value, func(stmt *gorm.Statement) error {
220271
if field := stmt.Schema.LookUpField(field); field != nil {
221-
clusterOpts := ""
222-
if clusterOption, ok := m.DB.Get("gorm:table_cluster_options"); ok {
223-
clusterOpts = " " + fmt.Sprint(clusterOption) + " "
224-
}
225-
sQL := fmt.Sprintf("ALTER TABLE ? %s ADD COLUMN ? ?", clusterOpts)
272+
clusterOpts := m.extractClusterOption()
273+
sQL := fmt.Sprintf("ALTER TABLE ?%s ADD COLUMN ? ?", clusterOpts)
226274
return m.DB.Exec(
227275
sQL,
228276
clause.Table{Name: stmt.Table}, clause.Column{Name: field.DBName},
@@ -238,11 +286,8 @@ func (m Migrator) DropColumn(value interface{}, name string) error {
238286
if field := stmt.Schema.LookUpField(name); field != nil {
239287
name = field.DBName
240288
}
241-
clusterOpts := ""
242-
if clusterOption, ok := m.DB.Get("gorm:table_cluster_options"); ok {
243-
clusterOpts = " " + fmt.Sprint(clusterOption) + " "
244-
}
245-
sQL := fmt.Sprintf("ALTER TABLE ? %s DROP COLUMN ?", clusterOpts)
289+
clusterOpts := m.extractClusterOption()
290+
sQL := fmt.Sprintf("ALTER TABLE ?%s DROP COLUMN ?", clusterOpts)
246291
return m.DB.Exec(
247292
sQL,
248293
clause.Table{Name: stmt.Table}, clause.Column{Name: name},
@@ -253,11 +298,8 @@ func (m Migrator) DropColumn(value interface{}, name string) error {
253298
func (m Migrator) AlterColumn(value interface{}, field string) error {
254299
return m.RunWithValue(value, func(stmt *gorm.Statement) error {
255300
if field := stmt.Schema.LookUpField(field); field != nil {
256-
clusterOpts := ""
257-
if clusterOption, ok := m.DB.Get("gorm:table_cluster_options"); ok {
258-
clusterOpts = " " + fmt.Sprint(clusterOption) + " "
259-
}
260-
sQL := fmt.Sprintf("ALTER TABLE ? %s MODIFY COLUMN ? ?", clusterOpts)
301+
clusterOpts := m.extractClusterOption()
302+
sQL := fmt.Sprintf("ALTER TABLE ?%s MODIFY COLUMN ? ?", clusterOpts)
261303
return m.DB.Exec(
262304
sQL,
263305
clause.Table{Name: stmt.Table},
@@ -284,11 +326,8 @@ func (m Migrator) RenameColumn(value interface{}, oldName, newName string) error
284326
field = f
285327
}
286328
if field != nil {
287-
clusterOpts := ""
288-
if clusterOption, ok := m.DB.Get("gorm:table_cluster_options"); ok {
289-
clusterOpts = " " + fmt.Sprint(clusterOption) + " "
290-
}
291-
sQL := fmt.Sprintf("ALTER TABLE ? %s RENAME COLUMN ? TO ?", clusterOpts)
329+
clusterOpts := m.extractClusterOption()
330+
sQL := fmt.Sprintf("ALTER TABLE ?%s RENAME COLUMN ? TO ?", clusterOpts)
292331
return m.DB.Exec(
293332
sQL,
294333
clause.Table{Name: stmt.Table},

migrator_test.go

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package clickhouse_test
22

33
import (
4+
"regexp"
45
"testing"
56
"time"
67

@@ -130,19 +131,80 @@ func TestMigrator_DontSupportEmptyDefaultValue(t *testing.T) {
130131
t.Fatalf("no error should happen when auto migrate, but got %v", err)
131132
}
132133

133-
// Replace every gorm raw SQL command with a function that appends the SQL string to a slice
134+
if err := DB.Table("mytable").AutoMigrate(&MyTable{}); err != nil {
135+
t.Fatalf("no error should happen when auto migrate, but got %v", err)
136+
}
137+
138+
columnTypes, err := DB.Migrator().ColumnTypes("mytable")
139+
if err != nil {
140+
t.Fatalf("failed to inspect columns, got %v", err)
141+
}
142+
foundString := false
143+
for _, col := range columnTypes {
144+
if col.Name() == "my_field" {
145+
foundString = true
146+
if col.DatabaseTypeName() != "String" {
147+
t.Fatalf("my_field column should remain String, got %s", col.DatabaseTypeName())
148+
}
149+
}
150+
}
151+
if !foundString {
152+
t.Fatalf("my_field column not found after auto migrate")
153+
}
154+
}
155+
156+
func TestMigrator_OnClusterSupport(t *testing.T) {
157+
type ClusterTable struct {
158+
ID uint64
159+
Name string
160+
CreatedAt time.Time
161+
}
162+
163+
// Test ON CLUSTER extraction from gorm:table_options
134164
sqlStrings := make([]string, 0)
135-
if err := DB.Callback().Raw().Replace("gorm:raw", func(db *gorm.DB) {
165+
166+
// Create a new DB instance for this test
167+
options, err := clickhousego.ParseDSN(dbDSN)
168+
if err != nil {
169+
t.Fatalf("Can not parse dsn, got error %v", err)
170+
}
171+
172+
testDB, err := gorm.Open(clickhouse.New(clickhouse.Config{
173+
Conn: clickhousego.OpenDB(options),
174+
}))
175+
if err != nil {
176+
t.Fatalf("failed to connect database, got error %v", err)
177+
}
178+
179+
// Replace raw callback to capture SQL
180+
if err := testDB.Callback().Raw().Replace("gorm:raw", func(db *gorm.DB) {
136181
sqlToExecute := db.Statement.SQL.String()
137182
sqlStrings = append(sqlStrings, sqlToExecute)
183+
// Don't actually execute for this test
138184
}); err != nil {
139185
t.Fatalf("no error should happen when registering a callback, but got %v", err)
140186
}
141187

142-
if err := DB.Table("mytable").AutoMigrate(&MyTable{}); err != nil {
143-
t.Fatalf("no error should happen when auto migrate, but got %v", err)
188+
// Test with ON CLUSTER in table_options
189+
err = testDB.Set("gorm:table_options", "ON CLUSTER 'test_cluster' ENGINE ReplicatedMergeTree ORDER BY id").Table("cluster_test").AutoMigrate(&ClusterTable{})
190+
if err != nil {
191+
t.Fatalf("no error should happen when auto migrate with ON CLUSTER, but got %v", err)
192+
}
193+
194+
// Check if ON CLUSTER was placed correctly in the SQL
195+
if len(sqlStrings) == 0 {
196+
t.Fatalf("expected SQL to be captured")
197+
}
198+
199+
createSQL := sqlStrings[len(sqlStrings)-1] // Get the last (CREATE TABLE) statement
200+
201+
// Verify ON CLUSTER appears after table name but before column definitions
202+
expectedPattern := `CREATE TABLE.*cluster_test.* ON CLUSTER 'test_cluster' \(.*id.*\).*ENGINE ReplicatedMergeTree`
203+
matched, err := regexp.MatchString(expectedPattern, createSQL)
204+
if err != nil {
205+
t.Fatalf("regex error: %v", err)
144206
}
145-
if len(sqlStrings) > 0 {
146-
t.Fatalf("should not auto-migrate table if there have not been any changes to the schema")
207+
if !matched {
208+
t.Fatalf("ON CLUSTER not placed correctly. Got SQL: %s", createSQL)
147209
}
148210
}

0 commit comments

Comments
 (0)