|
8 | 8 |
|
9 | 9 | import sqlalchemy as sa |
10 | 10 | from singer_sdk.sinks import SQLSink |
11 | | -from sqlalchemy.sql import Executable |
12 | 11 | from sqlalchemy.sql.expression import bindparam |
13 | 12 |
|
14 | 13 | from target_postgres.connector import PostgresConnector |
15 | 14 |
|
16 | 15 | if t.TYPE_CHECKING: |
17 | 16 | from singer_sdk.connectors.sql import FullyQualifiedName |
| 17 | + from sqlalchemy.sql import Executable |
18 | 18 |
|
19 | 19 |
|
20 | 20 | class PostgresSink(SQLSink): |
@@ -52,10 +52,8 @@ def setup(self) -> None: |
52 | 52 | This method is called on Sink creation, and creates the required Schema and |
53 | 53 | Table entities in the target database. |
54 | 54 | """ |
55 | | - if self.key_properties is None or self.key_properties == []: |
56 | | - self.append_only = True |
57 | | - else: |
58 | | - self.append_only = False |
| 55 | + self.append_only = self.key_properties is None or self.key_properties == [] |
| 56 | + |
59 | 57 | if self.schema_name: |
60 | 58 | self.connector.prepare_schema(self.schema_name) |
61 | 59 | with self.connector._connect() as connection, connection.begin(): |
@@ -165,7 +163,7 @@ def bulk_insert_records( # type: ignore[override] |
165 | 163 | for column in columns: |
166 | 164 | insert_record[column.name] = record.get(column.name) |
167 | 165 | # No need to check for a KeyError here because the SDK already |
168 | | - # guaruntees that all key properties exist in the record. |
| 166 | + # guarantees that all key properties exist in the record. |
169 | 167 | primary_key_value = "".join([str(record[key]) for key in primary_keys]) |
170 | 168 | insert_records[primary_key_value] = insert_record |
171 | 169 | data_to_insert = list(insert_records.values()) |
@@ -296,7 +294,7 @@ def schema_name(self) -> str | None: |
296 | 294 | Returns: |
297 | 295 | The target schema name. |
298 | 296 | """ |
299 | | - # Look for a default_target_scheme in the configuraion fle |
| 297 | + # Look for a default_target_scheme in the configuration fle |
300 | 298 | default_target_schema: str = self.config.get("default_target_schema", None) |
301 | 299 | parts = self.stream_name.split("-") |
302 | 300 |
|
|
0 commit comments