Commit 557c9da
feat: Optionally store content marked as "base16" encoded into bytea columns (#287)
Motivation: more efficient `bytea` storage of data marked with `"contentEncoding": "base16"` in JSON Schema.

References:
- https://datatracker.ietf.org/doc/html/rfc4648#section-8
- https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.8.3

`bytea` is at least twice as efficient as string for storing hex data:

```
select octet_length('\x2BdfBd329984Cf0DC9027734681A16f542cF3bB4'::bytea) as "bytea",
       octet_length('0x2BdfBd329984Cf0DC9027734681A16f542cF3bB4') as "string";

 bytea | string
-------+--------
    20 |     42
```

It's probably a good idea to make this behaviour opt-in. I'm not sure how to implement that since most of the type detection code is inside static methods of `PostgresConnector`. I can't find a way to inject the target config without making a big change.

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
Parent: 1f7b73d
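The storage claim in the commit message can be cross-checked in plain Python: the example address is 42 characters as a `0x`-prefixed string but only 20 bytes once decoded, matching the SQL `octet_length` comparison above.

```python
# Reproduce the bytea-vs-string size comparison from the commit message.
hex_str = "0x2BdfBd329984Cf0DC9027734681A16f542cF3bB4"

# Strip the "0x" prefix before decoding the base16 payload to raw bytes.
raw = bytes.fromhex(hex_str[2:])

print(len(hex_str))  # 42 characters stored as text
print(len(raw))      # 20 bytes stored as binary
```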

File tree: 6 files changed (+221, −9)

README.md

Lines changed: 49 additions & 1 deletion
```diff
@@ -174,7 +174,7 @@ The below table shows how this tap will map between jsonschema datatypes and Pos
 | UNSUPPORTED | bit varying [ (n) ] |
 | boolean | boolean |
 | UNSUPPORTED | box |
-| UNSUPPORTED | bytea |
+| string with contentEncoding="base16" ([opt-in feature](#content-encoding-support)) | bytea |
 | UNSUPPORTED | character [ (n) ] |
 | UNSUPPORTED | character varying [ (n) ] |
 | UNSUPPORTED | cidr |
@@ -215,6 +215,7 @@ The below table shows how this tap will map between jsonschema datatypes and Pos
 Note that while object types are mapped directly to jsonb, array types are mapped to a jsonb array.
 
 If a column has multiple jsonschema types, the following order is using to order Postgres types, from highest priority to lowest priority.
+- BYTEA
 - ARRAY(JSONB)
 - JSONB
 - TEXT
@@ -227,3 +228,50 @@ If a column has multiple jsonschema types, the following order is using to order
 - INTEGER
 - BOOLEAN
 - NOTYPE
+
+## Content Encoding Support
+
+JSON Schema supports the [`contentEncoding` keyword](https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.8.3), which can be used to specify the encoding of input string types.
+
+This target can detect content encoding clues in the schema to determine how to store the data in Postgres in a more efficient way.
+
+Content encoding interpretation is disabled by default. This is because the default config is meant to be as permissive as possible, and make no assumptions about the data that could lead to data loss.
+
+However, if you know your data respects the advertised content encoding, you can enable this feature to get better performance and storage efficiency.
+
+To enable it, set the `interpret_content_encoding` option to `True`.
+
+### base16
+
+The string is encoded using the base16 encoding, as defined in [RFC 4648](https://datatracker.ietf.org/doc/html/rfc4648#section-8).
+
+Example schema:
+```json
+{
+  "type": "object",
+  "properties": {
+    "my_hex": {
+      "type": "string",
+      "contentEncoding": "base16"
+    }
+  }
+}
+```
+
+Data will be stored as a `bytea` in the database.
+
+Example data:
+```json
+# valid data
+{ "my_hex": "01AF" }
+{ "my_hex": "01af" }
+{ "my_hex": "1af" }
+{ "my_hex": "0x1234" }
+
+# invalid data
+{ "my_hex": " 0x1234 " }
+{ "my_hex": "House" }
+```
+
+For convenience, data prefixed with `0x` or containing an odd number of characters is supported, although it's not part of the standard.
```
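The lenient acceptance rules described in the README (optional `0x` prefix, odd-length input) can be sketched as a standalone function. This is illustrative only; in the target itself the conversion is performed by the `HexByteString` SQLAlchemy type, and `normalize_base16` is a name invented here.

```python
def normalize_base16(value: str) -> bytes:
    """Decode a hex string the way this target accepts it: strip an
    optional 0x prefix and left-pad odd-length input with a '0'."""
    if value.startswith("0x"):
        value = value[2:]
    if len(value) % 2:
        value = "0" + value
    return bytes.fromhex(value)  # raises ValueError for non-hex input

print(normalize_base16("01AF"))    # b'\x01\xaf'
print(normalize_base16("1af"))     # odd length: padded to b'\x01\xaf'
print(normalize_base16("0x1234"))  # prefix stripped before decoding
```

Invalid inputs such as `"House"` fall through to `bytes.fromhex` and raise `ValueError`, which is how malformed data surfaces as an error rather than being silently truncated.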

target_postgres/connector.py

Lines changed: 65 additions & 8 deletions
```diff
@@ -7,6 +7,7 @@
 import signal
 import typing as t
 from contextlib import contextmanager
+from functools import cached_property
 from os import chmod, path
 from typing import cast
 
@@ -15,7 +16,7 @@
 import sqlalchemy as sa
 from singer_sdk import SQLConnector
 from singer_sdk import typing as th
-from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, JSONB
+from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, JSONB
 from sqlalchemy.engine import URL
 from sqlalchemy.engine.url import make_url
 from sqlalchemy.types import (
@@ -79,6 +80,18 @@ def __init__(self, config: dict) -> None:
             sqlalchemy_url=url.render_as_string(hide_password=False),
         )
 
+    @cached_property
+    def interpret_content_encoding(self) -> bool:
+        """Whether to interpret schema contentEncoding to set the column type.
+
+        It is an opt-in feature because it might result in data loss if the
+        actual data does not match the schema's advertised encoding.
+
+        Returns:
+            True if the feature is enabled, False otherwise.
+        """
+        return self.config.get("interpret_content_encoding", False)
+
     def prepare_table(  # type: ignore[override]
         self,
         full_table_name: str,
@@ -205,8 +218,7 @@ def clone_table(
         new_table.create(bind=connection)
         return new_table
 
-    @staticmethod
-    def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
+    def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:  # type: ignore[override]
         """Return a JSON Schema representation of the provided type.
 
         By default will call `typing.to_sql_type()`.
@@ -232,6 +244,8 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
                 json_type_dict = {"type": entry}
                 if jsonschema_type.get("format", False):
                     json_type_dict["format"] = jsonschema_type["format"]
+                if encoding := jsonschema_type.get("contentEncoding", False):
+                    json_type_dict["contentEncoding"] = encoding
                 json_type_array.append(json_type_dict)
         else:
             msg = "Invalid format for jsonschema type: not str or list."
@@ -246,16 +260,13 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
             return NOTYPE()
         sql_type_array = []
         for json_type in json_type_array:
-            picked_type = PostgresConnector.pick_individual_type(
-                jsonschema_type=json_type
-            )
+            picked_type = self.pick_individual_type(jsonschema_type=json_type)
             if picked_type is not None:
                 sql_type_array.append(picked_type)
 
         return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)
 
-    @staticmethod
-    def pick_individual_type(jsonschema_type: dict):
+    def pick_individual_type(self, jsonschema_type: dict):
         """Select the correct sql type assuming jsonschema_type has only a single type.
 
         Args:
@@ -272,8 +283,15 @@ def pick_individual_type(jsonschema_type: dict):
             return JSONB()
         if "array" in jsonschema_type["type"]:
             return ARRAY(JSONB())
+
+        # string formats
         if jsonschema_type.get("format") == "date-time":
             return TIMESTAMP()
+        if (
+            self.interpret_content_encoding
+            and jsonschema_type.get("contentEncoding") == "base16"
+        ):
+            return HexByteString()
         individual_type = th.to_sql_type(jsonschema_type)
         if isinstance(individual_type, VARCHAR):
             return TEXT()
@@ -290,6 +308,7 @@ def pick_best_sql_type(sql_type_array: list):
             An instance of the best SQL type class based on defined precedence order.
         """
         precedence_order = [
+            HexByteString,
             ARRAY,
             JSONB,
             TEXT,
@@ -834,3 +853,41 @@ def python_type(self):
     def as_generic(self, *args: t.Any, **kwargs: t.Any):
         """Return the generic type for this column."""
         return TEXT()
+
+
+class HexByteString(TypeDecorator):
+    """Convert Python string representing Hex data to bytes and vice versa.
+
+    This is used to store binary data in a more efficient format in the database.
+    The string is encoded using the base16 encoding, as defined in RFC 4648:
+    https://datatracker.ietf.org/doc/html/rfc4648#section-8
+    For convenience, data prefixed with `0x` or containing an odd number of characters
+    is supported although it's not part of the standard.
+    """
+
+    impl = BYTEA
+
+    def process_bind_param(self, value, dialect):
+        """Convert hex string to bytes."""
+        if value is None:
+            return None
+
+        if isinstance(value, str):
+            if value.startswith("\\x") or value.startswith("0x"):
+                value = value[2:]
+
+            if len(value) % 2:
+                value = f"0{value}"
+
+            try:
+                value = bytes.fromhex(value)
+            except ValueError as ex:
+                raise ValueError(f"Invalid hexadecimal string: {value}") from ex
+
+        if not isinstance(value, bytearray | memoryview | bytes):
+            raise TypeError(
+                "HexByteString columns support only bytes or hex string values. "
+                f"{type(value)} is not supported"
+            )
+
+        return value
```
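The `precedence_order` change above puts `HexByteString` ahead of every other type when a column admits multiple JSON Schema types. The selection rule in `pick_best_sql_type` can be illustrated with a toy sketch using string names in place of SQLAlchemy type classes (an abridged subset of the real order; `pick_best` is a name invented here).

```python
# Abridged precedence list mirroring the README's ordering,
# from highest priority to lowest.
PRECEDENCE = ["BYTEA", "ARRAY", "JSONB", "TEXT", "INTEGER", "BOOLEAN", "NOTYPE"]

def pick_best(candidates: list[str]) -> str:
    """Return the candidate appearing earliest in PRECEDENCE."""
    return min(candidates, key=PRECEDENCE.index)

# A column typed both "string" (TEXT) and base16 (BYTEA) resolves to BYTEA.
print(pick_best(["TEXT", "BYTEA"]))
```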

target_postgres/target.py

Lines changed: 11 additions & 0 deletions
```diff
@@ -189,6 +189,17 @@ def __init__(
                 + "for more information."
             ),
         ),
+        th.Property(
+            "interpret_content_encoding",
+            th.BooleanType,
+            default=False,
+            description=(
+                "If set to true, the target will interpret the content encoding of the "
+                "schema to determine how to store the data. Using this option may "
+                "result in a more efficient storage of the data but may also result "
+                "in an error if the data is not encoded as expected."
+            ),
+        ),
         th.Property(
             "ssl_enable",
             th.BooleanType,
```
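With the new setting registered above, enabling the feature is a one-line addition to the target config. A minimal sketch follows; the connection values are placeholders, not values from this commit.

```json
{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "database": "analytics",
  "interpret_content_encoding": true
}
```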
Lines changed: 8 additions & 0 deletions
```diff
@@ -0,0 +1,8 @@
+{"type":"SCHEMA","stream":"test_base_16_encoding_interpreted","schema":{"type":"object","properties":{"id":{"type":"string"},"contract_address":{"type":"string","contentEncoding":"base16"},"raw_event_data":{"type":["string","null"],"contentEncoding":"base16"}},"required":["id","contract_address","raw_event_data"]},"key_properties":["id"]}
+{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_an_hex_str","contract_address":"0xA1B2C3D4E5F607080910","raw_event_data":"0xA1B2C3D4E5F60708091001020304050607080910010203040506070809100102030405060708091001020304050607080910"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"empty_0x_str","contract_address":"0x","raw_event_data":"0x"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"empty_str","contract_address":"","raw_event_data":""},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_nullable_field","contract_address":"","raw_event_data":null},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_hex_without_the_0x_prefix","contract_address":"A1B2C3D4E5F607080910","raw_event_data":"A1B2C3D4E5F6070809100102030405060"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_odd_and_even_number_of_chars","contract_address":"0xA1","raw_event_data":"A12"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_upper_and_lowercase_hex","contract_address":"0xa1","raw_event_data":"A12b"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
```
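Each line of the fixture above is a standalone Singer message. The connector's base16 detection hinges on the `contentEncoding` annotations in the SCHEMA message, which a few lines of Python can surface:

```python
import json

# The SCHEMA line from the fixture above, verbatim.
line = '{"type":"SCHEMA","stream":"test_base_16_encoding_interpreted","schema":{"type":"object","properties":{"id":{"type":"string"},"contract_address":{"type":"string","contentEncoding":"base16"},"raw_event_data":{"type":["string","null"],"contentEncoding":"base16"}},"required":["id","contract_address","raw_event_data"]},"key_properties":["id"]}'

msg = json.loads(line)

# List the properties that advertise base16 encoding; with
# interpret_content_encoding enabled these become bytea columns.
base16_cols = [
    name
    for name, prop in msg["schema"]["properties"].items()
    if prop.get("contentEncoding") == "base16"
]
print(base16_cols)  # ['contract_address', 'raw_event_data']
```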
Lines changed: 8 additions & 0 deletions
```diff
@@ -0,0 +1,8 @@
+{"type":"SCHEMA","stream":"test_base_16_encoding_not_interpreted","schema":{"type":"object","properties":{"id":{"type":"string"},"contract_address":{"type":"string","contentEncoding":"base16"},"raw_event_data":{"type":["string","null"],"contentEncoding":"base16"}},"required":["id","contract_address","raw_event_data"]},"key_properties":["id"]}
+{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_an_hex_str","contract_address":"0xA1B2C3D4E5F607080910","raw_event_data":"0xA1B2C3D4E5F60708091001020304050607080910010203040506070809100102030405060708091001020304050607080910"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"empty_0x_str","contract_address":"0x","raw_event_data":"0x"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"empty_str","contract_address":"","raw_event_data":""},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_nullable_field","contract_address":"","raw_event_data":null},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_hex_without_the_0x_prefix","contract_address":"A1B2C3D4E5F607080910","raw_event_data":"A1B2C3D4E5F6070809100102030405060"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_odd_and_even_number_of_chars","contract_address":"0xA1","raw_event_data":"A12"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
+{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_upper_and_lowercase_hex","contract_address":"0xa1","raw_event_data":"A12b"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
```

target_postgres/tests/test_target_postgres.py

Lines changed: 80 additions & 0 deletions
```diff
@@ -125,6 +125,14 @@ def verify_data(
             result_dict = [
                 remove_metadata_columns(row._asdict()) for row in result.all()
             ]
+
+            # bytea columns are returned as memoryview objects
+            # we need to convert them to bytes to allow comparison with check_data
+            for row in result_dict:
+                for col in row:
+                    if isinstance(row[col], memoryview):
+                        row[col] = bytes(row[col])
+
             assert result_dict == check_data
         else:
             raise ValueError("Invalid check_data - not dict or list of dicts")
@@ -500,6 +508,78 @@ def test_new_array_column(postgres_target):
     singer_file_to_target(file_name, postgres_target)
 
 
+def test_base16_content_encoding_not_interpreted(postgres_config_no_ssl):
+    """Make sure we can insert base16 encoded data into the database without interpretation"""
+    postgres_config_modified = copy.deepcopy(postgres_config_no_ssl)
+    postgres_config_modified["interpret_content_encoding"] = False
+    target = TargetPostgres(config=postgres_config_modified)
+
+    singer_file_to_target("base16_content_encoding_not_interpreted.singer", target)
+
+    rows = [
+        {"id": "empty_0x_str", "contract_address": "0x", "raw_event_data": "0x"},
+        {"id": "empty_str", "contract_address": "", "raw_event_data": ""},
+        {
+            "id": "test_handle_an_hex_str",
+            "contract_address": "0xA1B2C3D4E5F607080910",
+            "raw_event_data": "0xA1B2C3D4E5F60708091001020304050607080910010203040506070809100102030405060708091001020304050607080910",
+        },
+        {
+            "id": "test_handle_hex_without_the_0x_prefix",
+            "contract_address": "A1B2C3D4E5F607080910",
+            "raw_event_data": "A1B2C3D4E5F6070809100102030405060",
+        },
+        {
+            "id": "test_handle_odd_and_even_number_of_chars",
+            "contract_address": "0xA1",
+            "raw_event_data": "A12",
+        },
+        {
+            "id": "test_handle_upper_and_lowercase_hex",
+            "contract_address": "0xa1",
+            "raw_event_data": "A12b",
+        },
+        {"id": "test_nullable_field", "contract_address": "", "raw_event_data": None},
+    ]
+    verify_data(target, "test_base_16_encoding_not_interpreted", 7, "id", rows)
+
+
+def test_base16_content_encoding_interpreted(postgres_config_no_ssl):
+    """Make sure we can insert base16 encoded data into the database with interpretation"""
+    postgres_config_modified = copy.deepcopy(postgres_config_no_ssl)
+    postgres_config_modified["interpret_content_encoding"] = True
+    target = TargetPostgres(config=postgres_config_modified)
+
+    singer_file_to_target("base16_content_encoding_interpreted.singer", target)
+
+    rows = [
+        {"id": "empty_0x_str", "contract_address": b"", "raw_event_data": b""},
+        {"id": "empty_str", "contract_address": b"", "raw_event_data": b""},
+        {
+            "id": "test_handle_an_hex_str",
+            "contract_address": b"\xa1\xb2\xc3\xd4\xe5\xf6\x07\x08\x09\x10",
+            "raw_event_data": b"\xa1\xb2\xc3\xd4\xe5\xf6\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10",
+        },
+        {
+            "id": "test_handle_hex_without_the_0x_prefix",
+            "contract_address": b"\xa1\xb2\xc3\xd4\xe5\xf6\x07\x08\x09\x10",
+            "raw_event_data": b"\x0a\x1b\x2c\x3d\x4e\x5f\x60\x70\x80\x91\x00\x10\x20\x30\x40\x50\x60",
+        },
+        {
+            "id": "test_handle_odd_and_even_number_of_chars",
+            "contract_address": b"\xa1",
+            "raw_event_data": b"\x0a\x12",
+        },
+        {
+            "id": "test_handle_upper_and_lowercase_hex",
+            "contract_address": b"\xa1",
+            "raw_event_data": b"\xa1\x2b",
+        },
+        {"id": "test_nullable_field", "contract_address": b"", "raw_event_data": None},
+    ]
+    verify_data(target, "test_base_16_encoding_interpreted", 7, "id", rows)
+
+
 def test_activate_version_hard_delete(postgres_config_no_ssl):
     """Activate Version Hard Delete Test"""
     table_name = "test_activate_version_hard"
```
