diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
index 2e04c9101939ca..8f20b34c1d270b 100644
--- a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
+++ b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
@@ -37,14 +37,27 @@ Works only for:
 
 ### Ingestion Limits
 
-To prevent excessive data ingestion, the following limits apply per connector:
+To prevent excessive data ingestion, the following configurable limits apply per connector:
 
-- **Sync History**: Maximum of 500 sync runs per connector (controlled by `history_sync_lookback_period`)
-- **Table Lineage**: Maximum of 120 table lineage entries per connector
-- **Column Lineage**: Maximum of 1000 column lineage entries per connector
+- **Sync History**: Maximum number of sync runs ingested per connector (default: 500, configurable via `fivetran_log_config.max_jobs_per_connector`)
+- **Table Lineage**: Maximum number of table lineage entries ingested per connector (default: 120, configurable via `fivetran_log_config.max_table_lineage_per_connector`)
+- **Column Lineage**: Maximum number of column lineage entries ingested per connector (default: 1000, configurable via `fivetran_log_config.max_column_lineage_per_connector`)
 
 When these limits are exceeded, only the most recent entries are ingested. Warnings will be logged during ingestion to notify you when truncation occurs.
 
+These limits act as safety nets; you can increase them cautiously if you need to ingest more historical data or have connectors with many tables or columns. Example configuration:
+
+```yaml
+source:
+  type: fivetran
+  config:
+    fivetran_log_config:
+      # ... other config ...
+      max_jobs_per_connector: 1000 # Increase sync history limit
+      max_table_lineage_per_connector: 500 # Increase table lineage limit
+      max_column_lineage_per_connector: 5000 # Increase column lineage limit
+```
+
 ## Snowflake destination Configuration Guide
 
 1. If your fivetran platform connector destination is snowflake, you need to provide user details and its role with correct privileges in order to fetch metadata.
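For reference, a minimal sketch of how these limits behave at config-validation time, mirroring the unit tests added below (the Snowflake destination values are placeholders):

```python
import pytest
from pydantic import ValidationError

from datahub.ingestion.source.fivetran.config import (
    FivetranLogConfig,
    SnowflakeDestinationConfig,
)

# Placeholder Snowflake destination details.
destination = SnowflakeDestinationConfig(
    account_id="my_account",
    warehouse="my_warehouse",
    username="my_user",
    password="my_password",
    database="my_database",
    log_schema="fivetran_log",
    role="my_role",
)

# Custom limits take effect directly on the validated config object.
config = FivetranLogConfig(
    destination_platform="snowflake",
    snowflake_destination_config=destination,
    max_jobs_per_connector=1000,
)
assert config.max_jobs_per_connector == 1000

# Non-positive limits fail validation because the fields declare gt=0.
with pytest.raises(ValidationError):
    FivetranLogConfig(
        destination_platform="snowflake",
        snowflake_destination_config=destination,
        max_jobs_per_connector=0,
    )
```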
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
index 0f54a84473f915..f04a33001eafaa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -35,6 +35,11 @@
 
 logger = logging.getLogger(__name__)
 
+# Default safeguards to prevent fetching massive amounts of data.
+MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT = 120
+MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT = 1000
+MAX_JOBS_PER_CONNECTOR_DEFAULT = 500
+
 
 class Constant:
     """
@@ -142,6 +147,24 @@ class FivetranLogConfig(ConfigModel):
         "destination_config", "snowflake_destination_config"
     )
 
+    max_jobs_per_connector: int = pydantic.Field(
+        default=MAX_JOBS_PER_CONNECTOR_DEFAULT,
+        gt=0,
+        description="Maximum number of sync jobs to retrieve per connector. This acts as a safety net to prevent excessive data ingestion. Increase cautiously if you need to see more historical sync runs.",
+    )
+
+    max_table_lineage_per_connector: int = pydantic.Field(
+        default=MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT,
+        gt=0,
+        description="Maximum number of table lineage entries to retrieve per connector. This acts as a safety net to prevent excessive data ingestion. When this limit is exceeded, only the most recent entries are ingested.",
+    )
+
+    max_column_lineage_per_connector: int = pydantic.Field(
+        default=MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT,
+        gt=0,
+        description="Maximum number of column lineage entries to retrieve per connector. This acts as a safety net to prevent excessive data ingestion. When this limit is exceeded, only the most recent entries are ingested.",
+    )
+
     @model_validator(mode="after")
     def validate_destination_platform_and_config(self) -> "FivetranLogConfig":
         if self.destination_platform == "snowflake":
" + f"Only the most recent {self.config.fivetran_log_config.max_jobs_per_connector} syncs were ingested.", context=f"{connector.connector_name} (connector_id: {connector.connector_id})", ) for job in connector.jobs: diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 6f5530cee2c3da..2604d2880a53e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -37,7 +37,11 @@ def __init__(self, fivetran_log_config: FivetranLogConfig) -> None: def _initialize_fivetran_variables( self, ) -> Tuple[Any, FivetranLogQuery, str]: - fivetran_log_query = FivetranLogQuery() + fivetran_log_query = FivetranLogQuery( + max_jobs_per_connector=self.fivetran_log_config.max_jobs_per_connector, + max_table_lineage_per_connector=self.fivetran_log_config.max_table_lineage_per_connector, + max_column_lineage_per_connector=self.fivetran_log_config.max_column_lineage_per_connector, + ) destination_platform = self.fivetran_log_config.destination_platform # For every destination, create sqlalchemy engine, # set db_clause to generate select queries and set fivetran_log_database class variable diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 89137f7c854afd..7235930dae42d8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -1,10 +1,10 @@ from typing import List -# Safeguards to prevent fetching massive amounts of data. -MAX_TABLE_LINEAGE_PER_CONNECTOR = 120 -MAX_COLUMN_LINEAGE_PER_CONNECTOR = 1000 -MAX_JOBS_PER_CONNECTOR = 500 - +from datahub.ingestion.source.fivetran.config import ( + MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT, + MAX_JOBS_PER_CONNECTOR_DEFAULT, + MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT, +) """ ------------------------------------------------------------------------------------------------------------ @@ -25,9 +25,18 @@ class FivetranLogQuery: # Note: All queries are written in Snowflake SQL. # They will be transpiled to the target database's SQL dialect at runtime. - def __init__(self) -> None: + def __init__( + self, + max_jobs_per_connector: int = MAX_JOBS_PER_CONNECTOR_DEFAULT, + max_table_lineage_per_connector: int = MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT, + max_column_lineage_per_connector: int = MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT, + ) -> None: # Select query db clause self.schema_clause: str = "" + # Configurable limits + self.max_jobs_per_connector = max_jobs_per_connector + self.max_table_lineage_per_connector = max_table_lineage_per_connector + self.max_column_lineage_per_connector = max_column_lineage_per_connector def use_database(self, db_name: str) -> str: return f"use database {db_name}" @@ -97,7 +106,7 @@ def get_sync_logs_query( end_time, end_message_data FROM ranked_syncs -WHERE rn <= {MAX_JOBS_PER_CONNECTOR} +WHERE rn <= {self.max_jobs_per_connector} AND start_time IS NOT NULL AND end_time IS NOT NULL ORDER BY connection_id, end_time DESC @@ -130,7 +139,7 @@ def get_table_lineage_query(self, connector_ids: List[str]) -> str: ) -- Ensure that we only get back one entry per source and destination pair. 
 WHERE table_combo_rn = 1
-QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
+QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {self.max_table_lineage_per_connector}
 ORDER BY connection_id, created_at DESC
 """
 
@@ -165,6 +174,6 @@ def get_column_lineage_query(self, connector_ids: List[str]) -> str:
 )
 -- Ensure that we only get back one entry per (connector, source column, destination column) pair.
 WHERE column_combo_rn = 1
-QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
+QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {self.max_column_lineage_per_connector}
 ORDER BY connection_id, created_at DESC
 """
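To see the configured limits flow end to end into the generated SQL, a minimal sketch mirroring the new unit tests below (the schema name and connector id are placeholders):

```python
from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery

# Raise the sync-history limit; the other limits keep their defaults.
query = FivetranLogQuery(max_jobs_per_connector=1000)
query.set_schema("fivetran_log")  # placeholder schema name

sql = query.get_sync_logs_query(
    syncs_interval=7,
    connector_ids=["my_connector_id"],  # placeholder connector id
)

# The configured limit is embedded directly in the generated Snowflake SQL.
assert "WHERE rn <= 1000" in sql
```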
diff --git a/metadata-ingestion/tests/unit/fivetran/test_fivetran_config.py b/metadata-ingestion/tests/unit/fivetran/test_fivetran_config.py
new file mode 100644
index 00000000000000..7d8eae2c09f70c
--- /dev/null
+++ b/metadata-ingestion/tests/unit/fivetran/test_fivetran_config.py
@@ -0,0 +1,136 @@
+import pytest
+from pydantic import ValidationError
+
+from datahub.ingestion.source.fivetran.config import (
+    MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT,
+    MAX_JOBS_PER_CONNECTOR_DEFAULT,
+    MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT,
+    FivetranLogConfig,
+    FivetranSourceConfig,
+    SnowflakeDestinationConfig,
+)
+
+
+class TestFivetranConfig:
+    """Test cases for Fivetran configuration with configurable limits."""
+
+    def test_fivetran_log_config_default_limits(self):
+        """Test that FivetranLogConfig uses default limits when not specified."""
+        config = FivetranLogConfig(
+            destination_platform="snowflake",
+            snowflake_destination_config=SnowflakeDestinationConfig(
+                account_id="test_account",
+                warehouse="test_warehouse",
+                username="test_user",
+                password="test_password",
+                database="test_database",
+                log_schema="test_schema",
+                role="test_role",
+            ),
+        )
+
+        assert config.max_jobs_per_connector == MAX_JOBS_PER_CONNECTOR_DEFAULT
+        assert (
+            config.max_table_lineage_per_connector
+            == MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT
+        )
+        assert (
+            config.max_column_lineage_per_connector
+            == MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT
+        )
+
+    def test_fivetran_log_config_custom_limits(self):
+        """Test that FivetranLogConfig accepts custom limits."""
+        config = FivetranLogConfig(
+            destination_platform="snowflake",
+            snowflake_destination_config=SnowflakeDestinationConfig(
+                account_id="test_account",
+                warehouse="test_warehouse",
+                username="test_user",
+                password="test_password",
+                database="test_database",
+                log_schema="test_schema",
+                role="test_role",
+            ),
+            max_jobs_per_connector=1000,
+            max_table_lineage_per_connector=200,
+            max_column_lineage_per_connector=2000,
+        )
+
+        assert config.max_jobs_per_connector == 1000
+        assert config.max_table_lineage_per_connector == 200
+        assert config.max_column_lineage_per_connector == 2000
+
+    def test_fivetran_log_config_invalid_limits(self):
+        """Test that FivetranLogConfig rejects invalid (non-positive) limits."""
+        with pytest.raises(ValidationError) as excinfo:
+            FivetranLogConfig(
+                destination_platform="snowflake",
+                snowflake_destination_config=SnowflakeDestinationConfig(
+                    account_id="test_account",
+                    warehouse="test_warehouse",
+                    username="test_user",
+                    password="test_password",
+                    database="test_database",
+                    log_schema="test_schema",
+                    role="test_role",
+                ),
+                max_jobs_per_connector=0,
+            )
+        assert "greater than 0" in str(excinfo.value)
+
+        with pytest.raises(ValidationError) as excinfo:
+            FivetranLogConfig(
+                destination_platform="snowflake",
+                snowflake_destination_config=SnowflakeDestinationConfig(
+                    account_id="test_account",
+                    warehouse="test_warehouse",
+                    username="test_user",
+                    password="test_password",
+                    database="test_database",
+                    log_schema="test_schema",
+                    role="test_role",
+                ),
+                max_table_lineage_per_connector=-1,
+            )
+        assert "greater than 0" in str(excinfo.value)
+
+        with pytest.raises(ValidationError) as excinfo:
+            FivetranLogConfig(
+                destination_platform="snowflake",
+                snowflake_destination_config=SnowflakeDestinationConfig(
+                    account_id="test_account",
+                    warehouse="test_warehouse",
+                    username="test_user",
+                    password="test_password",
+                    database="test_database",
+                    log_schema="test_schema",
+                    role="test_role",
+                ),
+                max_column_lineage_per_connector=-100,
+            )
+        assert "greater than 0" in str(excinfo.value)
+
+    def test_fivetran_source_config_with_log_limits(self):
+        """Test that FivetranSourceConfig properly contains FivetranLogConfig with limits."""
+        config = FivetranSourceConfig(
+            fivetran_log_config=FivetranLogConfig(
+                destination_platform="snowflake",
+                snowflake_destination_config=SnowflakeDestinationConfig(
+                    account_id="test_account",
+                    warehouse="test_warehouse",
+                    username="test_user",
+                    password="test_password",
+                    database="test_database",
+                    log_schema="test_schema",
+                    role="test_role",
+                ),
+                max_jobs_per_connector=750,
+                max_table_lineage_per_connector=150,
+                max_column_lineage_per_connector=1500,
+            )
+        )
+
+        assert config.fivetran_log_config.max_jobs_per_connector == 750
+        assert config.fivetran_log_config.max_table_lineage_per_connector == 150
+        assert config.fivetran_log_config.max_column_lineage_per_connector == 1500
diff --git a/metadata-ingestion/tests/unit/fivetran/test_fivetran_query.py b/metadata-ingestion/tests/unit/fivetran/test_fivetran_query.py
new file mode 100644
index 00000000000000..8576ef187116b3
--- /dev/null
+++ b/metadata-ingestion/tests/unit/fivetran/test_fivetran_query.py
@@ -0,0 +1,79 @@
+from datahub.ingestion.source.fivetran.config import (
+    MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT,
+    MAX_JOBS_PER_CONNECTOR_DEFAULT,
+    MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT,
+)
+from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery
+
+
+class TestFivetranLogQuery:
+    """Test cases for FivetranLogQuery with configurable limits."""
+
+    def test_fivetran_log_query_default_limits(self):
+        """Test that FivetranLogQuery uses default limits when initialized with defaults."""
+        query = FivetranLogQuery()
+
+        assert query.max_jobs_per_connector == MAX_JOBS_PER_CONNECTOR_DEFAULT
+        assert (
+            query.max_table_lineage_per_connector
+            == MAX_TABLE_LINEAGE_PER_CONNECTOR_DEFAULT
+        )
+        assert (
+            query.max_column_lineage_per_connector
+            == MAX_COLUMN_LINEAGE_PER_CONNECTOR_DEFAULT
+        )
+
+    def test_fivetran_log_query_custom_limits(self):
+        """Test that FivetranLogQuery uses custom limits when provided."""
+        query = FivetranLogQuery(
+            max_jobs_per_connector=1234,
+            max_table_lineage_per_connector=567,
+            max_column_lineage_per_connector=8901,
+        )
+
+        assert query.max_jobs_per_connector == 1234
+        assert query.max_table_lineage_per_connector == 567
+        assert query.max_column_lineage_per_connector == 8901
+
+    def test_sync_logs_query_uses_configured_limit(self):
+        """Test that get_sync_logs_query uses the configured max_jobs_per_connector limit."""
+        custom_limit = 999
+        query = FivetranLogQuery(max_jobs_per_connector=custom_limit)
+        query.set_schema("test_schema")
+
+        sql = query.get_sync_logs_query(
+            syncs_interval=7,
+            connector_ids=["connector1", "connector2"],
+        )
+
+        assert f"WHERE rn <= {custom_limit}" in sql
+
+    def test_table_lineage_query_uses_configured_limit(self):
+        """Test that get_table_lineage_query uses the configured max_table_lineage_per_connector limit."""
+        custom_limit = 333
+        query = FivetranLogQuery(max_table_lineage_per_connector=custom_limit)
+        query.set_schema("test_schema")
+
+        sql = query.get_table_lineage_query(
+            connector_ids=["connector1", "connector2"],
+        )
+
+        assert (
+            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {custom_limit}"
+            in sql
+        )
+
+    def test_column_lineage_query_uses_configured_limit(self):
+        """Test that get_column_lineage_query uses the configured max_column_lineage_per_connector limit."""
+        custom_limit = 5555
+        query = FivetranLogQuery(max_column_lineage_per_connector=custom_limit)
+        query.set_schema("test_schema")
+
+        sql = query.get_column_lineage_query(
+            connector_ids=["connector1", "connector2"],
+        )
+
+        assert (
+            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {custom_limit}"
+            in sql
+        )
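The new tests can be run on their own; a minimal sketch, assuming the working directory is `metadata-ingestion/`:

```python
import sys

import pytest

# Run only the new Fivetran unit tests, verbosely, and propagate the exit code.
sys.exit(
    pytest.main(
        [
            "tests/unit/fivetran/test_fivetran_config.py",
            "tests/unit/fivetran/test_fivetran_query.py",
            "-v",
        ]
    )
)
```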