Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
StatusClass,
SubTypesClass,
TagAssociationClass,
TimeStampClass,
UpstreamLineageClass,
ViewPropertiesClass,
)
Expand Down Expand Up @@ -1799,6 +1800,11 @@ def _create_dataset_properties_aspect(
)
dbt_properties.externalUrl = self.get_external_url(node)

# Set lastModified from max_loaded_at if available
if node.max_loaded_at is not None:
timestamp_millis = datetime_to_ts_millis(node.max_loaded_at)
dbt_properties.lastModified = TimeStampClass(timestamp_millis)

return dbt_properties

@abstractmethod
Expand Down Expand Up @@ -1968,21 +1974,12 @@ def get_schema_metadata(

canonical_schema.append(field)

last_modified = None
if node.max_loaded_at is not None:
actor = mce_builder.make_user_urn("dbt_executor")
last_modified = AuditStamp(
time=datetime_to_ts_millis(node.max_loaded_at),
actor=actor,
)

return SchemaMetadata(
schemaName=node.dbt_name,
platform=mce_builder.make_data_platform_urn(platform),
version=0,
hash="",
platformSchema=MySqlDDL(tableSchema=""),
lastModified=last_modified,
fields=canonical_schema,
)

Expand Down
84 changes: 83 additions & 1 deletion metadata-ingestion/tests/unit/dbt/test_dbt_source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, TypedDict, Union
from unittest import mock

Expand All @@ -25,6 +25,7 @@
OwnershipSourceClass,
OwnershipSourceTypeClass,
OwnershipTypeClass,
TimeStampClass,
)
from datahub.testing.doctest import assert_doctest

Expand Down Expand Up @@ -815,3 +816,84 @@ def test_dbt_cloud_source_description_fallback() -> None:
assert (
parsed_node.description == "This is the schema-level description for my_schema"
)


def test_max_loaded_at_in_dataset_properties() -> None:
    """
    A node's max_loaded_at must surface on DatasetProperties.lastModified.

    The value is expected to be a TimeStampClass carrying the timestamp in
    epoch milliseconds, while SchemaMetadata.lastModified must stay unset.
    """
    pipeline_ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source")
    core_config = DBTCoreConfig(**create_base_dbt_config())
    dbt_source = DBTCoreSource(core_config, pipeline_ctx)

    # Timezone-aware instant used as the node's freshness timestamp.
    loaded_at = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)

    # Field values for a dbt "source" node that carries max_loaded_at.
    node_fields = {
        "name": "test_source",
        "database": "test_db",
        "schema": "test_schema",
        "alias": None,
        "comment": "",
        "description": "Test source",
        "language": None,
        "raw_code": None,
        "dbt_adapter": "postgres",
        "dbt_name": "source.package.test_source",
        "dbt_file_path": None,
        "dbt_package_name": "package",
        "node_type": "source",
        "materialization": None,
        "max_loaded_at": loaded_at,
        "catalog_type": None,
        "missing_from_catalog": False,
        "owner": None,
        "compiled_code": None,
    }
    node = DBTNode(**node_fields)

    # DatasetProperties.lastModified must be populated from max_loaded_at.
    properties_aspect = dbt_source._create_dataset_properties_aspect(node, {})
    last_modified = properties_aspect.lastModified
    assert last_modified is not None
    assert isinstance(last_modified, TimeStampClass)
    # Compare in epoch milliseconds.
    assert last_modified.time == int(loaded_at.timestamp() * 1000)

    # SchemaMetadata.lastModified must NOT be populated.
    schema_aspect = dbt_source.get_schema_metadata(dbt_source.report, node, "dbt")
    assert schema_aspect.lastModified is None


def test_max_loaded_at_none_in_dataset_properties() -> None:
    """
    A node without max_loaded_at must leave DatasetProperties.lastModified unset.
    """
    pipeline_ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source")
    core_config = DBTCoreConfig(**create_base_dbt_config())
    dbt_source = DBTCoreSource(core_config, pipeline_ctx)

    # Field values for a dbt "source" node with no freshness timestamp.
    node_fields = {
        "name": "test_source",
        "database": "test_db",
        "schema": "test_schema",
        "alias": None,
        "comment": "",
        "description": "Test source",
        "language": None,
        "raw_code": None,
        "dbt_adapter": "postgres",
        "dbt_name": "source.package.test_source",
        "dbt_file_path": None,
        "dbt_package_name": "package",
        "node_type": "source",
        "materialization": None,
        "max_loaded_at": None,
        "catalog_type": None,
        "missing_from_catalog": False,
        "owner": None,
        "compiled_code": None,
    }
    node = DBTNode(**node_fields)

    # With no max_loaded_at, lastModified must remain None.
    properties_aspect = dbt_source._create_dataset_properties_aspect(node, {})
    assert properties_aspect.lastModified is None
Loading