Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,27 @@ Works only for:

### Ingestion Limits

To prevent excessive data ingestion, the following limits apply per connector:
To prevent excessive data ingestion, the following configurable limits apply per connector:

- **Sync History**: Maximum of 500 sync runs per connector (controlled by `history_sync_lookback_period`)
- **Table Lineage**: Maximum of 120 table lineage entries per connector
- **Column Lineage**: Maximum of 1000 column lineage entries per connector
- **Sync History**: Maximum number of sync runs ingested per connector (default: 500, configurable via `fivetran_log_config.max_jobs_per_connector`)
- **Table Lineage**: Maximum number of table lineage entries ingested per connector (default: 120, configurable via `fivetran_log_config.max_table_lineage_per_connector`)
- **Column Lineage**: Maximum number of column lineage entries ingested per connector (default: 1000, configurable via `fivetran_log_config.max_column_lineage_per_connector`)

When these limits are exceeded, only the most recent entries are ingested. Warnings will be logged during ingestion to notify you when truncation occurs.

These limits act as safety nets to prevent excessive data ingestion. You can increase them cautiously if you need to ingest more historical data or have connectors with many tables/columns. Example configuration:

```yaml
source:
type: fivetran
config:
fivetran_log_config:
# ... other config ...
max_jobs_per_connector: 1000 # Increase sync history limit
max_table_lineage_per_connector: 500 # Increase table lineage limit
max_column_lineage_per_connector: 5000 # Increase column lineage limit
```

## Snowflake destination Configuration Guide

1. If your Fivetran platform connector destination is Snowflake, you need to provide user details and a role with the correct privileges in order to fetch metadata.
Expand Down
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@

logger = logging.getLogger(__name__)

# Default safeguards to prevent fetching massive amounts of data.
# These module-level defaults seed the corresponding configurable
# FivetranLogConfig fields (max_*_per_connector); they are not hard caps
# on their own — users may override them per-field in the recipe.
MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT = 120
MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT = 1000
MAX_JOBS_PER_CONNECTOR_DEFAULT = 500


class Constant:
"""
Expand Down Expand Up @@ -142,6 +147,24 @@ class FivetranLogConfig(ConfigModel):
"destination_config", "snowflake_destination_config"
)

max_jobs_per_connector: int = pydantic.Field(
default=MAX_JOBS_PER_CONNECTOR_DEFAULT,
gt=0,
description="Maximum number of sync jobs to retrieve per connector. This acts as a safety net to prevent excessive data ingestion. Increase cautiously if you need to see more historical sync runs.",
)

max_table_lineage_per_connector: int = pydantic.Field(
default=MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT,
gt=0,
description="Maximum number of table lineage entries to retrieve per connector. This acts as a safety net to prevent excessive data ingestion. When this limit is exceeded, only the most recent entries are ingested.",
)

max_column_lineage_per_connector: int = pydantic.Field(
default=MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT,
gt=0,
description="Maximum number of column lineage entries to retrieve per connector. This acts as a safety net to prevent excessive data ingestion. When this limit is exceeded, only the most recent entries are ingested.",
)

@model_validator(mode="after")
def validate_destination_platform_and_config(self) -> "FivetranLogConfig":
if self.destination_platform == "snowflake":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
from datahub.ingestion.source.fivetran.fivetran_query import (
MAX_JOBS_PER_CONNECTOR,
MAX_TABLE_LINEAGE_PER_CONNECTOR,
)
from datahub.ingestion.source.fivetran.fivetran_rest_api import FivetranAPIClient
from datahub.ingestion.source.fivetran.response_models import FivetranConnectionDetails
from datahub.ingestion.source.state.stale_entity_removal_handler import (
Expand Down Expand Up @@ -133,11 +129,14 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
if destination_details.database is None:
destination_details.database = self.audit_log.fivetran_log_database

if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
if (
len(connector.lineage)
>= self.config.fivetran_log_config.max_table_lineage_per_connector
):
self.report.warning(
title="Table lineage truncated",
message=f"The connector had more than {MAX_TABLE_LINEAGE_PER_CONNECTOR} table lineage entries. "
f"Only the most recent {MAX_TABLE_LINEAGE_PER_CONNECTOR} entries were ingested.",
message=f"The connector had more than {self.config.fivetran_log_config.max_table_lineage_per_connector} table lineage entries. "
f"Only the most recent {self.config.fivetran_log_config.max_table_lineage_per_connector} entries were ingested.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
)

Expand Down Expand Up @@ -475,11 +474,14 @@ def _get_connector_workunits(
yield datajob

# Map Fivetran's job/sync history entity with Datahub's data process entity
if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR:
if (
len(connector.jobs)
>= self.config.fivetran_log_config.max_jobs_per_connector
):
self.report.warning(
title="Not all sync history was captured",
message=f"The connector had more than {MAX_JOBS_PER_CONNECTOR} sync runs in the past {self.config.history_sync_lookback_period} days. "
f"Only the most recent {MAX_JOBS_PER_CONNECTOR} syncs were ingested.",
message=f"The connector had more than {self.config.fivetran_log_config.max_jobs_per_connector} sync runs in the past {self.config.history_sync_lookback_period} days. "
f"Only the most recent {self.config.fivetran_log_config.max_jobs_per_connector} syncs were ingested.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
)
for job in connector.jobs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
def _initialize_fivetran_variables(
self,
) -> Tuple[Any, FivetranLogQuery, str]:
fivetran_log_query = FivetranLogQuery()
fivetran_log_query = FivetranLogQuery(
max_jobs_per_connector=self.fivetran_log_config.max_jobs_per_connector,
max_table_lineage_per_connector=self.fivetran_log_config.max_table_lineage_per_connector,
max_column_lineage_per_connector=self.fivetran_log_config.max_column_lineage_per_connector,
)
destination_platform = self.fivetran_log_config.destination_platform
# For every destination, create sqlalchemy engine,
# set db_clause to generate select queries and set fivetran_log_database class variable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import List

# Safeguards to prevent fetching massive amounts of data.
MAX_TABLE_LINEAGE_PER_CONNECTOR = 120
MAX_COLUMN_LINEAGE_PER_CONNECTOR = 1000
MAX_JOBS_PER_CONNECTOR = 500

from datahub.ingestion.source.fivetran.config import (
MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT,
MAX_JOBS_PER_CONNECTOR_DEFAULT,
MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT,
)

"""
------------------------------------------------------------------------------------------------------------
Expand All @@ -25,9 +25,18 @@ class FivetranLogQuery:
# Note: All queries are written in Snowflake SQL.
# They will be transpiled to the target database's SQL dialect at runtime.

def __init__(self) -> None:
def __init__(
self,
max_jobs_per_connector: int = MAX_JOBS_PER_CONNECTOR_DEFAULT,
max_table_lineage_per_connector: int = MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT,
max_column_lineage_per_connector: int = MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT,
) -> None:
# Select query db clause
self.schema_clause: str = ""
# Configurable limits
self.max_jobs_per_connector = max_jobs_per_connector
self.max_table_lineage_per_connector = max_table_lineage_per_connector
self.max_column_lineage_per_connector = max_column_lineage_per_connector

def use_database(self, db_name: str) -> str:
return f"use database {db_name}"
Expand Down Expand Up @@ -97,7 +106,7 @@ def get_sync_logs_query(
end_time,
end_message_data
FROM ranked_syncs
WHERE rn <= {MAX_JOBS_PER_CONNECTOR}
WHERE rn <= {self.max_jobs_per_connector}
AND start_time IS NOT NULL
AND end_time IS NOT NULL
ORDER BY connection_id, end_time DESC
Expand Down Expand Up @@ -130,7 +139,7 @@ def get_table_lineage_query(self, connector_ids: List[str]) -> str:
)
-- Ensure that we only get back one entry per source and destination pair.
WHERE table_combo_rn = 1
QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {self.max_table_lineage_per_connector}
ORDER BY connection_id, created_at DESC
"""

Expand Down Expand Up @@ -165,6 +174,6 @@ def get_column_lineage_query(self, connector_ids: List[str]) -> str:
)
-- Ensure that we only get back one entry per (connector, source column, destination column) pair.
WHERE column_combo_rn = 1
QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {self.max_column_lineage_per_connector}
ORDER BY connection_id, created_at DESC
"""
136 changes: 136 additions & 0 deletions metadata-ingestion/tests/unit/fivetran/test_fivetran_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import pytest
from pydantic import ValidationError

from datahub.ingestion.source.fivetran.config import (
MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT,
MAX_JOBS_PER_CONNECTOR_DEFAULT,
MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT,
FivetranLogConfig,
FivetranSourceConfig,
SnowflakeDestinationConfig,
)


class TestFivetranConfig:
    """Test cases for Fivetran configuration with configurable limits.

    Covers three behaviors of the `max_*_per_connector` fields on
    ``FivetranLogConfig``: defaulting, explicit override, and rejection of
    non-positive values via the ``gt=0`` pydantic constraint.
    """

    @staticmethod
    def _snowflake_destination() -> SnowflakeDestinationConfig:
        """Build a minimal, valid Snowflake destination config for tests.

        Centralizes the boilerplate that was previously duplicated in every
        test method.
        """
        return SnowflakeDestinationConfig(
            account_id="test_account",
            warehouse="test_warehouse",
            username="test_user",
            password="test_password",
            database="test_database",
            log_schema="test_schema",
            role="test_role",
        )

    def test_fivetran_log_config_default_limits(self):
        """Limits fall back to the module-level defaults when not specified."""
        config = FivetranLogConfig(
            destination_platform="snowflake",
            snowflake_destination_config=self._snowflake_destination(),
        )

        assert config.max_jobs_per_connector == MAX_JOBS_PER_CONNECTOR_DEFAULT
        assert (
            config.max_table_lineage_per_connector
            == MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT
        )
        assert (
            config.max_column_lineage_per_connector
            == MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT
        )

    def test_fivetran_log_config_custom_limits(self):
        """Explicitly supplied limits override the defaults."""
        config = FivetranLogConfig(
            destination_platform="snowflake",
            snowflake_destination_config=self._snowflake_destination(),
            max_jobs_per_connector=1000,
            max_table_lineage_per_connector=200,
            max_column_lineage_per_connector=2000,
        )

        assert config.max_jobs_per_connector == 1000
        assert config.max_table_lineage_per_connector == 200
        assert config.max_column_lineage_per_connector == 2000

    @pytest.mark.parametrize(
        "overrides",
        [
            {"max_jobs_per_connector": 0},
            {"max_table_lineage_per_connector": -1},
            {"max_column_lineage_per_connector": -100},
        ],
    )
    def test_fivetran_log_config_invalid_limits(self, overrides):
        """Non-positive limits are rejected by the `gt=0` field constraint."""
        with pytest.raises(ValidationError) as excinfo:
            FivetranLogConfig(
                destination_platform="snowflake",
                snowflake_destination_config=self._snowflake_destination(),
                **overrides,
            )
        assert "greater than 0" in str(excinfo.value)

    def test_fivetran_source_config_with_log_limits(self):
        """FivetranSourceConfig exposes the nested FivetranLogConfig limits."""
        config = FivetranSourceConfig(
            fivetran_log_config=FivetranLogConfig(
                destination_platform="snowflake",
                snowflake_destination_config=self._snowflake_destination(),
                max_jobs_per_connector=750,
                max_table_lineage_per_connector=150,
                max_column_lineage_per_connector=1500,
            )
        )

        assert config.fivetran_log_config.max_jobs_per_connector == 750
        assert config.fivetran_log_config.max_table_lineage_per_connector == 150
        assert config.fivetran_log_config.max_column_lineage_per_connector == 1500
Loading
Loading