11import json
22import logging
33import pathlib
4- from typing import Iterable , Union
4+ from typing import Iterable , List , Union
55
66from datahub .configuration .common import ConfigModel
77from datahub .emitter .aspect import JSON_CONTENT_TYPE , JSON_PATCH_CONTENT_TYPE
88from datahub .emitter .mcp import MetadataChangeProposalWrapper
99from datahub .ingestion .api .common import RecordEnvelope
1010from datahub .ingestion .api .sink import Sink , SinkReport , WriteCallback
11- from datahub .metadata .com .linkedin .pegasus2avro .mxe import (
12- MetadataChangeEvent ,
13- MetadataChangeProposal ,
11+ from datahub .ingestion .fs .fs_base import get_path_schema
12+ from datahub .ingestion .fs .fs_registry import fs_registry
13+ from datahub .metadata .schema_classes import (
14+ MetadataChangeEventClass ,
15+ MetadataChangeProposalClass ,
16+ UsageAggregationClass ,
1417)
15- from datahub .metadata .com .linkedin .pegasus2avro .usage import UsageAggregation
1618
1719logger = logging .getLogger (__name__ )
1820
1921
2022def _to_obj_for_file (
2123 obj : Union [
22- MetadataChangeEvent ,
23- MetadataChangeProposal ,
24+ MetadataChangeEventClass ,
25+ MetadataChangeProposalClass ,
2426 MetadataChangeProposalWrapper ,
25- UsageAggregation ,
27+ UsageAggregationClass ,
2628 ],
2729 simplified_structure : bool = True ,
2830) -> dict :
2931 if isinstance (obj , MetadataChangeProposalWrapper ):
3032 return obj .to_obj (simplified_structure = simplified_structure )
31- elif isinstance (obj , MetadataChangeProposal ) and simplified_structure :
33+ elif isinstance (obj , MetadataChangeProposalClass ) and simplified_structure :
3234 serialized = obj .to_obj ()
3335 if serialized .get ("aspect" ) and serialized ["aspect" ].get ("contentType" ) in [
3436 JSON_CONTENT_TYPE ,
@@ -46,18 +48,28 @@ class FileSinkConfig(ConfigModel):
4648
4749
4850class FileSink (Sink [FileSinkConfig , SinkReport ]):
51+ """
52+ File sink that supports writing to various backends (local, S3, etc.)
53+ using the pluggable file system architecture.
54+ """
55+
def __post_init__(self) -> None:
    """Pick the file system backend for the configured path and start an empty buffer.

    The schema derived from ``self.config.filename`` is looked up in
    ``fs_registry``; the class found there is instantiated through its
    ``create()`` factory and kept on ``self.fs``.
    """
    target = self.config.filename
    self.filename = target

    # The path's schema decides which registered file system class is used.
    backend_cls = fs_registry.get(get_path_schema(target))
    self.fs = backend_cls.create()

    # Serialized records accumulate here; this method itself writes nothing.
    self.records: List[dict] = []
5466
5567 def write_record_async (
5668 self ,
5769 record_envelope : RecordEnvelope [
5870 Union [
59- MetadataChangeEvent ,
60- MetadataChangeProposal ,
71+ MetadataChangeEventClass ,
72+ MetadataChangeProposalClass ,
6173 MetadataChangeProposalWrapper ,
6274 ]
6375 ],
@@ -68,41 +80,74 @@ def write_record_async(
6880 record , simplified_structure = not self .config .legacy_nested_json_string
6981 )
7082
71- if self .wrote_something :
72- self .file .write (",\n " )
73-
74- json .dump (obj , self .file , indent = 4 )
75- self .wrote_something = True
83+ # Store records in memory until close()
84+ self .records .append (obj )
7685
7786 self .report .report_record_written (record_envelope )
7887 if write_callback :
7988 write_callback .on_success (record_envelope , {})
8089
def close(self):
    """Write all buffered records to the file system as a single JSON array.

    Every entry of ``self.records`` is rendered with ``json.dumps(..., indent=4)``,
    the entries are separated by ``",\\n"`` and wrapped in ``"[\\n" ... "\\n]"``.
    An empty buffer is written as ``"[]"``. The resulting text is passed to
    ``self.fs.write`` together with ``self.filename`` in one call.

    Raises:
        Exception: any error raised by ``self.fs.write`` is logged and then
            re-raised unchanged.
    """
    # The earlier implementation of this method chained to the parent class
    # before touching the output; keep doing so, so that whatever the base
    # sink does on close is not silently skipped.
    super().close()

    if not self.records:
        # Write empty array if no records
        content = "[]"
    else:
        # One join instead of repeated ``+=``: same text, linear cost.
        body = ",\n".join(json.dumps(record, indent=4) for record in self.records)
        content = "[\n" + body + "\n]"

    # Write to file system
    try:
        self.fs.write(self.filename, content)
    except Exception as e:
        logger.error(f"Failed to write to {self.filename}: {e}")
        raise
    else:
        logger.info(
            f"Successfully wrote {len(self.records)} records to {self.filename}"
        )
85113
86114
def write_metadata_file(
    file_path: Union[str, pathlib.Path],
    records: Iterable[
        Union[
            MetadataChangeEventClass,
            MetadataChangeProposalClass,
            MetadataChangeProposalWrapper,
            UsageAggregationClass,
            dict,  # Serialized MCE or MCP
        ]
    ],
) -> None:
    """Serialize ``records`` to a JSON array and write it through the file system registry.

    Args:
        file_path: Destination; converted with ``str()`` before use. Its schema
            (as reported by ``get_path_schema``) selects the file system class
            looked up in ``fs_registry``.
        records: Items to write. ``dict`` items are emitted as-is; anything else
            is first converted with ``_to_obj_for_file``.

    The output is ``"[\\n"``, the records rendered with a 4-space indent and
    joined by ``",\\n"``, then ``"\\n]"``. The whole text is handed to the file
    system's ``write`` in a single call, so all records are held in memory.
    """
    # Convert Path to string if needed
    file_path_str = str(file_path)

    # Determine file system based on path schema (done before consuming
    # ``records`` so a bad path fails before any serialization work).
    fs_class = fs_registry.get(get_path_schema(file_path_str))
    fs = fs_class.create()

    # Serialize each record exactly once; the list doubles as the record count
    # for the log line, so no separate copy of the input iterable is needed.
    serialized = [
        json.dumps(
            record if isinstance(record, dict) else _to_obj_for_file(record),
            indent=4,
        )
        for record in records
    ]
    # A single join replaces the quadratic ``content +=`` loop; output is identical.
    content = "[\n" + ",\n".join(serialized) + "\n]"

    # Write to file system
    fs.write(file_path_str, content)
    logger.info(f"Successfully wrote {len(serialized)} records to {file_path_str}")
0 commit comments