Skip to content

Commit c34b061

Browse files
authored
fix: deltalake fetch table attributes (#6455)
# Description - There seems to be some breaking changes as `DESCRIBE QUERY TABLE` for Databricks started returning 4 columns instead of 3 columns: col_name, data_type, comment and metadata. [Docs link.](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-aux-describe-table) <img width="633" height="104" alt="Screenshot 2025-10-23 at 8 30 45 AM" src="https://github.com/user-attachments/assets/60d76c9a-a50f-48fe-abee-1f6abb2e5399" /> - Dynamically checking for col_name and data_type and populating table schema. ## Linear Ticket - Resolves WAR-1216 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 709cdf5 commit c34b061

File tree

1 file changed

+42
-10
lines changed

1 file changed

+42
-10
lines changed

warehouse/integrations/deltalake/deltalake.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/rudderlabs/rudder-go-kit/jsonrs"
2525
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
26+
2627
"github.com/rudderlabs/rudder-server/utils/misc"
2728
warehouseclient "github.com/rudderlabs/rudder-server/warehouse/client"
2829
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
@@ -404,31 +405,62 @@ func (d *Deltalake) sendStatForMissingDatatype(missingDatatype string) {
404405

405406
// fetchTableAttributes fetches the attributes of a table
406407
func (d *Deltalake) fetchTableAttributes(ctx context.Context, tableName string) (model.TableSchema, error) {
407-
tableSchema := make(model.TableSchema)
408-
409408
query := fmt.Sprintf(`DESCRIBE QUERY TABLE %s.%s;`, d.Namespace, tableName)
410409

411410
rows, err := d.DB.QueryContext(ctx, query)
412411
if err != nil {
413-
return nil, fmt.Errorf("executing fetching table attributes: %w", err)
412+
return nil, fmt.Errorf("executing describe query table: %w", err)
414413
}
415414
defer func() { _ = rows.Close() }()
416415

416+
columns, err := rows.Columns()
417+
if err != nil {
418+
return nil, fmt.Errorf("getting rows columns: %w", err)
419+
}
420+
columnsMap := make(map[string]int)
421+
for i, col := range columns {
422+
columnsMap[col] = i
423+
}
424+
425+
columnNameIndex, ok := columnsMap["col_name"]
426+
if !ok {
427+
return nil, errors.New("column name index not found")
428+
}
429+
datatypeIndex, ok := columnsMap["data_type"]
430+
if !ok {
431+
return nil, errors.New("datatype index not found")
432+
}
433+
434+
scanDest := make([]any, len(columns))
435+
scanPtrs := make([]any, len(columns))
436+
for i := range scanDest {
437+
scanPtrs[i] = &scanDest[i]
438+
}
439+
440+
tableSchema := make(model.TableSchema)
441+
417442
for rows.Next() {
418-
var (
419-
colName, datatype string
420-
comment sql.NullString
421-
)
443+
if err := rows.Scan(scanPtrs...); err != nil {
444+
return nil, fmt.Errorf("scanning row: %w", err)
445+
}
446+
447+
colName, ok := scanDest[columnNameIndex].(string)
448+
if !ok {
449+
return nil, fmt.Errorf("column name is not a string: %v", scanDest[columnNameIndex])
450+
}
422451

423-
if err = rows.Scan(&colName, &datatype, &comment); err != nil {
424-
return nil, fmt.Errorf("processing fetched table attributes: %w", err)
452+
datatype, ok := scanDest[datatypeIndex].(string)
453+
if !ok {
454+
return nil, fmt.Errorf("datatype is not a string: %v", scanDest[datatypeIndex])
425455
}
426456

427457
tableSchema[colName] = datatype
428458
}
459+
429460
if err = rows.Err(); err != nil {
430-
return nil, fmt.Errorf("processing fetched table attributes: %w", err)
461+
return nil, fmt.Errorf("iterating rows: %w", err)
431462
}
463+
432464
return tableSchema, nil
433465
}
434466

0 commit comments

Comments
 (0)