Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,9 @@ ChunkedSegmentSealedImpl::LoadColumnGroup(

auto chunk_reader = std::move(chunk_reader_result).ValueOrDie();

LOG_INFO(
"[StorageV2] segment {} loads manifest cg index {} with field ids "
"{} ",
this->get_segment_id(),
index);
LOG_INFO("[StorageV2] segment {} loads manifest cg index {}",
this->get_segment_id(),
index);

auto translator =
std::make_unique<storagev2translator::ManifestGroupTranslator>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ ManifestGroupTranslator::get_cells(
auto chunks = read_result.ValueOrDie();
for (size_t i = 0; i < chunks.size(); ++i) {
auto& chunk = chunks[i];
AssertInfo(chunk != nullptr,
"chunk is null, idx = {}, group index = {}, segment id = "
"{}, parallel degree = {}",
i,
column_group_index_,
segment_id_,
parallel_degree);
auto cid = cids[i];
auto group_chunk = load_group_chunk(chunk, cid);
cells.emplace_back(cid, std::move(group_chunk));
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/storage/loon_ffi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# FFI Reader source files for interfacing with milvus-storage through FFI
set(FFI_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/ffi_writer_c.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ffi_reader_c.cpp
${CMAKE_CURRENT_SOURCE_DIR}/util.cpp
)
Expand Down
190 changes: 190 additions & 0 deletions internal/core/src/storage/loon_ffi/ffi_writer_c.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <arrow/c/bridge.h>
#include <arrow/array.h>
#include <exception>
#include "log/Log.h"
#include "common/EasyAssert.h"
#include "milvus-storage/column_groups.h"
#include "milvus-storage/transaction/transaction.h"
#include "milvus-storage/writer.h"
#include "storage/loon_ffi/ffi_writer_c.h"
#include "common/common_type_c.h"
#include "milvus-storage/ffi_c.h"
#include "storage/loon_ffi/util.h"
#include "monitor/scope_metric.h"

// Creates a milvus-storage writer rooted at `base_path` and hands it back to
// the caller through `writer_handle` as an opaque pointer.
//
// Parameters:
//   - base_path: destination path passed to the writer; must not be NULL.
//   - schema: Arrow C schema, imported with arrow::ImportSchema; must not be
//     NULL.
//   - c_storage_config: converted to writer properties via
//     MakeInternalPropertiesFromStorageConfig.
//   - split_pattern: optional. When non-NULL, the "schema_based" writer policy
//     is selected and this string is used as its split patterns. When NULL the
//     properties derived from the storage config are used unchanged.
//   - writer_handle: output; receives the raw writer pointer on success. The
//     pointer is later consumed by ClosePackedLoonWriter.
//
// Returns a success CStatus, or a failure CStatus built from the exception
// raised by a failed assertion. No exception escapes this function.
CStatus
NewPackedLoonWriter(const char* base_path,
                    struct ArrowSchema* schema,
                    CStorageConfig c_storage_config,
                    const char* split_pattern,
                    LoonWriterHandler* writer_handle) {
    SCOPE_CGO_CALL_METRIC();
    try {
        // Reject null pointers up front so that a bad call from the FFI
        // boundary becomes an error status instead of a crash.
        AssertInfo(base_path != nullptr, "base path is null");
        AssertInfo(schema != nullptr, "arrow schema is null");
        AssertInfo(writer_handle != nullptr,
                   "writer handle output pointer is null");

        auto schema_result = arrow::ImportSchema(schema);
        AssertInfo(schema_result.ok(),
                   "Import arrow schema failed, error: {}",
                   schema_result.status().ToString());
        auto arrow_schema = schema_result.ValueOrDie();

        auto properties =
            MakeInternalPropertiesFromStorageConfig(c_storage_config);
        AssertInfo(properties != nullptr, "properties is null");

        if (split_pattern != nullptr) {
            // SetValue reports failure by returning a populated optional.
            auto policy_error = milvus_storage::api::SetValue(
                *properties, "writer.policy", "schema_based", true);
            AssertInfo(policy_error == std::nullopt,
                       "Set writer.policy failed, error {}",
                       policy_error.value());

            auto pattern_error = milvus_storage::api::SetValue(
                *properties,
                "writer.split.schema_based.patterns",
                split_pattern,
                true);
            AssertInfo(pattern_error == std::nullopt,
                       "Set writer.schema_base_patterns failed, error {}",
                       pattern_error.value());
        }

        std::unique_ptr<milvus_storage::api::ColumnGroupPolicy> policy;
        auto policy_status =
            milvus_storage::api::ColumnGroupPolicy::create_column_group_policy(
                *properties, arrow_schema)
                .Value(&policy);
        AssertInfo(policy_status.ok(),
                   "create column group policy failed, error: {}",
                   policy_status.ToString());

        auto writer = milvus_storage::api::Writer::create(
            base_path, arrow_schema, std::move(policy), *properties);
        AssertInfo(writer != nullptr, "create loon writer failed");

        // Ownership moves to the caller; ClosePackedLoonWriter deletes it.
        *writer_handle = writer.release();
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}

// Writes one batch of Arrow arrays through the writer behind `writer_handle`.
//
// Parameters:
//   - writer_handle: handle produced by NewPackedLoonWriter; must not be NULL.
//   - arrays / array_schemas: parallel C arrays with one entry per field of
//     `schema`; entry i is imported with arrow::ImportArray.
//   - schema: Arrow C schema describing the whole batch. It must contain at
//     least one field, because the batch row count is taken from the first
//     imported array.
//
// Returns a success CStatus, or a failure CStatus describing the import,
// validation or write error. No exception escapes this function.
CStatus
PackedLoonWrite(LoonWriterHandler writer_handle,
                struct ArrowArray* arrays,
                struct ArrowSchema* array_schemas,
                struct ArrowSchema* schema) {
    SCOPE_CGO_CALL_METRIC();
    try {
        auto writer = static_cast<milvus_storage::api::Writer*>(writer_handle);
        AssertInfo(writer != nullptr, "writer handle is null");
        AssertInfo(schema != nullptr, "arrow schema is null");

        auto import_schema = arrow::ImportSchema(schema);
        if (!import_schema.ok()) {
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileWriteFailed,
                "Failed to import schema: " +
                    import_schema.status().ToString());
        }
        auto arrow_schema = import_schema.ValueOrDie();

        const int num_fields = arrow_schema->num_fields();
        if (num_fields <= 0) {
            // Without this guard the row-count lookup below would index an
            // empty vector.
            return milvus::FailureCStatus(
                milvus::ErrorCode::FileWriteFailed,
                "Failed to write batch: schema has no fields");
        }
        AssertInfo(arrays != nullptr, "arrow arrays pointer is null");
        AssertInfo(array_schemas != nullptr,
                   "arrow array schemas pointer is null");

        std::vector<std::shared_ptr<arrow::Array>> all_arrays;
        all_arrays.reserve(num_fields);

        for (int i = 0; i < num_fields; i++) {
            auto array = arrow::ImportArray(&arrays[i], &array_schemas[i]);
            if (!array.ok()) {
                return milvus::FailureCStatus(
                    milvus::ErrorCode::FileWriteFailed,
                    "Failed to import array " + std::to_string(i) + ": " +
                        array.status().ToString());
            }
            all_arrays.push_back(array.ValueOrDie());
        }

        // Every column must have the same number of rows as the first one.
        // This is checked only after all imports are done, so an early return
        // here never leaves later arrays un-imported.
        const auto num_rows = all_arrays[0]->length();
        for (int i = 1; i < num_fields; i++) {
            if (all_arrays[i]->length() != num_rows) {
                return milvus::FailureCStatus(
                    milvus::ErrorCode::FileWriteFailed,
                    "Failed to write batch: array " + std::to_string(i) +
                        " has " + std::to_string(all_arrays[i]->length()) +
                        " rows, expected " + std::to_string(num_rows));
            }
        }

        auto record_batch =
            arrow::RecordBatch::Make(arrow_schema, num_rows, all_arrays);

        auto status = writer->write(record_batch);
        AssertInfo(
            status.ok(), "failed to write batch, error: {}", status.ToString());
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}

// Finalizes the writer behind `writer_handle`, commits its column groups and
// reports the resulting manifest version.
//
// Steps, in order:
//   1. flush and close the writer, obtaining its column groups;
//   2. destroy the writer;
//   3. begin a transaction on `base_path` and commit the column groups with
//      UpdateType::APPENDFILES and TransResolveStrategy::RESOLVE_FAIL;
//   4. open a second transaction, load the latest manifest and store its
//      read_version() in `*latest_version`.
//
// Parameters:
//   - writer_handle: handle produced by NewPackedLoonWriter. The writer is
//     destroyed by this call on every path, success or failure, so the handle
//     must not be used afterwards.
//   - base_path: path the transactions operate on; must not be NULL.
//   - c_storage_config: converted to properties for the transactions.
//   - latest_version: output; must not be NULL.
//
// Returns a success CStatus, or a failure CStatus built from the exception
// raised by a failed assertion. No exception escapes this function.
CStatus
ClosePackedLoonWriter(LoonWriterHandler writer_handle,
                      const char* base_path,
                      CStorageConfig c_storage_config,
                      int64_t* latest_version) {
    SCOPE_CGO_CALL_METRIC();
    try {
        // Take ownership immediately so the writer is released even when one
        // of the assertions below throws; previously it leaked on failure.
        std::unique_ptr<milvus_storage::api::Writer> writer(
            static_cast<milvus_storage::api::Writer*>(writer_handle));
        AssertInfo(writer != nullptr, "writer handle is null");
        AssertInfo(base_path != nullptr, "base path is null");
        AssertInfo(latest_version != nullptr,
                   "latest version output pointer is null");

        auto flush_status = writer->flush();
        AssertInfo(flush_status.ok(),
                   "failed to flush writer, error: {}",
                   flush_status.ToString());

        auto close_result = writer->close();
        AssertInfo(close_result.ok(),
                   "failed to close writer, error: {}",
                   close_result.status().ToString());
        auto cgs = close_result.ValueOrDie();

        // Keep the serialization check, but report a failure as an error
        // status instead of aborting the process through ValueOrDie().
        auto serialize_result = cgs->serialize();
        AssertInfo(serialize_result.ok(),
                   "failed to serialize column groups, error: {}",
                   serialize_result.status().ToString());

        // Destroy the writer before committing, as the original flow did.
        writer.reset();

        auto properties =
            MakeInternalPropertiesFromStorageConfig(c_storage_config);
        AssertInfo(properties != nullptr, "properties is null");

        // commit first
        {
            auto transaction = std::make_unique<
                milvus_storage::api::transaction::TransactionImpl<
                    milvus_storage::api::ColumnGroups>>(*properties, base_path);
            auto begin_status = transaction->begin();
            AssertInfo(begin_status.ok(),
                       "failed to begin transaction, error: {}",
                       begin_status.ToString());

            auto commit_result = transaction->commit(
                cgs,
                milvus_storage::api::transaction::UpdateType::APPENDFILES,
                milvus_storage::api::transaction::TransResolveStrategy::
                    RESOLVE_FAIL);

            AssertInfo(commit_result.ok(),
                       "failed to commit transaction, error: {}",
                       commit_result.status().ToString());

            // The call itself succeeded here, so the status is OK and has no
            // useful text; a false value means the commit was not applied.
            AssertInfo(commit_result.ValueOrDie(),
                       "failed to commit transaction: commit returned false");
        }

        // get latest version, temp solution here
        {
            auto transaction = std::make_unique<
                milvus_storage::api::transaction::TransactionImpl<
                    milvus_storage::api::ColumnGroups>>(*properties, base_path);
            auto latest_manifest_result = transaction->get_latest_manifest();
            AssertInfo(latest_manifest_result.ok(),
                       "failed to get latest manifest, error: {}",
                       latest_manifest_result.status().ToString());
            *latest_version = transaction->read_version();
            // Success path: informational, not a warning.
            LOG_INFO(
                "Loon writer transaction(base path {}) succeeded with latest "
                "version: {}",
                base_path,
                *latest_version);
        }

        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
71 changes: 71 additions & 0 deletions internal/core/src/storage/loon_ffi/ffi_writer_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,74 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#ifdef __cplusplus
extern "C" {
#endif

#include "common/common_type_c.h"
#include "common/type_c.h"
#include <arrow/c/abi.h>
#include "milvus-storage/ffi_c.h"

// Opaque handle to a Loon packed writer instance
typedef void* LoonWriterHandler;

// NewPackedLoonWriter creates a new packed writer for writing data to Loon storage.
//
// Parameters:
// - base_path: The base path in object storage where data will be written
// - schema: Arrow schema defining the structure of data to be written.
//   It is imported with arrow::ImportSchema.
//   NOTE(review): the import presumably takes over the C struct, so the caller
//   should not reuse it afterwards - confirm against the Arrow C data interface.
// - c_storage_config: Storage configuration (endpoint, credentials, bucket, etc.)
// - split_pattern: Pattern defining how columns should be split into groups.
//   May be NULL. When non-NULL, the writer policy is set to "schema_based" and
//   this string is used as its split patterns; when NULL, no policy override
//   is applied.
// - writer_handle: Output parameter that receives the created writer handle.
//   It is written only on success and must point to valid storage.
//
// Returns:
// - CStatus indicating success or failure with error message
CStatus
NewPackedLoonWriter(const char* base_path,
struct ArrowSchema* schema,
CStorageConfig c_storage_config,
const char* split_pattern,
LoonWriterHandler* writer_handle);

// PackedLoonWrite writes a batch of Arrow arrays to the Loon packed writer.
//
// Parameters:
// - writer_handle: The writer handle created by NewPackedLoonWriter
// - arrays: Arrow arrays containing the data to write; one entry per field of
//   `schema`, in field order
// - array_schemas: Schemas for the individual arrays; array_schemas[i]
//   describes arrays[i]
// - schema: The overall schema for the write operation. Its field count
//   decides how many entries of `arrays` / `array_schemas` are read, and the
//   batch row count is taken from the first array, so it must contain at
//   least one field.
//
// Returns:
// - CStatus indicating success or failure with error message
CStatus
PackedLoonWrite(LoonWriterHandler writer_handle,
struct ArrowArray* arrays,
struct ArrowSchema* array_schemas,
struct ArrowSchema* schema);

// ClosePackedLoonWriter finalizes the writer and commits all data to storage.
//
// This function flushes any buffered data and closes the writer, commits the
// resulting column groups through a transaction on base_path (append-files
// update; the commit fails rather than resolving conflicts), and releases the
// writer. The version reported through latest_version is read from the latest
// manifest in a separate transaction after the commit.
//
// Parameters:
// - writer_handle: The writer handle to close (will be invalidated after this call)
// - base_path: The base path where data was written
// - c_storage_config: Storage configuration for finalizing the write
// - latest_version: Output parameter that receives the version number of the written data;
//   must point to valid storage
//
// Returns:
// - CStatus indicating success or failure with error message
CStatus
ClosePackedLoonWriter(LoonWriterHandler writer_handle,
const char* base_path,
CStorageConfig c_storage_config,
int64_t* latest_version);

#ifdef __cplusplus
}
#endif
2 changes: 1 addition & 1 deletion internal/core/thirdparty/milvus-storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# Update milvus-storage_VERSION for the first occurrence
milvus_add_pkg_config("milvus-storage")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( milvus-storage_VERSION 302143c)
set( milvus-storage_VERSION ba7df7b)
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ func (it *indexBuildTask) prepareJobRequest(ctx context.Context, segment *Segmen
TaskSlot: it.taskSlot,
LackBinlogRows: segIndex.NumRows - totalRows,
InsertLogs: segment.GetBinlogs(),
Manifest: segment.GetManifestPath(),
}

WrapPluginContext(segment.GetCollectionID(), schema.GetProperties(), req)
Expand Down
1 change: 1 addition & 0 deletions internal/datanode/index/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
it.req.GetCollectionID(),
it.req.GetPartitionID(),
it.req.GetSegmentID())
buildIndexParams.Manifest = it.req.GetManifest()
}
log.Info("create index", zap.Any("buildIndexParams", buildIndexParams))

Expand Down
Loading
Loading