@@ -115,6 +115,14 @@ def prepare_table( # type: ignore[override]
115115 connection = connection ,
116116 )
117117 return table
118+ # To make table reflection work properly with pgvector,
119+ # the module needs to be imported beforehand.
120+ try :
121+ from pgvector .sqlalchemy import Vector # noqa: F401
122+ except ImportError :
123+ self .logger .debug (
124+ "Unable to handle pgvector's `Vector` type. Please install `pgvector`."
125+ )
118126 meta .reflect (connection , only = [table_name ])
119127 table = meta .tables [
120128 full_table_name
@@ -280,6 +288,51 @@ def pick_individual_type(jsonschema_type: dict):
280288 if "object" in jsonschema_type ["type" ]:
281289 return JSONB ()
282290 if "array" in jsonschema_type ["type" ]:
291+ # Select between different kinds of `ARRAY` data types.
292+ #
293+ # This currently leverages an unspecified definition for the Singer SCHEMA,
294+ # using the `additionalProperties` attribute to convey additional type
295+ # information, agnostic of the target database.
296+ #
297+ # In this case, it is about telling different kinds of `ARRAY` types apart:
298+ # Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
299+ # alternatively, it can be a "vector" kind `ARRAY` of floating point
300+ # numbers, effectively what pgvector is storing in its `VECTOR` type.
301+ #
 302+ # Still, `type: "vector"` is only a surrogate label here: other database
 303+ # systems may implement the same concept using different types, so an
 304+ # adapter needs to translate the label accordingly.
305+ """
306+ Schema override rule in `meltano.yml`:
307+
308+ type: "array"
309+ items:
310+ type: "number"
311+ additionalProperties:
312+ storage:
313+ type: "vector"
314+ dim: 4
315+
316+ Produced schema annotation in `catalog.json`:
317+
318+ {"type": "array",
319+ "items": {"type": "number"},
320+ "additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
321+ """
322+ if (
323+ "additionalProperties" in jsonschema_type
324+ and "storage" in jsonschema_type ["additionalProperties" ]
325+ ):
326+ storage_properties = jsonschema_type ["additionalProperties" ]["storage" ]
327+ if (
328+ "type" in storage_properties
329+ and storage_properties ["type" ] == "vector"
330+ ):
331+ # On PostgreSQL/pgvector, use the corresponding type definition
332+ # from its SQLAlchemy dialect.
333+ from pgvector .sqlalchemy import Vector
334+
335+ return Vector (storage_properties ["dim" ])
283336 return ARRAY (JSONB ())
284337 if jsonschema_type .get ("format" ) == "date-time" :
285338 return TIMESTAMP ()
@@ -313,6 +366,13 @@ def pick_best_sql_type(sql_type_array: list):
313366 NOTYPE ,
314367 ]
315368
369+ try :
370+ from pgvector .sqlalchemy import Vector
371+
372+ precedence_order .append (Vector )
373+ except ImportError :
374+ pass
375+
316376 for sql_type in precedence_order :
317377 for obj in sql_type_array :
318378 if isinstance (obj , sql_type ):
@@ -519,7 +579,7 @@ def _adapt_column_type( # type: ignore[override]
519579 return
520580
521581 # Not the same type, generic type or compatible types
522- # calling merge_sql_types for assistnace
582+ # calling merge_sql_types for assistance.
523583 compatible_sql_type = self .merge_sql_types ([current_type , sql_type ])
524584
525585 if str (compatible_sql_type ) == str (current_type ):
0 commit comments