diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 0d65e2ba1b252..a80e300d9a8c3 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -401,11 +401,9 @@ ChunkedSegmentSealedImpl::LoadColumnGroup( auto chunk_reader = std::move(chunk_reader_result).ValueOrDie(); - LOG_INFO( - "[StorageV2] segment {} loads manifest cg index {} with field ids " - "{} ", - this->get_segment_id(), - index); + LOG_INFO("[StorageV2] segment {} loads manifest cg index {}", + this->get_segment_id(), + index); auto translator = std::make_unique( diff --git a/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp b/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp index fce1e57f49874..c9bf3aaf4b7ca 100644 --- a/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp @@ -165,6 +165,13 @@ ManifestGroupTranslator::get_cells( auto chunks = read_result.ValueOrDie(); for (size_t i = 0; i < chunks.size(); ++i) { auto& chunk = chunks[i]; + AssertInfo(chunk != nullptr, + "chunk is null, idx = {}, group index = {}, segment id = " + "{}, parallel degree = {}", + i, + column_group_index_, + segment_id_, + parallel_degree); auto cid = cids[i]; auto group_chunk = load_group_chunk(chunk, cid); cells.emplace_back(cid, std::move(group_chunk)); diff --git a/internal/core/src/storage/loon_ffi/CMakeLists.txt b/internal/core/src/storage/loon_ffi/CMakeLists.txt index 3370e33a64ca8..fd65a1417513b 100644 --- a/internal/core/src/storage/loon_ffi/CMakeLists.txt +++ b/internal/core/src/storage/loon_ffi/CMakeLists.txt @@ -11,6 +11,7 @@ # FFI Reader source files for interfacing with milvus-storage through FFI set(FFI_SRCS + ${CMAKE_CURRENT_SOURCE_DIR}/ffi_writer_c.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ffi_reader_c.cpp ${CMAKE_CURRENT_SOURCE_DIR}/util.cpp ) diff --git a/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp b/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp index e69de29bb2d1d..c7356d2dcfdb6 100644 --- a/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp +++ b/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp @@ -0,0 +1,190 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include "log/Log.h" +#include "common/EasyAssert.h" +#include "milvus-storage/column_groups.h" +#include "milvus-storage/transaction/transaction.h" +#include "milvus-storage/writer.h" +#include "storage/loon_ffi/ffi_writer_c.h" +#include "common/common_type_c.h" +#include "milvus-storage/ffi_c.h" +#include "storage/loon_ffi/util.h" +#include "monitor/scope_metric.h" + +CStatus +NewPackedLoonWriter(const char* base_path, + struct ArrowSchema* schema, + CStorageConfig c_storage_config, + const char* split_pattern, + LoonWriterHandler* writer_handle) { + SCOPE_CGO_CALL_METRIC(); + try { + auto schema_result = arrow::ImportSchema(schema); + AssertInfo(schema_result.ok(), "Import arrow schema failed"); + auto arrow_schema = schema_result.ValueOrDie(); + + auto properties = + MakeInternalPropertiesFromStorageConfig(c_storage_config); + if (split_pattern != nullptr) { + auto result = milvus_storage::api::SetValue( + *properties, "writer.policy", "schema_based", true); + AssertInfo(result == std::nullopt, "Set writer.policy failed"); + result = milvus_storage::api::SetValue( + *properties, + "writer.split.schema_based.patterns", + split_pattern, + true); + AssertInfo(result == std::nullopt, + "Set writer.schema_base_patterns failed, error {}", + result.value()); + } + std::unique_ptr policy; + + auto policy_status = + milvus_storage::api::ColumnGroupPolicy::create_column_group_policy( + *properties, arrow_schema) + .Value(&policy); + AssertInfo(policy_status.ok(), "create column group policy failed"); + + auto writer = milvus_storage::api::Writer::create( + base_path, arrow_schema, std::move(policy), *properties); + *writer_handle = writer.release(); + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(&e); + } +} + +CStatus +PackedLoonWrite(LoonWriterHandler writer_handle, + struct ArrowArray* arrays, + struct ArrowSchema* array_schemas, + struct ArrowSchema* schema) { + SCOPE_CGO_CALL_METRIC(); + try { + auto writer = static_cast(writer_handle); + auto import_schema = arrow::ImportSchema(schema); + if (!import_schema.ok()) { + return milvus::FailureCStatus( + milvus::ErrorCode::FileWriteFailed, + "Failed to import schema: " + + import_schema.status().ToString()); + } + auto arrow_schema = import_schema.ValueOrDie(); + + int num_fields = arrow_schema->num_fields(); + std::vector> all_arrays; + all_arrays.reserve(num_fields); + + for (int i = 0; i < num_fields; i++) { + auto array = arrow::ImportArray(&arrays[i], &array_schemas[i]); + if (!array.ok()) { + return milvus::FailureCStatus( + milvus::ErrorCode::FileWriteFailed, + "Failed to import array " + std::to_string(i) + ": " + + array.status().ToString()); + } + all_arrays.push_back(array.ValueOrDie()); + } + + auto record_batch = arrow::RecordBatch::Make( + arrow_schema, all_arrays[0]->length(), all_arrays); + + auto status = writer->write(record_batch); + AssertInfo( + status.ok(), "failed to write batch, error: {}", status.ToString()); + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(&e); + } +} + +CStatus +ClosePackedLoonWriter(LoonWriterHandler writer_handle, + const char* base_path, + CStorageConfig c_storage_config, + int64_t* latest_version) { + SCOPE_CGO_CALL_METRIC(); + try { + auto writer = static_cast(writer_handle); + // delete writer; + auto status = writer->flush(); + AssertInfo(status.ok(), + "failed to flush writer, error: {}", + status.ToString()); + + auto result = writer->close(); + AssertInfo(result.ok(), "failed to close writer"); + + auto cgs = result.ValueOrDie(); + + auto manifest_content = cgs->serialize().ValueOrDie(); + + delete writer; + + auto properties = + MakeInternalPropertiesFromStorageConfig(c_storage_config); + AssertInfo(properties != nullptr, "properties is null"); + + // commit first + { + auto transaction = std::make_unique< + milvus_storage::api::transaction::TransactionImpl< + milvus_storage::api::ColumnGroups>>(*properties, base_path); + auto status = transaction->begin(); + AssertInfo(status.ok(), + "failed to begin transaction, error: {}", + status.ToString()); + + auto commit_result = transaction->commit( + cgs, + milvus_storage::api::transaction::UpdateType::APPENDFILES, + milvus_storage::api::transaction::TransResolveStrategy:: + RESOLVE_FAIL); + + AssertInfo(commit_result.ok(), + "failed to commit transaction, error: {}", + commit_result.status().ToString()); + + AssertInfo(commit_result.ValueOrDie(), + "failed to commit transaction, error: {}", + commit_result.status().ToString()); + } + + // get latest version, temp solution here + { + auto transaction = std::make_unique< + milvus_storage::api::transaction::TransactionImpl< + milvus_storage::api::ColumnGroups>>(*properties, base_path); + auto latest_manifest_result = transaction->get_latest_manifest(); + AssertInfo(latest_manifest_result.ok(), + "failed to get latest manifest, error: {}", + latest_manifest_result.status().ToString()); + *latest_version = transaction->read_version(); + LOG_WARN( + "Loon writer transaction(base path {}) succeeded with latest " + "version: {}", + base_path, + *latest_version); + } + + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(&e); + } +} \ No newline at end of file diff --git a/internal/core/src/storage/loon_ffi/ffi_writer_c.h b/internal/core/src/storage/loon_ffi/ffi_writer_c.h index bcfa1e5354ed0..42f4bdbbb2361 100644 --- a/internal/core/src/storage/loon_ffi/ffi_writer_c.h +++ b/internal/core/src/storage/loon_ffi/ffi_writer_c.h @@ -11,3 +11,74 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common/common_type_c.h" +#include "common/type_c.h" +#include +#include "milvus-storage/ffi_c.h" + +// Opaque handle to a Loon packed writer instance +typedef void* LoonWriterHandler; + +// NewPackedLoonWriter creates a new packed writer for writing data to Loon storage. +// +// Parameters: +// - base_path: The base path in object storage where data will be written +// - schema: Arrow schema defining the structure of data to be written +// - c_storage_config: Storage configuration (endpoint, credentials, bucket, etc.) +// - split_pattern: Pattern defining how columns should be split into groups +// - writer_handle: Output parameter that receives the created writer handle +// +// Returns: +// - CStatus indicating success or failure with error message +CStatus +NewPackedLoonWriter(const char* base_path, + struct ArrowSchema* schema, + CStorageConfig c_storage_config, + const char* split_pattern, + LoonWriterHandler* writer_handle); + +// PackedLoonWrite writes a batch of Arrow arrays to the Loon packed writer. +// +// Parameters: +// - writer_handle: The writer handle created by NewPackedLoonWriter +// - arrays: Arrow arrays containing the data to write +// - array_schemas: Schemas for the individual arrays +// - schema: The overall schema for the write operation +// +// Returns: +// - CStatus indicating success or failure with error message +CStatus +PackedLoonWrite(LoonWriterHandler writer_handle, + struct ArrowArray* arrays, + struct ArrowSchema* array_schemas, + struct ArrowSchema* schema); + +// ClosePackedLoonWriter finalizes the writer and commits all data to storage. +// +// This function flushes any buffered data, writes the manifest file, +// and releases all resources associated with the writer. +// +// Parameters: +// - writer_handle: The writer handle to close (will be invalidated after this call) +// - base_path: The base path where data was written +// - c_storage_config: Storage configuration for finalizing the write +// - latest_version: Output parameter that receives the version number of the written data +// +// Returns: +// - CStatus indicating success or failure with error message +CStatus +ClosePackedLoonWriter(LoonWriterHandler writer_handle, + const char* base_path, + CStorageConfig c_storage_config, + int64_t* latest_version); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index f0c6873dc58a3..800c1fd5d9935 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -14,7 +14,7 @@ # Update milvus-storage_VERSION for the first occurrence milvus_add_pkg_config("milvus-storage") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( milvus-storage_VERSION 302143c) +set( milvus-storage_VERSION ba7df7b) set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git") message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}") message(STATUS "milvus-storage version: ${milvus-storage_VERSION}") diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index ae03cca51430e..1812716b54752 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -327,6 +327,7 @@ func (it *indexBuildTask) prepareJobRequest(ctx context.Context, segment *Segmen TaskSlot: it.taskSlot, LackBinlogRows: segIndex.NumRows - totalRows, InsertLogs: segment.GetBinlogs(), + Manifest: segment.GetManifestPath(), } WrapPluginContext(segment.GetCollectionID(), schema.GetProperties(), req) diff --git a/internal/datanode/index/task_index.go b/internal/datanode/index/task_index.go index cf78f61042915..8e22f847d01a5 100644 --- a/internal/datanode/index/task_index.go +++ b/internal/datanode/index/task_index.go @@ -310,6 +310,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error { it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID()) + buildIndexParams.Manifest = it.req.GetManifest() } log.Info("create index", zap.Any("buildIndexParams", buildIndexParams)) diff --git a/internal/storagev2/packed/packed_writer_ffi.go b/internal/storagev2/packed/packed_writer_ffi.go index a9387159de8d0..16473d7f8aebb 100644 --- a/internal/storagev2/packed/packed_writer_ffi.go +++ b/internal/storagev2/packed/packed_writer_ffi.go @@ -21,6 +21,7 @@ package packed #include "milvus-storage/ffi_c.h" #include "segcore/packed_writer_c.h" #include "segcore/column_groups_c.h" +#include "storage/loon_ffi/ffi_writer_c.h" #include "arrow/c/abi.h" #include "arrow/c/helpers.h" */ @@ -33,10 +34,8 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/cdata" "github.com/samber/lo" - "go.uber.org/zap" "github.com/milvus-io/milvus/internal/storagecommon" - "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -91,91 +90,123 @@ func NewFFIPackedWriter(basePath string, schema *arrow.Schema, columnGroups []st return schema.Field(index).Name }), "|") }), ",") - - cProperties, err := MakePropertiesFromStorageConfig(storageConfig, map[string]string{ - PropertyWriterPolicy: "schema_based", - PropertyWriterSchemaBasedPattern: pattern, - }) - if err != nil { - return nil, err + cPattern := C.CString(pattern) + defer C.free(unsafe.Pointer(cPattern)) + + cStorageConfig := C.CStorageConfig{ + address: C.CString(storageConfig.GetAddress()), + bucket_name: C.CString(storageConfig.GetBucketName()), + access_key_id: C.CString(storageConfig.GetAccessKeyID()), + access_key_value: C.CString(storageConfig.GetSecretAccessKey()), + root_path: C.CString(storageConfig.GetRootPath()), + storage_type: C.CString(storageConfig.GetStorageType()), + cloud_provider: C.CString(storageConfig.GetCloudProvider()), + iam_endpoint: C.CString(storageConfig.GetIAMEndpoint()), + log_level: C.CString("Warn"), // TODO use config after storage support lower case configuration + useSSL: C.bool(storageConfig.GetUseSSL()), + sslCACert: C.CString(storageConfig.GetSslCACert()), + useIAM: C.bool(storageConfig.GetUseIAM()), + region: C.CString(storageConfig.GetRegion()), + useVirtualHost: C.bool(storageConfig.GetUseVirtualHost()), + requestTimeoutMs: C.int64_t(storageConfig.GetRequestTimeoutMs()), + gcp_credential_json: C.CString(storageConfig.GetGcpCredentialJSON()), + use_custom_part_upload: true, + max_connections: C.uint32_t(storageConfig.GetMaxConnections()), } - - var writerHandle C.WriterHandle - - result := C.writer_new(cBasePath, cSchema, cProperties, &writerHandle) - - err = HandleFFIResult(result) - if err != nil { + defer C.free(unsafe.Pointer(cStorageConfig.address)) + defer C.free(unsafe.Pointer(cStorageConfig.bucket_name)) + defer C.free(unsafe.Pointer(cStorageConfig.access_key_id)) + defer C.free(unsafe.Pointer(cStorageConfig.access_key_value)) + defer C.free(unsafe.Pointer(cStorageConfig.root_path)) + defer C.free(unsafe.Pointer(cStorageConfig.storage_type)) + defer C.free(unsafe.Pointer(cStorageConfig.cloud_provider)) + defer C.free(unsafe.Pointer(cStorageConfig.iam_endpoint)) + defer C.free(unsafe.Pointer(cStorageConfig.log_level)) + defer C.free(unsafe.Pointer(cStorageConfig.sslCACert)) + defer C.free(unsafe.Pointer(cStorageConfig.region)) + defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json)) + + var writerHandler C.LoonWriterHandler + + status := C.NewPackedLoonWriter(cBasePath, cSchema, cStorageConfig, cPattern, &writerHandler) + if err := ConsumeCStatusIntoError(&status); err != nil { return nil, err } return &FFIPackedWriter{ basePath: basePath, - cWriterHandle: writerHandle, - cProperties: cProperties, + loonWriter: writerHandler, + storageConfig: storageConfig, }, nil } func (pw *FFIPackedWriter) WriteRecordBatch(recordBatch arrow.Record) error { - var caa cdata.CArrowArray - var cas cdata.CArrowSchema + cArrays := make([]CArrowArray, recordBatch.NumCols()) + cSchemas := make([]CArrowSchema, recordBatch.NumCols()) + + for i := range recordBatch.NumCols() { + var caa cdata.CArrowArray + var cas cdata.CArrowSchema + cdata.ExportArrowArray(recordBatch.Column(int(i)), &caa, &cas) + cArrays[i] = *(*CArrowArray)(unsafe.Pointer(&caa)) + cSchemas[i] = *(*CArrowSchema)(unsafe.Pointer(&cas)) + } - // Export the record batch to C Arrow format - cdata.ExportArrowRecordBatch(recordBatch, &caa, &cas) - defer cdata.ReleaseCArrowArray(&caa) - defer cdata.ReleaseCArrowSchema(&cas) + var cas cdata.CArrowSchema + cdata.ExportArrowSchema(recordBatch.Schema(), &cas) + cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) - // Convert to C struct - cArray := (*C.struct_ArrowArray)(unsafe.Pointer(&caa)) + status := C.PackedLoonWrite(pw.loonWriter, &cArrays[0], &cSchemas[0], cSchema) + if err := ConsumeCStatusIntoError(&status); err != nil { + return err + } - result := C.writer_write(pw.cWriterHandle, cArray) - return HandleFFIResult(result) + return nil } func (pw *FFIPackedWriter) Close() (string, error) { - var manifest *C.char - - result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &manifest) - if err := HandleFFIResult(result); err != nil { - return "", err - } - + var outVersion C.int64_t cBasePath := C.CString(pw.basePath) defer C.free(unsafe.Pointer(cBasePath)) - var transationHandle C.TransactionHandle - result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle) - if err := HandleFFIResult(result); err != nil { - return "", err - } - defer C.transaction_destroy(transationHandle) - - // #define LOON_TRANSACTION_UPDATE_ADDFILES 0 - // #define LOON_TRANSACTION_UPDATE_ADDFEILD 1 - // #define LOON_TRANSACTION_UPDATE_MAX 2 - // #define LOON_TRANSACTION_RESOLVE_FAIL 0 - // #define LOON_TRANSACTION_RESOLVE_MERGE 1 - // #define LOON_TRANSACTION_RESOLVE_MAX 2 - - var commitResult C.bool - result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), manifest, &commitResult) - if err := HandleFFIResult(result); err != nil { - return "", err + storageConfig := pw.storageConfig + cStorageConfig := C.CStorageConfig{ + address: C.CString(storageConfig.GetAddress()), + bucket_name: C.CString(storageConfig.GetBucketName()), + access_key_id: C.CString(storageConfig.GetAccessKeyID()), + access_key_value: C.CString(storageConfig.GetSecretAccessKey()), + root_path: C.CString(storageConfig.GetRootPath()), + storage_type: C.CString(storageConfig.GetStorageType()), + cloud_provider: C.CString(storageConfig.GetCloudProvider()), + iam_endpoint: C.CString(storageConfig.GetIAMEndpoint()), + log_level: C.CString("Warn"), // TODO use config after storage support lower case configuration + useSSL: C.bool(storageConfig.GetUseSSL()), + sslCACert: C.CString(storageConfig.GetSslCACert()), + useIAM: C.bool(storageConfig.GetUseIAM()), + region: C.CString(storageConfig.GetRegion()), + useVirtualHost: C.bool(storageConfig.GetUseVirtualHost()), + requestTimeoutMs: C.int64_t(storageConfig.GetRequestTimeoutMs()), + gcp_credential_json: C.CString(storageConfig.GetGcpCredentialJSON()), + use_custom_part_upload: true, + max_connections: C.uint32_t(storageConfig.GetMaxConnections()), } - - defer C.transaction_destroy(transationHandle) - - var readVersion C.int64_t - - // TODO: not atomic, need to get version from transaction - var cOutManifest *C.char - result = C.get_latest_column_groups(cBasePath, pw.cProperties, &cOutManifest, &readVersion) - if err := HandleFFIResult(result); err != nil { + defer C.free(unsafe.Pointer(cStorageConfig.address)) + defer C.free(unsafe.Pointer(cStorageConfig.bucket_name)) + defer C.free(unsafe.Pointer(cStorageConfig.access_key_id)) + defer C.free(unsafe.Pointer(cStorageConfig.access_key_value)) + defer C.free(unsafe.Pointer(cStorageConfig.root_path)) + defer C.free(unsafe.Pointer(cStorageConfig.storage_type)) + defer C.free(unsafe.Pointer(cStorageConfig.cloud_provider)) + defer C.free(unsafe.Pointer(cStorageConfig.iam_endpoint)) + defer C.free(unsafe.Pointer(cStorageConfig.log_level)) + defer C.free(unsafe.Pointer(cStorageConfig.sslCACert)) + defer C.free(unsafe.Pointer(cStorageConfig.region)) + defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json)) + + status := C.ClosePackedLoonWriter(pw.loonWriter, cBasePath, cStorageConfig, &outVersion) + if err := ConsumeCStatusIntoError(&status); err != nil { return "", err } - outManifest := C.GoString(cOutManifest) - log.Info("FFI writer closed with output manifest", zap.String("manifest", outManifest), zap.Int64("version", int64(readVersion))) - defer C.properties_free(pw.cProperties) - return MarshalManifestPath(pw.basePath, int64(readVersion)), nil + return MarshalManifestPath(pw.basePath, int64(outVersion)), nil } diff --git a/internal/storagev2/packed/type.go b/internal/storagev2/packed/type.go index 0c54b1a8109dc..40db2a18b049d 100644 --- a/internal/storagev2/packed/type.go +++ b/internal/storagev2/packed/type.go @@ -19,6 +19,7 @@ package packed #include "arrow/c/abi.h" #include "arrow/c/helpers.h" #include "storage/loon_ffi/ffi_reader_c.h" +#include "storage/loon_ffi/ffi_writer_c.h" #include "segcore/packed_reader_c.h" #include "segcore/packed_writer_c.h" */ @@ -28,6 +29,8 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/arrio" "github.com/apache/arrow/go/v17/arrow/cdata" + + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" ) type PackedWriter struct { @@ -36,8 +39,8 @@ type PackedWriter struct { type FFIPackedWriter struct { basePath string - cWriterHandle C.WriterHandle - cProperties *C.Properties + loonWriter C.LoonWriterHandler + storageConfig *indexpb.StorageConfig } type PackedReader struct {