diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index ee99dbae969a8c..819c379d11408f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -96,6 +96,7 @@ StatusClass, SubTypesClass, TagAssociationClass, + TimeStampClass, UpstreamLineageClass, ViewPropertiesClass, ) @@ -1799,6 +1800,22 @@ def _create_dataset_properties_aspect( ) dbt_properties.externalUrl = self.get_external_url(node) + # Set lastModified from max_loaded_at for sources, or from model_performances for models + if node.max_loaded_at is not None: + # For sources: use max_loaded_at from sources.json + timestamp_millis = datetime_to_ts_millis(node.max_loaded_at) + dbt_properties.lastModified = TimeStampClass(timestamp_millis) + elif node.model_performances: + # For models: use the most recent successful run end_time + # Get the latest successful run (most recent end_time) + successful_runs = [ + perf for perf in node.model_performances if perf.is_success() + ] + if successful_runs: + latest_run = max(successful_runs, key=lambda p: p.end_time) + timestamp_millis = datetime_to_ts_millis(latest_run.end_time) + dbt_properties.lastModified = TimeStampClass(timestamp_millis) + return dbt_properties @abstractmethod @@ -1968,13 +1985,27 @@ def get_schema_metadata( canonical_schema.append(field) + # Set lastModified from max_loaded_at for sources, or from model_performances for models last_modified = None if node.max_loaded_at is not None: + # For sources: use max_loaded_at from sources.json actor = mce_builder.make_user_urn("dbt_executor") last_modified = AuditStamp( time=datetime_to_ts_millis(node.max_loaded_at), actor=actor, ) + elif node.model_performances: + # For models: use the most recent successful run end_time + successful_runs = [ + perf for perf in node.model_performances if perf.is_success() + ] + if successful_runs: + latest_run = max(successful_runs, key=lambda p: p.end_time) + actor = mce_builder.make_user_urn("dbt_executor") + last_modified = AuditStamp( + time=datetime_to_ts_millis(latest_run.end_time), + actor=actor, + ) return SchemaMetadata( schemaName=node.dbt_name, @@ -1982,8 +2013,8 @@ def get_schema_metadata( version=0, hash="", platformSchema=MySqlDDL(tableSchema=""), - lastModified=last_modified, fields=canonical_schema, + lastModified=last_modified, ) def _aggregate_owners( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index d2a794fb02e571..ddf564910bb353 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -249,10 +249,22 @@ def extract_dbt_entities( tags = manifest_node.get("tags", []) tags = [tag_prefix + tag for tag in tags] - max_loaded_at_str = sources_by_id.get(key, {}).get("max_loaded_at") + max_loaded_at_str = None + source_entry = sources_by_id.get(key) + if source_entry is not None: + max_loaded_at_str = source_entry.get("max_loaded_at") max_loaded_at = None if max_loaded_at_str: max_loaded_at = parse_dbt_timestamp(max_loaded_at_str) + elif manifest_node.get("resource_type") == "source": + # Log debug info for sources missing from sources.json or missing max_loaded_at + if source_entry is None: + logger.debug(f"Source {key} not found in sources.json. ") + else: + logger.debug( + f"Source {key} found in sources.json but missing max_loaded_at field. " + f"Source entry keys: {list(source_entry.keys())}" + ) test_info = None if manifest_node.get("resource_type") == "test": @@ -555,9 +567,12 @@ def loadManifestAndCatalog( dbt_sources_json = self.load_file_as_json( self.config.sources_path, self.config.aws_connection ) - sources_results = dbt_sources_json["results"] + sources_results = dbt_sources_json.get("results", []) + logger.debug( + f"Loaded {len(sources_results)} source freshness results from sources.json" + ) else: - sources_results = {} + sources_results = [] manifest_schema = dbt_manifest_json["metadata"].get("dbt_schema_version") manifest_version = dbt_manifest_json["metadata"].get("dbt_version") diff --git a/metadata-ingestion/tests/unit/dbt/test_dbt_source.py b/metadata-ingestion/tests/unit/dbt/test_dbt_source.py index 5aee41d44ff57f..4e50fe70871631 100644 --- a/metadata-ingestion/tests/unit/dbt/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/dbt/test_dbt_source.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, TypedDict, Union from unittest import mock @@ -25,6 +25,7 @@ OwnershipSourceClass, OwnershipSourceTypeClass, OwnershipTypeClass, + TimeStampClass, ) from datahub.testing.doctest import assert_doctest @@ -815,3 +816,183 @@ def test_dbt_cloud_source_description_fallback() -> None: assert ( parsed_node.description == "This is the schema-level description for my_schema" ) + + +def test_max_loaded_at_in_dataset_properties() -> None: + """ + Test that max_loaded_at is correctly set in DatasetProperties.lastModified. + """ + ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source") + config = DBTCoreConfig(**create_base_dbt_config()) + source = DBTCoreSource(config, ctx) + + # Create a source node with max_loaded_at set + test_timestamp = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc) + source_node = DBTNode( + name="test_source", + database="test_db", + schema="test_schema", + alias=None, + comment="", + description="Test source", + language=None, + raw_code=None, + dbt_adapter="postgres", + dbt_name="source.package.test_source", + dbt_file_path=None, + dbt_package_name="package", + node_type="source", + materialization=None, + max_loaded_at=test_timestamp, + catalog_type=None, + missing_from_catalog=False, + owner=None, + compiled_code=None, + ) + + # Test that DatasetProperties.lastModified is set + dataset_properties = source._create_dataset_properties_aspect(source_node, {}) + assert dataset_properties.lastModified is not None + assert isinstance(dataset_properties.lastModified, TimeStampClass) + # Verify the timestamp is correct (convert to milliseconds for comparison) + expected_timestamp_ms = int(test_timestamp.timestamp() * 1000) + assert dataset_properties.lastModified.time == expected_timestamp_ms + + # Test that SchemaMetadata.lastModified is also set from max_loaded_at + schema_metadata = source.get_schema_metadata(source.report, source_node, "dbt") + assert schema_metadata.lastModified is not None + assert schema_metadata.lastModified.time == expected_timestamp_ms + + +def test_max_loaded_at_none_in_dataset_properties() -> None: + """ + Test that when max_loaded_at is None, DatasetProperties.lastModified is also None. + """ + ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source") + config = DBTCoreConfig(**create_base_dbt_config()) + source = DBTCoreSource(config, ctx) + + # Create a source node without max_loaded_at + source_node = DBTNode( + name="test_source", + database="test_db", + schema="test_schema", + alias=None, + comment="", + description="Test source", + language=None, + raw_code=None, + dbt_adapter="postgres", + dbt_name="source.package.test_source", + dbt_file_path=None, + dbt_package_name="package", + node_type="source", + materialization=None, + max_loaded_at=None, + catalog_type=None, + missing_from_catalog=False, + owner=None, + compiled_code=None, + ) + + # Test that DatasetProperties.lastModified is None + dataset_properties = source._create_dataset_properties_aspect(source_node, {}) + assert dataset_properties.lastModified is None + + +def test_model_performance_in_dataset_properties() -> None: + """ + Test that model_performances end_time is correctly set in DatasetProperties.lastModified + for models. + """ + ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source") + config = DBTCoreConfig(**create_base_dbt_config()) + source = DBTCoreSource(config, ctx) + + # Create a model node with model_performances + from datahub.ingestion.source.dbt.dbt_common import DBTModelPerformance + + test_end_time_1 = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc) + test_end_time_2 = datetime( + 2024, 1, 20, 14, 45, 0, tzinfo=timezone.utc + ) # More recent + + model_node = DBTNode( + name="test_model", + database="test_db", + schema="test_schema", + alias=None, + comment="", + description="Test model", + language="sql", + raw_code=None, + dbt_adapter="postgres", + dbt_name="model.package.test_model", + dbt_file_path=None, + dbt_package_name="package", + node_type="model", + materialization="table", + max_loaded_at=None, + catalog_type=None, + missing_from_catalog=False, + owner=None, + compiled_code=None, + model_performances=[ + DBTModelPerformance( + run_id="run1", + status="success", + start_time=datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc), + end_time=test_end_time_1, + ), + DBTModelPerformance( + run_id="run2", + status="success", + start_time=datetime(2024, 1, 20, 14, 0, 0, tzinfo=timezone.utc), + end_time=test_end_time_2, + ), + ], + ) + + # Test that DatasetProperties.lastModified is set from the most recent successful run + dataset_properties = source._create_dataset_properties_aspect(model_node, {}) + assert dataset_properties.lastModified is not None + assert isinstance(dataset_properties.lastModified, TimeStampClass) + # Verify it uses the most recent end_time (test_end_time_2) + expected_timestamp_ms = int(test_end_time_2.timestamp() * 1000) + assert dataset_properties.lastModified.time == expected_timestamp_ms + + +def test_model_performance_none_in_dataset_properties() -> None: + """ + Test that when model_performances is empty, DatasetProperties.lastModified is None. + """ + ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source") + config = DBTCoreConfig(**create_base_dbt_config()) + source = DBTCoreSource(config, ctx) + + # Create a model node without model_performances + model_node = DBTNode( + name="test_model", + database="test_db", + schema="test_schema", + alias=None, + comment="", + description="Test model", + language="sql", + raw_code=None, + dbt_adapter="postgres", + dbt_name="model.package.test_model", + dbt_file_path=None, + dbt_package_name="package", + node_type="model", + materialization="table", + max_loaded_at=None, + catalog_type=None, + missing_from_catalog=False, + owner=None, + compiled_code=None, + ) + + # Test that DatasetProperties.lastModified is None + dataset_properties = source._create_dataset_properties_aspect(model_node, {}) + assert dataset_properties.lastModified is None