10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
@@ -31,6 +31,16 @@ def file_status(self, path: str) -> FileInfo:
     def list(self, path: str) -> Iterable[FileInfo]:
         pass

+    @abstractmethod
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """Write content to a file at the given path."""
+        pass
+
+    @abstractmethod
+    def exists(self, path: str) -> bool:
+        """Check if a file exists at the given path."""
+        pass
+

 def get_path_schema(path: str) -> str:
     scheme = parse.urlparse(path).scheme
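Together, the two new abstract methods let a caller resolve a backend from a path's scheme and write through one interface. A minimal sketch of that flow, assuming the fs_registry wiring introduced in sink/file.py below; the helper name write_if_absent is hypothetical:

from datahub.ingestion.fs.fs_base import get_path_schema
from datahub.ingestion.fs.fs_registry import fs_registry


def write_if_absent(path: str, content: str) -> bool:
    """Write content only if nothing exists at the path yet."""
    schema = get_path_schema(path)  # e.g. "file", "s3", "http"
    fs = fs_registry.get(schema).create()  # same create() pattern FileSink uses
    if fs.exists(path):
        return False
    fs.write(path, content)
    return True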
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
@@ -26,3 +26,15 @@ def file_status(self, path: str) -> FileInfo:
     def list(self, path: str) -> Iterable[FileInfo]:
         status = self.file_status(path)
         return [status]
+
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """HTTP file system does not support writing."""
+        raise NotImplementedError("HTTP file system does not support write operations")
+
+    def exists(self, path: str) -> bool:
+        """Check if an HTTP resource exists."""
+        try:
+            head = requests.head(path, timeout=10)
+            return head.ok
+        except Exception:
+            return False
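Since exists() is backed by a HEAD request, it reports reachability rather than strict existence (a server that rejects HEAD will read as missing). A usage sketch, assuming the HttpFileSystem class name and the create() factory used elsewhere in this PR; the URL is illustrative:

from datahub.ingestion.fs.http_fs import HttpFileSystem

fs = HttpFileSystem.create()
url = "https://example.com/metadata/events.json"  # illustrative URL
if fs.exists(url):
    print(fs.file_status(url))
else:
    print("resource unreachable or missing")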
14 changes: 14 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
@@ -27,3 +27,17 @@ def file_status(self, path: str) -> FileInfo:
             return FileInfo(path, os.path.getsize(path), is_file=True)
         else:
             return FileInfo(path, 0, is_file=False)
+
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """Write content to a local file."""
+        # Create parent directories if they don't exist
+        p = pathlib.Path(path)
+        p.parent.mkdir(parents=True, exist_ok=True)
+
+        # Write the content
+        with p.open("w", **kwargs) as f:
+            f.write(content)
+
+    def exists(self, path: str) -> bool:
+        """Check if a file exists locally."""
+        return pathlib.Path(path).exists()
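Because of the mkdir(parents=True, exist_ok=True) call, callers can write to paths whose directories do not exist yet. A quick round-trip sketch, assuming the LocalFileSystem class name and create() factory:

import os
import tempfile

from datahub.ingestion.fs.local_fs import LocalFileSystem

fs = LocalFileSystem.create()
path = os.path.join(tempfile.mkdtemp(), "nested", "dirs", "out.json")

assert not fs.exists(path)
fs.write(path, "[]")  # parent directories are created on demand
assert fs.exists(path)
assert fs.file_status(path).is_file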
29 changes: 29 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
@@ -105,3 +105,32 @@ def file_status(self, path: str) -> FileInfo:
     def list(self, path: str) -> Iterable[FileInfo]:
         s3_path = parse_s3_path(path)
         return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
+
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """Write content to S3."""
+        s3_path = parse_s3_path(path)
+
+        # Convert string content to bytes for S3
+        content_bytes = content.encode("utf-8")
+
+        # Upload to S3
+        response = self.s3.put_object(
+            Bucket=s3_path.bucket, Key=s3_path.key, Body=content_bytes, **kwargs
+        )
+        assert_ok_status(response)
+
+    def exists(self, path: str) -> bool:
+        """Check if an object exists in S3."""
+        s3_path = parse_s3_path(path)
+        try:
+            self.s3.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
+            return True
+        except Exception as e:
+            if (
+                hasattr(e, "response")
+                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
+            ):
+                return False
+            else:
+                # Re-raise other exceptions (access denied, etc.)
+                raise
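Because **kwargs is forwarded straight to put_object, callers can thread through standard boto3 options. A usage sketch, assuming the S3FileSystem class name and create() factory; the bucket and key are hypothetical:

from datahub.ingestion.fs.s3_fs import S3FileSystem

fs = S3FileSystem.create()
uri = "s3://my-bucket/metadata/out.json"  # hypothetical bucket/key

# ServerSideEncryption is a standard boto3 put_object parameter,
# forwarded here via **kwargs.
fs.write(uri, '{"removed": false}', ServerSideEncryption="AES256")
assert fs.exists(uri)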
119 changes: 82 additions & 37 deletions metadata-ingestion/src/datahub/ingestion/sink/file.py
@@ -1,34 +1,36 @@
 import json
 import logging
 import pathlib
-from typing import Iterable, Union
+from typing import Iterable, List, Union

 from datahub.configuration.common import ConfigModel
 from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import RecordEnvelope
 from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
-from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
-    MetadataChangeEvent,
-    MetadataChangeProposal,
+from datahub.ingestion.fs.fs_base import get_path_schema
+from datahub.ingestion.fs.fs_registry import fs_registry
+from datahub.metadata.schema_classes import (
+    MetadataChangeEventClass,
+    MetadataChangeProposalClass,
+    UsageAggregationClass,
 )
-from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation

 logger = logging.getLogger(__name__)


 def _to_obj_for_file(
     obj: Union[
-        MetadataChangeEvent,
-        MetadataChangeProposal,
+        MetadataChangeEventClass,
+        MetadataChangeProposalClass,
         MetadataChangeProposalWrapper,
-        UsageAggregation,
+        UsageAggregationClass,
     ],
     simplified_structure: bool = True,
 ) -> dict:
     if isinstance(obj, MetadataChangeProposalWrapper):
         return obj.to_obj(simplified_structure=simplified_structure)
-    elif isinstance(obj, MetadataChangeProposal) and simplified_structure:
+    elif isinstance(obj, MetadataChangeProposalClass) and simplified_structure:
         serialized = obj.to_obj()
         if serialized.get("aspect") and serialized["aspect"].get("contentType") in [
             JSON_CONTENT_TYPE,
@@ -46,18 +48,28 @@ class FileSinkConfig(ConfigModel):


 class FileSink(Sink[FileSinkConfig, SinkReport]):
+    """
+    File sink that supports writing to various backends (local, S3, etc.)
+    using the pluggable file system architecture.
+    """
+
     def __post_init__(self) -> None:
-        fpath = pathlib.Path(self.config.filename)
-        self.file = fpath.open("w")
-        self.file.write("[\n")
-        self.wrote_something = False
+        self.filename = self.config.filename
+
+        # Determine file system based on path schema
+        schema = get_path_schema(self.filename)
+        fs_class = fs_registry.get(schema)
+        self.fs = fs_class.create()
+
+        # Initialize the records list
+        self.records: List[dict] = []

     def write_record_async(
         self,
         record_envelope: RecordEnvelope[
             Union[
-                MetadataChangeEvent,
-                MetadataChangeProposal,
+                MetadataChangeEventClass,
+                MetadataChangeProposalClass,
                 MetadataChangeProposalWrapper,
             ]
         ],
@@ -68,41 +80,74 @@ def write_record_async(
             record, simplified_structure=not self.config.legacy_nested_json_string
         )

-        if self.wrote_something:
-            self.file.write(",\n")
-
-        json.dump(obj, self.file, indent=4)
-        self.wrote_something = True
+        # Store records in memory until close()
+        self.records.append(obj)

         self.report.report_record_written(record_envelope)
         if write_callback:
             write_callback.on_success(record_envelope, {})

     def close(self):
+        """Write all records to the file system as a JSON array."""
         super().close()
-        self.file.write("\n]")
-        self.file.close()
+        if not self.records:
+            # Write empty array if no records
+            content = "[]"
+        else:
+            # Convert records to JSON string
+            content = "[\n"
+            for i, record in enumerate(self.records):
+                if i > 0:
+                    content += ",\n"
+                content += json.dumps(record, indent=4)
+            content += "\n]"
+
+        # Write to file system
+        try:
+            self.fs.write(self.filename, content)
+            logger.info(
+                f"Successfully wrote {len(self.records)} records to {self.filename}"
+            )
+        except Exception as e:
+            logger.error(f"Failed to write to {self.filename}: {e}")
+            raise


 def write_metadata_file(
-    file: pathlib.Path,
+    file_path: Union[str, pathlib.Path],
     records: Iterable[
         Union[
-            MetadataChangeEvent,
-            MetadataChangeProposal,
+            MetadataChangeEventClass,
+            MetadataChangeProposalClass,
             MetadataChangeProposalWrapper,
-            UsageAggregation,
+            UsageAggregationClass,
             dict,  # Serialized MCE or MCP
         ]
     ],
 ) -> None:
-    # This simplified version of the FileSink can be used for testing purposes.
-    with file.open("w") as f:
-        f.write("[\n")
-        for i, record in enumerate(records):
-            if i > 0:
-                f.write(",\n")
-            if not isinstance(record, dict):
-                record = _to_obj_for_file(record)
-            json.dump(record, f, indent=4)
-        f.write("\n]")
+    """
+    Write metadata records to any supported file system (local, S3, etc.).
+    This function uses the pluggable file system architecture.
+    """
+    # Convert Path to string if needed
+    file_path_str = str(file_path)
+
+    # Determine file system based on path schema
+    schema = get_path_schema(file_path_str)
+    fs_class = fs_registry.get(schema)
+    fs = fs_class.create()
+
+    # Convert records to JSON string
+    content = "[\n"
+    record_list = list(records)  # Convert iterable to list
+
+    for i, record in enumerate(record_list):
+        if i > 0:
+            content += ",\n"
+        if not isinstance(record, dict):
+            record = _to_obj_for_file(record)
+        content += json.dumps(record, indent=4)
+    content += "\n]"
+
+    # Write to file system
+    fs.write(file_path_str, content)
+    logger.info(f"Successfully wrote {len(record_list)} records to {file_path_str}")