
Commit abd5581

feat(ingestion): Restores "Extending file sink to support writing to S3 (#14160) (#14248)"
This reverts commit fe3ffc1.
1 parent: ed72cf4

File tree

7 files changed: +591 -37 lines changed


metadata-ingestion/src/datahub/ingestion/fs/fs_base.py

Lines changed: 10 additions & 0 deletions
@@ -31,6 +31,16 @@ def file_status(self, path: str) -> FileInfo:
     def list(self, path: str) -> Iterable[FileInfo]:
         pass
 
+    @abstractmethod
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """Write content to a file at the given path."""
+        pass
+
+    @abstractmethod
+    def exists(self, path: str) -> bool:
+        """Check if a file exists at the given path."""
+        pass
+
 
 def get_path_schema(path: str) -> str:
     scheme = parse.urlparse(path).scheme
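
For context: with these two abstract methods added, every file system registered in the fs registry must now implement write() and exists() alongside file_status() and list(). A minimal sketch of a conforming backend, assuming FileSystem and FileInfo are importable from fs_base as shown above and that subclasses expose a create() factory as the sink code later in this commit expects; the InMemoryFileSystem name and its dict-backed storage are hypothetical, for illustration only:

from typing import Any, Dict, Iterable

from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


class InMemoryFileSystem(FileSystem):
    """Hypothetical backend that keeps file contents in a dict."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    @classmethod
    def create(cls, **kwargs: Any) -> "InMemoryFileSystem":
        return cls()

    def file_status(self, path: str) -> FileInfo:
        if path in self._store:
            return FileInfo(path, len(self._store[path]), is_file=True)
        return FileInfo(path, 0, is_file=False)

    def list(self, path: str) -> Iterable[FileInfo]:
        return [self.file_status(p) for p in self._store if p.startswith(path)]

    # The two methods this commit adds to the interface:
    def write(self, path: str, content: str, **kwargs: Any) -> None:
        self._store[path] = content

    def exists(self, path: str) -> bool:
        return path in self._store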

metadata-ingestion/src/datahub/ingestion/fs/http_fs.py

Lines changed: 12 additions & 0 deletions
@@ -26,3 +26,15 @@ def file_status(self, path: str) -> FileInfo:
     def list(self, path: str) -> Iterable[FileInfo]:
         status = self.file_status(path)
         return [status]
+
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """HTTP file system does not support writing."""
+        raise NotImplementedError("HTTP file system does not support write operations")
+
+    def exists(self, path: str) -> bool:
+        """Check if an HTTP resource exists."""
+        try:
+            head = requests.head(path)
+            return head.ok
+        except Exception:
+            return False
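
A quick usage sketch of this read-only backend, assuming the class defined here is HttpFileSystem with a create() factory like the other backends (the URL is illustrative):

from datahub.ingestion.fs.http_fs import HttpFileSystem

fs = HttpFileSystem.create()

# exists() maps a successful HEAD response to True and any failure to False
if fs.exists("https://example.com/metadata.json"):
    print("resource is reachable")

# write() is deliberately unsupported over HTTP
try:
    fs.write("https://example.com/metadata.json", "{}")
except NotImplementedError as err:
    print(err)

One caveat: requests.head(path) is issued without a timeout, so a hung server can stall exists() indefinitely; callers may want their own timeout handling around it.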

metadata-ingestion/src/datahub/ingestion/fs/local_fs.py

Lines changed: 14 additions & 0 deletions
@@ -27,3 +27,17 @@ def file_status(self, path: str) -> FileInfo:
             return FileInfo(path, os.path.getsize(path), is_file=True)
         else:
             return FileInfo(path, 0, is_file=False)
+
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """Write content to a local file."""
+        # Create parent directories if they don't exist
+        p = pathlib.Path(path)
+        p.parent.mkdir(parents=True, exist_ok=True)
+
+        # Write the content
+        with p.open("w", **kwargs) as f:
+            f.write(content)
+
+    def exists(self, path: str) -> bool:
+        """Check if a file exists locally."""
+        return pathlib.Path(path).exists()
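
A round-trip sketch with the local backend, assuming the class defined here is LocalFileSystem and that create() needs no arguments (the temp path is illustrative):

import tempfile

from datahub.ingestion.fs.local_fs import LocalFileSystem

fs = LocalFileSystem.create()

# write() creates missing parent directories before writing the file
path = tempfile.mkdtemp() + "/nested/dirs/output.json"
fs.write(path, '[{"hello": "world"}]')

assert fs.exists(path)
assert fs.file_status(path).is_file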

metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py

Lines changed: 29 additions & 0 deletions
@@ -105,3 +105,32 @@ def file_status(self, path: str) -> FileInfo:
     def list(self, path: str) -> Iterable[FileInfo]:
         s3_path = parse_s3_path(path)
         return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
+
+    def write(self, path: str, content: str, **kwargs: Any) -> None:
+        """Write content to S3."""
+        s3_path = parse_s3_path(path)
+
+        # Convert string content to bytes for S3
+        content_bytes = content.encode("utf-8")
+
+        # Upload to S3
+        response = self.s3.put_object(
+            Bucket=s3_path.bucket, Key=s3_path.key, Body=content_bytes, **kwargs
+        )
+        assert_ok_status(response)
+
+    def exists(self, path: str) -> bool:
+        """Check if an object exists in S3."""
+        s3_path = parse_s3_path(path)
+        try:
+            self.s3.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
+            return True
+        except Exception as e:
+            if (
+                hasattr(e, "response")
+                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
+            ):
+                return False
+            else:
+                # Re-raise other exceptions (access denied, etc.)
+                raise e
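
Because write() forwards **kwargs verbatim to put_object, callers can thread any extra S3 parameters through it. A sketch, assuming the class defined here is S3FileSystem and that create() builds a boto3 client from default credentials (bucket, key, and settings are illustrative):

from datahub.ingestion.fs.s3_fs import S3FileSystem

fs = S3FileSystem.create()

# Extra keyword arguments pass straight through to boto3's put_object,
# e.g. server-side encryption and an explicit content type.
fs.write(
    "s3://my-bucket/datahub/output.json",
    '[{"hello": "world"}]',
    ServerSideEncryption="AES256",
    ContentType="application/json",
)

# exists() treats a 404 as False but re-raises anything else,
# so access-denied or credential errors surface instead of being hidden.
print(fs.exists("s3://my-bucket/datahub/output.json"))
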
metadata-ingestion/src/datahub/ingestion/sink/file.py

Lines changed: 82 additions & 37 deletions
@@ -1,34 +1,36 @@
 import json
 import logging
 import pathlib
-from typing import Iterable, Union
+from typing import Iterable, List, Union
 
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import RecordEnvelope
 from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
-from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
-    MetadataChangeEvent,
-    MetadataChangeProposal,
+from datahub.ingestion.fs.fs_base import get_path_schema
+from datahub.ingestion.fs.fs_registry import fs_registry
+from datahub.metadata.schema_classes import (
+    MetadataChangeEventClass,
+    MetadataChangeProposalClass,
+    UsageAggregationClass,
 )
-from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
 
 logger = logging.getLogger(__name__)
 
 
 def _to_obj_for_file(
     obj: Union[
-        MetadataChangeEvent,
-        MetadataChangeProposal,
+        MetadataChangeEventClass,
+        MetadataChangeProposalClass,
         MetadataChangeProposalWrapper,
-        UsageAggregation,
+        UsageAggregationClass,
     ],
     simplified_structure: bool = True,
 ) -> dict:
     if isinstance(obj, MetadataChangeProposalWrapper):
         return obj.to_obj(simplified_structure=simplified_structure)
-    elif isinstance(obj, MetadataChangeProposal) and simplified_structure:
+    elif isinstance(obj, MetadataChangeProposalClass) and simplified_structure:
         serialized = obj.to_obj()
         if serialized.get("aspect") and serialized["aspect"].get("contentType") in [
             JSON_CONTENT_TYPE,
@@ -46,18 +48,28 @@ class FileSinkConfig(ConfigModel):
 
 
 class FileSink(Sink[FileSinkConfig, SinkReport]):
+    """
+    File sink that supports writing to various backends (local, S3, etc.)
+    using the pluggable file system architecture.
+    """
+
     def __post_init__(self) -> None:
-        fpath = pathlib.Path(self.config.filename)
-        self.file = fpath.open("w")
-        self.file.write("[\n")
-        self.wrote_something = False
+        self.filename = self.config.filename
+
+        # Determine file system based on path schema
+        schema = get_path_schema(self.filename)
+        fs_class = fs_registry.get(schema)
+        self.fs = fs_class.create()
+
+        # Initialize the records list
+        self.records: List[dict] = []
 
     def write_record_async(
         self,
         record_envelope: RecordEnvelope[
             Union[
-                MetadataChangeEvent,
-                MetadataChangeProposal,
+                MetadataChangeEventClass,
+                MetadataChangeProposalClass,
                 MetadataChangeProposalWrapper,
             ]
         ],
@@ -68,41 +80,74 @@ def write_record_async(
             record, simplified_structure=not self.config.legacy_nested_json_string
         )
 
-        if self.wrote_something:
-            self.file.write(",\n")
-
-        json.dump(obj, self.file, indent=4)
-        self.wrote_something = True
+        # Store records in memory until close()
+        self.records.append(obj)
 
         self.report.report_record_written(record_envelope)
         if write_callback:
             write_callback.on_success(record_envelope, {})
 
     def close(self):
-        super().close()
-        self.file.write("\n]")
-        self.file.close()
+        """Write all records to the file system as a JSON array."""
+        if not self.records:
+            # Write empty array if no records
+            content = "[]"
+        else:
+            # Convert records to JSON string
+            content = "[\n"
+            for i, record in enumerate(self.records):
+                if i > 0:
+                    content += ",\n"
+                content += json.dumps(record, indent=4)
+            content += "\n]"
+
+        # Write to file system
+        try:
+            self.fs.write(self.filename, content)
+            logger.info(
+                f"Successfully wrote {len(self.records)} records to {self.filename}"
+            )
+        except Exception as e:
+            logger.error(f"Failed to write to {self.filename}: {e}")
+            raise
 
 
 def write_metadata_file(
-    file: pathlib.Path,
+    file_path: Union[str, pathlib.Path],
    records: Iterable[
         Union[
-            MetadataChangeEvent,
-            MetadataChangeProposal,
+            MetadataChangeEventClass,
+            MetadataChangeProposalClass,
             MetadataChangeProposalWrapper,
-            UsageAggregation,
+            UsageAggregationClass,
             dict,  # Serialized MCE or MCP
         ]
     ],
 ) -> None:
-    # This simplified version of the FileSink can be used for testing purposes.
-    with file.open("w") as f:
-        f.write("[\n")
-        for i, record in enumerate(records):
-            if i > 0:
-                f.write(",\n")
-            if not isinstance(record, dict):
-                record = _to_obj_for_file(record)
-            json.dump(record, f, indent=4)
-        f.write("\n]")
+    """
+    Write metadata records to any supported file system (local, S3, etc.).
+    This function uses the pluggable file system architecture.
+    """
+    # Convert Path to string if needed
+    file_path_str = str(file_path)
+
+    # Determine file system based on path schema
+    schema = get_path_schema(file_path_str)
+    fs_class = fs_registry.get(schema)
+    fs = fs_class.create()
+
+    # Convert records to JSON string
+    content = "[\n"
+    record_list = list(records)  # Convert iterable to list
+
+    for i, record in enumerate(record_list):
+        if i > 0:
+            content += ",\n"
+        if not isinstance(record, dict):
+            record = _to_obj_for_file(record)
+        content += json.dumps(record, indent=4)
+    content += "\n]"
+
+    # Write to file system
+    fs.write(file_path_str, content)
+    logger.info(f"Successfully wrote {len(record_list)} records to {file_path_str}")
