Skip to content

Commit 6539830

Browse files
committed
added jdbc source type parameter
1 parent 4d8acfb commit 6539830

File tree

3 files changed

+40
-5
lines changed

3 files changed

+40
-5
lines changed

dataframe-jdbc/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/db/DbType.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public abstract class DbType(public val dbTypeInJdbcUrl: String) {
196196
}
197197

198198
return typeInformationWithPostprocessingFor(
199+
jdbcSourceType = kType.withNullability(tableColumnMetadata.isNullable),
199200
targetSchema = ColumnSchema.Value(kType.withNullability(tableColumnMetadata.isNullable)),
200201
columnPostprocessor = postprocessor?.castToAny(),
201202
)
@@ -242,6 +243,8 @@ public abstract class DbType(public val dbTypeInJdbcUrl: String) {
242243
type = schema.type,
243244
)
244245

246+
// TODO, this should be postponed to post-processing.
247+
// List<AnyRow>.toDataFrame() is heavy!
245248
is ColumnSchema.Group ->
246249
DataColumn.createColumnGroup(
247250
name = name,

dataframe-jdbc/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/db/DuckDb.kt

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import kotlin.collections.toList
5959
import kotlin.reflect.KTypeProjection
6060
import kotlin.reflect.full.createType
6161
import kotlin.reflect.full.withNullability
62+
import kotlin.reflect.typeOf
6263
import kotlin.time.Instant
6364
import kotlin.time.toKotlinInstant
6465
import kotlin.uuid.Uuid
@@ -170,6 +171,7 @@ public object DuckDb : DbType("duckdb") {
170171
).withNullability(isNullable)
171172

172173
typeInformationWithPreprocessingForValueColumnOf<Map<String, Any?>, Map<String, Any?>>(
174+
jdbcSourceType = typeOf<Map<String, Any?>>().withNullability(isNullable), // unused
173175
targetColumnType = targetMapType,
174176
) { map, _ ->
175177
// only need to preprocess the values, as the keys are just Strings
@@ -195,6 +197,7 @@ public object DuckDb : DbType("duckdb") {
195197

196198
// todo maybe List<DataRow> should become FrameColumn
197199
typeInformationWithPreprocessingFor<SqlArray, List<Any?>>(
200+
jdbcSourceType = typeOf<SqlArray>().withNullability(isNullable),
198201
targetSchema = ColumnSchema.Value(targetListType),
199202
) { sqlArray, _ ->
200203
sqlArray
@@ -204,7 +207,11 @@ public object DuckDb : DbType("duckdb") {
204207
}
205208

206209
// TODO requires #1266 for specific types
207-
STRUCT -> typeInformationForValueColumnOf<Struct>(isNullable)
210+
STRUCT -> {
211+
val structTypes = parseStructType(sqlTypeName)
212+
213+
typeInformationForValueColumnOf<Struct>(isNullable)
214+
}
208215

209216
// Cannot handle this in Kotlin
210217
UNION -> typeInformationForValueColumnOf<Any>(isNullable)
@@ -269,6 +276,17 @@ public object DuckDb : DbType("duckdb") {
269276
return typeString.take(typeString.indexOfLast { it == '[' })
270277
}
271278

279+
/**
 * Parses a DuckDB STRUCT type name into its fields, keeping declaration order.
 *
 * For example, `STRUCT(v VARCHAR, i INTEGER)` becomes `{v=VARCHAR, i=INTEGER}`.
 *
 * Fields are separated on top-level commas only, so parameterised and nested types such as
 * `DECIMAL(18,3)` or `STRUCT(x INTEGER, y INTEGER)[]` are kept intact. Everything following the
 * field name is treated as the type, so multi-word types like `TIMESTAMP WITH TIME ZONE` are
 * preserved. Double-quoted field names may contain spaces, commas, and parentheses; they are
 * returned without the surrounding quotes and with doubled quotes (`""`) collapsed to `"`.
 *
 * @param typeString the full type name, which must have the form `STRUCT(...)`.
 * @return a map from field name to field type text; empty for `STRUCT()`.
 * @throws IllegalStateException if [typeString] is not of the form `STRUCT(...)`
 *   or one of its fields has no type.
 */
internal fun parseStructType(typeString: String): Map<String, String> {
    val prefix = "STRUCT("
    if (!typeString.startsWith(prefix) || !typeString.endsWith(")")) {
        error("invalid STRUCT type: $typeString")
    }
    val body = typeString.substring(prefix.length, typeString.length - 1)

    // Split on commas that are neither nested inside parentheses nor inside a quoted name.
    val fields = mutableListOf<String>()
    val current = StringBuilder()
    var depth = 0
    var inQuotes = false
    for (ch in body) {
        // A doubled quote inside a quoted name toggles twice, leaving the state unchanged.
        if (ch == '"') inQuotes = !inQuotes
        if (!inQuotes) {
            when (ch) {
                '(' -> depth++
                ')' -> depth--
            }
        }
        if (ch == ',' && depth == 0 && !inQuotes) {
            fields += current.toString()
            current.clear()
        } else {
            current.append(ch)
        }
    }
    fields += current.toString()

    val quotedField = Regex("""^"((?:[^"]|"")*)"\s+(.+)$""")
    val plainField = Regex("""^(\S+)\s+(.+)$""")

    return fields
        .map { it.trim() }
        .filter { it.isNotEmpty() } // `STRUCT()` has no fields
        .associate { field ->
            val isQuoted = field.startsWith('"')
            val match = (if (isQuoted) quotedField else plainField).matchEntire(field)
                ?: error("invalid STRUCT field '$field' in type: $typeString")
            val (rawName, type) = match.destructured
            val name = if (isQuoted) rawName.replace("\"\"", "\"") else rawName
            name to type
        }
}
289+
272290
/**
273291
* How to filter out system tables from user-created ones when using
274292
* [DataFrame.readAllSqlTables][DataFrame.Companion.readAllSqlTables] and

dataframe-jdbc/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/db/TypeInformation.kt

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public typealias AnyTypeInformation = TypeInformation<*, *, *>
2828
* to a [DataColumn] with values of type [P].
2929
*/
3030
public open class TypeInformation<J : Any, D : Any, P : Any>(
31+
public open val jdbcSourceType: KType,
3132
public open val targetSchema: ColumnSchema,
3233
public open val valuePreprocessor: DbValuePreprocessor<J, D>?,
3334
public open val columnPostprocessor: DbColumnPostprocessor<D, P>?,
@@ -55,38 +56,45 @@ public fun TypeInformation<*, *, *>.castToAny(): TypeInformation<Any, Any, Any>
5556
// region generic constructors
5657

5758
public fun <J : Any, D : Any, P : Any> typeInformationWithProcessingFor(
59+
jdbcSourceType: KType,
5860
targetSchema: ColumnSchema,
5961
valuePreprocessor: DbValuePreprocessor<J, D>?,
6062
columnPostprocessor: DbColumnPostprocessor<D, P>?,
6163
): TypeInformation<J, D, P> =
6264
TypeInformation(
65+
jdbcSourceType = jdbcSourceType,
6366
targetSchema = targetSchema,
6467
valuePreprocessor = valuePreprocessor,
6568
columnPostprocessor = columnPostprocessor,
6669
)
6770

68-
public fun <J : Any> typeInformationFor(targetSchema: ColumnSchema): TypeInformation<J, J, J> =
71+
public fun <J : Any> typeInformationFor(jdbcSourceType: KType, targetSchema: ColumnSchema): TypeInformation<J, J, J> =
6972
typeInformationWithProcessingFor(
73+
jdbcSourceType = jdbcSourceType,
7074
targetSchema = targetSchema,
7175
valuePreprocessor = null,
7276
columnPostprocessor = null,
7377
)
7478

7579
public fun <J : Any, D : Any> typeInformationWithPreprocessingFor(
80+
jdbcSourceType: KType,
7681
targetSchema: ColumnSchema,
7782
valuePreprocessor: DbValuePreprocessor<J, D>?,
7883
): TypeInformation<J, D, D> =
7984
typeInformationWithProcessingFor(
85+
jdbcSourceType = jdbcSourceType,
8086
targetSchema = targetSchema,
8187
valuePreprocessor = valuePreprocessor,
8288
columnPostprocessor = null,
8389
)
8490

8591
public fun <J : Any, P : Any> typeInformationWithPostprocessingFor(
92+
jdbcSourceType: KType,
8693
targetSchema: ColumnSchema,
8794
columnPostprocessor: DbColumnPostprocessor<J, P>?,
8895
): TypeInformation<J, J, P> =
8996
typeInformationWithProcessingFor(
97+
jdbcSourceType = jdbcSourceType,
9098
targetSchema = targetSchema,
9199
valuePreprocessor = null,
92100
columnPostprocessor = columnPostprocessor,
@@ -97,43 +105,49 @@ public fun <J : Any, P : Any> typeInformationWithPostprocessingFor(
97105
// region ValueColumn constructors
98106

99107
public fun <J : Any> typeInformationForValueColumnOf(kType: KType): TypeInformation<J, J, J> =
100-
typeInformationFor(targetSchema = ColumnSchema.Value(kType))
108+
typeInformationFor(jdbcSourceType = kType, targetSchema = ColumnSchema.Value(kType))
101109

102110
public inline fun <reified J : Any> typeInformationForValueColumnOf(isNullable: Boolean): TypeInformation<J, J, J> =
103111
typeInformationForValueColumnOf(typeOf<J>().withNullability(isNullable))
104112

105113
public fun <J : Any, D : Any> typeInformationWithPreprocessingForValueColumnOf(
114+
jdbcSourceType: KType,
106115
targetColumnType: KType,
107116
valuePreprocessor: DbValuePreprocessor<J, D>?,
108117
): TypeInformation<J, D, D> =
109118
typeInformationWithPreprocessingFor(
119+
jdbcSourceType = jdbcSourceType,
110120
targetSchema = ColumnSchema.Value(targetColumnType),
111121
valuePreprocessor = valuePreprocessor,
112122
)
113123

114-
public inline fun <J : Any, reified D : Any> typeInformationWithPreprocessingForValueColumnOf(
124+
public inline fun <reified J : Any, reified D : Any> typeInformationWithPreprocessingForValueColumnOf(
115125
isNullable: Boolean,
116126
valuePreprocessor: DbValuePreprocessor<J, D>?,
117127
): TypeInformation<J, D, D> =
118128
typeInformationWithPreprocessingForValueColumnOf(
129+
jdbcSourceType = typeOf<J>().withNullability(isNullable),
119130
targetColumnType = typeOf<D>().withNullability(isNullable),
120131
valuePreprocessor = valuePreprocessor,
121132
)
122133

123134
public fun <J : Any, P : Any> typeInformationWithPostprocessingForValueColumnOf(
135+
jdbcSourceType: KType,
124136
targetColumnType: KType,
125137
columnPostprocessor: DbColumnPostprocessor<J, P>?,
126138
): TypeInformation<J, J, P> =
127139
typeInformationWithPostprocessingFor(
140+
jdbcSourceType = jdbcSourceType,
128141
targetSchema = ColumnSchema.Value(targetColumnType),
129142
columnPostprocessor = columnPostprocessor,
130143
)
131144

132-
public inline fun <J : Any, reified P : Any> typeInformationWithPostprocessingForValueColumnOf(
145+
public inline fun <reified J : Any, reified P : Any> typeInformationWithPostprocessingForValueColumnOf(
133146
isNullable: Boolean,
134147
columnPostprocessor: DbColumnPostprocessor<J, P>?,
135148
): TypeInformation<J, J, P> =
136149
typeInformationWithPostprocessingForValueColumnOf(
150+
jdbcSourceType = typeOf<J>().withNullability(isNullable),
137151
targetColumnType = typeOf<P>().withNullability(isNullable),
138152
columnPostprocessor = columnPostprocessor,
139153
)

0 commit comments

Comments
 (0)