Skip to content

Commit 8838210

Browse files
API to tune DB parameters (#130)
* Extract configurations * Reduce default rebuild thread to 1
1 parent be9ca9d commit 8838210

File tree

8 files changed

+236
-21
lines changed

8 files changed

+236
-21
lines changed

engine/config/config.hpp

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#pragma once
2+
3+
#include <algorithm>
4+
#include <atomic> // Include for std::atomic
5+
#include <cstddef>
6+
#include <exception>
7+
#include <memory>
8+
#include <set>
9+
#include <string>
10+
#include <vector>
11+
#include <stdexcept> // Include for std::invalid_argument
12+
13+
#include "utils/json.hpp"
14+
15+
namespace vectordb {
16+
17+
struct Config {
18+
std::atomic<int> IntraQueryThreads{4};
19+
std::atomic<int> MasterQueueSize{500};
20+
std::atomic<int> LocalQueueSize{500};
21+
std::atomic<int> GlobalSyncInterval{15};
22+
std::atomic<int> MinimalGraphSize{100};
23+
std::atomic<int> NumExecutorPerField{16};
24+
std::atomic<int> RebuildThreads{1};
25+
26+
// Setter method for IntraQueryThreads
27+
void setIntraQueryThreads(int value) {
28+
if (value >= 1 && value <= 128) {
29+
IntraQueryThreads.store(value, std::memory_order_relaxed);
30+
} else {
31+
throw std::invalid_argument("Invalid value for IntraQueryThreads, valid range: [1, 128]");
32+
}
33+
}
34+
35+
// Setter method for SearchQueueSize (modifies both MasterQueueSize and LocalQueueSize atomically)
36+
void setSearchQueueSize(int value) {
37+
if (value >= 500 && value <= 10000000) {
38+
MasterQueueSize.store(value, std::memory_order_relaxed);
39+
LocalQueueSize.store(value, std::memory_order_relaxed);
40+
} else {
41+
throw std::invalid_argument("Invalid value for SearchQueueSize, valid range: [500, 10000000]");
42+
}
43+
}
44+
45+
// Setter method for NumExecutorPerField
46+
void setNumExecutorPerField(int value) {
47+
if (value >= 1 && value <= 128) {
48+
NumExecutorPerField.store(value, std::memory_order_relaxed);
49+
} else {
50+
throw std::invalid_argument("Invalid value for NumExecutorPerField, valid range: [1, 128]");
51+
}
52+
}
53+
54+
// Setter method for RebuildThreads
55+
void setRebuildThreads(int value) {
56+
if (value >= 1 && value <= 128) {
57+
RebuildThreads.store(value, std::memory_order_relaxed);
58+
} else {
59+
throw std::invalid_argument("Invalid value for RebuildThreads, valid range: [1, 128]");
60+
}
61+
}
62+
63+
// A setter function that takes a JSON config, and loop through the keys and values to set the corresponding fields
64+
void updateConfig(const vectordb::Json& json, bool& needSwapExecutors) {
65+
needSwapExecutors = false;
66+
if (json.HasMember("IntraQueryThreads")) {
67+
setIntraQueryThreads(json.GetInt("IntraQueryThreads"));
68+
needSwapExecutors = true;
69+
}
70+
if (json.HasMember("ConcurrentWorkersPerIndex")) {
71+
setNumExecutorPerField(json.GetInt("ConcurrentWorkersPerIndex"));
72+
needSwapExecutors = true;
73+
}
74+
if (json.HasMember("RebuildThreads")) {
75+
setRebuildThreads(json.GetInt("RebuildThreads"));
76+
}
77+
if (json.HasMember("SearchQueueSize")) {
78+
setSearchQueueSize(json.GetInt("SearchQueueSize"));
79+
needSwapExecutors = true;
80+
}
81+
}
82+
};
83+
84+
// Global config instance
85+
inline Config globalConfig;
86+
87+
} // namespace vectordb

engine/db/db_mvp.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,19 @@ Status DBMVP::Rebuild() {
9191
return Status::OK();
9292
}
9393

94+
Status DBMVP::SwapExecutors() {
95+
// Loop through all tables and swap executors
96+
for (int64_t i = 0; i < tables_.size(); ++i) {
97+
std::shared_ptr<TableMVP> table = tables_[i];
98+
if (table != nullptr) {
99+
auto status = table->SwapExecutors();
100+
if (!status.ok()) {
101+
std::cout << "Swap executors for table " << table->table_schema_.name_ << " failed." << std::endl;
102+
}
103+
}
104+
}
105+
return Status::OK();
106+
}
107+
94108
} // namespace engine
95109
} // namespace vectordb

engine/db/db_mvp.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class DBMVP {
2929
std::vector<std::string> GetTables();
3030
std::shared_ptr<TableMVP> GetTable(const std::string& table_name);
3131
Status Rebuild();
32+
Status SwapExecutors();
3233

3334
void SetWALEnabled(bool enabled) {
3435
for (auto table : tables_) {

engine/db/db_server.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,21 @@ Status DBServer::Rebuild() {
207207
return Status::OK();
208208
}
209209

210+
Status DBServer::SwapExecutors() {
211+
// Loop through all dbs and swap executors
212+
for (int64_t i = 0; i < dbs_.size(); ++i) {
213+
std::shared_ptr<DBMVP> db = dbs_[i];
214+
if (db != nullptr) {
215+
auto status = db->SwapExecutors();
216+
if (!status.ok()) {
217+
std::cout << "Swap executors for db of " << db->db_catalog_path_ << " failed."
218+
<< std::endl;
219+
}
220+
}
221+
}
222+
return Status::OK();
223+
}
224+
210225
Status DBServer::ListTables(const std::string& db_name, std::vector<std::string>& table_names) {
211226
auto db = GetDB(db_name);
212227
if (db == nullptr) {

engine/db/db_server.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ class DBServer {
102102

103103
Status Rebuild();
104104

105+
Status SwapExecutors();
106+
105107
void InjectEmbeddingService(std::string& embedding_service_url) {
106108
embedding_service_ = std::make_shared<vectordb::engine::EmbeddingService>(embedding_service_url);
107109
meta_->InjectEmbeddingService(embedding_service_);

engine/db/table_mvp.cpp

Lines changed: 80 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66

77
#include "db/catalog/meta_types.hpp"
88

9+
#include "config/config.hpp"
10+
911
namespace vectordb {
12+
13+
extern Config globalConfig;
14+
1015
namespace engine {
1116

1217
TableMVP::TableMVP(meta::TableSchema &table_schema,
@@ -65,7 +70,7 @@ TableMVP::TableMVP(meta::TableSchema &table_schema,
6570
auto distFunc = GetDistFunc(fType, mType);
6671

6772
auto pool = std::make_shared<execution::ExecutorPool>();
68-
for (int executorIdx = 0; executorIdx < NumExecutorPerField;
73+
for (int executorIdx = 0; executorIdx < globalConfig.NumExecutorPerField;
6974
executorIdx++) {
7075
pool->release(std::make_shared<execution::VecSearchExecutor>(
7176
table_schema_.fields_[i].vector_dimension_,
@@ -75,10 +80,10 @@ TableMVP::TableMVP(meta::TableSchema &table_schema,
7580
columnData,
7681
distFunc,
7782
&table_schema_.fields_[i].vector_dimension_,
78-
IntraQueryThreads,
79-
MasterQueueSize,
80-
LocalQueueSize,
81-
GlobalSyncInterval));
83+
globalConfig.IntraQueryThreads,
84+
globalConfig.MasterQueueSize,
85+
globalConfig.LocalQueueSize,
86+
globalConfig.GlobalSyncInterval));
8287
}
8388
executor_pool_.push_back(pool);
8489
}
@@ -87,7 +92,8 @@ TableMVP::TableMVP(meta::TableSchema &table_schema,
8792

8893
Status TableMVP::Rebuild(const std::string &db_catalog_path) {
8994
// Limit how many threads rebuild takes.
90-
omp_set_num_threads(RebuildThreads);
95+
omp_set_num_threads(globalConfig.RebuildThreads);
96+
std::cout << "Rebuild table segment with threads: " << globalConfig.RebuildThreads << std::endl;
9197

9298
// Get the current record number.
9399
int64_t record_number = table_segment_->record_number_;
@@ -112,7 +118,7 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
112118
fType == meta::FieldType::SPARSE_VECTOR_FLOAT ||
113119
fType == meta::FieldType::SPARSE_VECTOR_DOUBLE) {
114120
if (ann_graph_segment_[index]->record_number_ == record_number ||
115-
record_number < MinimalGraphSize) {
121+
record_number < globalConfig.MinimalGraphSize) {
116122
// No need to rebuild the ann graph.
117123
std::cout << "Skip rebuild ANN graph for attribute: "
118124
<< table_schema_.fields_[i].name_ << std::endl;
@@ -171,7 +177,7 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
171177
auto pool = std::make_shared<execution::ExecutorPool>();
172178
auto distFunc = GetDistFunc(fType, mType);
173179

174-
for (int executorIdx = 0; executorIdx < NumExecutorPerField;
180+
for (int executorIdx = 0; executorIdx < globalConfig.NumExecutorPerField;
175181
executorIdx++) {
176182
pool->release(std::make_shared<execution::VecSearchExecutor>(
177183
table_schema_.fields_[i].vector_dimension_,
@@ -182,10 +188,10 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
182188
columnData,
183189
distFunc,
184190
&table_schema_.fields_[i].vector_dimension_,
185-
IntraQueryThreads,
186-
MasterQueueSize,
187-
LocalQueueSize,
188-
GlobalSyncInterval));
191+
globalConfig.IntraQueryThreads,
192+
globalConfig.MasterQueueSize,
193+
globalConfig.LocalQueueSize,
194+
globalConfig.GlobalSyncInterval));
189195
}
190196
std::unique_lock<std::mutex> lock(executor_pool_mutex_);
191197
executor_pool_.set(index, pool);
@@ -199,6 +205,68 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
199205
return Status::OK();
200206
}
201207

208+
Status TableMVP::SwapExecutors() {
209+
// Get the current record number.
210+
int64_t record_number = table_segment_->record_number_;
211+
212+
int64_t index = 0;
213+
for (int i = 0; i < table_schema_.fields_.size(); ++i) {
214+
auto fType = table_schema_.fields_[i].field_type_;
215+
auto mType = table_schema_.fields_[i].metric_type_;
216+
217+
if (fType == meta::FieldType::VECTOR_FLOAT ||
218+
fType == meta::FieldType::VECTOR_DOUBLE ||
219+
fType == meta::FieldType::SPARSE_VECTOR_FLOAT ||
220+
fType == meta::FieldType::SPARSE_VECTOR_DOUBLE) {
221+
222+
VectorColumnData columnData;
223+
if (fType == meta::FieldType::VECTOR_FLOAT || fType == meta::FieldType::VECTOR_DOUBLE) {
224+
columnData = table_segment_
225+
->vector_tables_[table_segment_->field_name_mem_offset_map_
226+
[table_schema_.fields_[i].name_]];
227+
} else {
228+
// sparse vector
229+
columnData = &table_segment_
230+
->var_len_attr_table_[table_segment_->field_name_mem_offset_map_
231+
[table_schema_.fields_[i].name_]];
232+
}
233+
234+
// Rebuild the ann graph.
235+
std::cout << "Swap executors for attribute: "
236+
<< table_schema_.fields_[i].name_ << std::endl;
237+
238+
// Replace the executors.
239+
auto pool = std::make_shared<execution::ExecutorPool>();
240+
auto distFunc = GetDistFunc(fType, mType);
241+
242+
for (int executorIdx = 0; executorIdx < globalConfig.NumExecutorPerField;
243+
executorIdx++) {
244+
pool->release(std::make_shared<execution::VecSearchExecutor>(
245+
table_schema_.fields_[i].vector_dimension_,
246+
ann_graph_segment_[index]->navigation_point_,
247+
ann_graph_segment_[index],
248+
ann_graph_segment_[index]->offset_table_,
249+
ann_graph_segment_[index]->neighbor_list_,
250+
columnData,
251+
distFunc,
252+
&table_schema_.fields_[i].vector_dimension_,
253+
globalConfig.IntraQueryThreads,
254+
globalConfig.MasterQueueSize,
255+
globalConfig.LocalQueueSize,
256+
globalConfig.GlobalSyncInterval));
257+
}
258+
std::unique_lock<std::mutex> lock(executor_pool_mutex_);
259+
executor_pool_.set(index, pool);
260+
lock.unlock();
261+
262+
++index;
263+
}
264+
}
265+
266+
std::cout << "Swap executors done." << std::endl;
267+
return Status::OK();
268+
}
269+
202270
Status TableMVP::Insert(vectordb::Json &record, std::unordered_map<std::string, std::string> &headers, bool upsert) {
203271
int64_t wal_id =
204272
wal_->WriteEntry(upsert ? LogEntryType::UPSERT : LogEntryType::INSERT, record.DumpToString());

engine/db/table_mvp.hpp

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,6 @@
2525
namespace vectordb {
2626
namespace engine {
2727

28-
constexpr const int IntraQueryThreads = 4;
29-
constexpr const int MasterQueueSize = 500;
30-
constexpr const int LocalQueueSize = 500;
31-
constexpr const int GlobalSyncInterval = 15;
32-
constexpr const int MinimalGraphSize = 100;
33-
constexpr const int NumExecutorPerField = 16;
34-
35-
constexpr const int RebuildThreads = 4;
36-
3728
class TableMVP {
3829
public:
3930
explicit TableMVP(
@@ -47,6 +38,9 @@ class TableMVP {
4738
// Rebuild the table and ann graph, and save to disk.
4839
Status Rebuild(const std::string &db_catalog_path);
4940

41+
// Swap executors during config change.
42+
Status SwapExecutors();
43+
5044
Status Insert(vectordb::Json &records, std::unordered_map<std::string, std::string> &headers, bool upsert = false);
5145

5246
Status InsertPrepare(vectordb::Json &pks, vectordb::Json &result);

engine/server/web_server/web_controller.hpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
#include "utils/json.hpp"
2525
#include "utils/status.hpp"
2626
#include "utils/constants.hpp"
27+
#include "config/config.hpp"
2728

2829
#define WEB_LOG_PREFIX "[Web] "
2930

3031
namespace vectordb {
32+
33+
extern Config globalConfig;
34+
3135
namespace server {
3236
namespace web {
3337

@@ -895,6 +899,36 @@ class WebController : public oatpp::web::server::api::ApiController {
895899
return createDtoResponse(Status::CODE_200, dto);
896900
}
897901

902+
ADD_CORS(UpdateConfig)
903+
904+
ENDPOINT("POST", "api/config", UpdateConfig, BODY_STRING(String, body)) {
905+
vectordb::Json parsedBody;
906+
auto dto = StatusDto::createShared();
907+
auto valid = parsedBody.LoadFromString(body);
908+
if (!valid) {
909+
dto->statusCode = Status::CODE_400.code;
910+
dto->message = "Invalid payload.";
911+
return createDtoResponse(Status::CODE_400, dto);
912+
}
913+
914+
try {
915+
bool needSwapExecutors = false;
916+
globalConfig.updateConfig(parsedBody, needSwapExecutors);
917+
if (needSwapExecutors) {
918+
// Swap executors if necessary.
919+
db_server->SwapExecutors();
920+
}
921+
} catch (std::exception& ex) {
922+
dto->statusCode = Status::CODE_500.code;
923+
dto->message = std::string(ex.what());
924+
return createDtoResponse(Status::CODE_500, dto);
925+
}
926+
927+
dto->statusCode = Status::CODE_200.code;
928+
dto->message = std::string("Config updated successfully.");
929+
return createDtoResponse(Status::CODE_200, dto);
930+
}
931+
898932
/**
899933
* Finish ENDPOINTs generation ('ApiController' codegen)
900934
*/

0 commit comments

Comments
 (0)