Skip to content

Commit b69df28

Browse files
Protect catalog change for high throughput (#121)
1 parent d972ca8 commit b69df28

File tree

2 files changed

+6
-1
lines changed

2 files changed

+6
-1
lines changed

engine/db/catalog/basic_meta_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ Status BasicMetaImpl::LoadDatabase(const std::string& db_catalog_path, const std
263263
}
264264

265265
databases_[db_name] = db_schema;
266+
db_mutexes_[db_name]; // Initializes a mutex for the new database
266267
loaded_databases_paths_.insert(db_catalog_path);
267268

268269
return Status::OK();
@@ -304,6 +305,7 @@ Status BasicMetaImpl::DropDatabase(const std::string& db_name) {
304305
}
305306
loaded_databases_paths_.erase(path);
306307
databases_.erase(db_name);
308+
db_mutexes_.erase(db_name); // Remove mutex for the dropped database
307309
return Status::OK();
308310
}
309311

@@ -445,6 +447,7 @@ Status ValidateSchema(TableSchema& table_schema, std::vector<EmbeddingModel> &em
445447
}
446448

447449
Status BasicMetaImpl::CreateTable(const std::string& db_name, TableSchema& table_schema, size_t& table_id) {
450+
std::lock_guard<std::mutex> lock(db_mutexes_[db_name]); // Acquire lock for this database
448451
// Table name cannot be duplicated.
449452
bool has_table = false;
450453
auto status = HasTable(db_name, table_schema.name_, has_table);
@@ -519,6 +522,7 @@ Status BasicMetaImpl::GetTable(const std::string& db_name, const std::string& ta
519522
}
520523

521524
Status BasicMetaImpl::DropTable(const std::string& db_name, const std::string& table_name) {
525+
std::lock_guard<std::mutex> lock(db_mutexes_[db_name]); // Acquire lock for this database
522526
auto it = databases_.find(db_name);
523527
if (it == databases_.end()) {
524528
return Status(DB_NOT_FOUND, "Database not found: " + db_name);

engine/db/catalog/basic_meta_impl.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include <atomic>
55
#include <unordered_map>
66
#include <unordered_set>
7-
7+
#include <mutex>
88
#include "db/catalog/meta.hpp"
99
#include "services/embedding_service.hpp"
1010

@@ -42,6 +42,7 @@ class BasicMetaImpl : public Meta {
4242
void InjectEmbeddingService(std::shared_ptr<vectordb::engine::EmbeddingService> embedding_service) override;
4343

4444
private:
45+
std::unordered_map<std::string, std::mutex> db_mutexes_; // Map to hold a mutex for each database
4546
std::unordered_map<std::string, DatabaseSchema> databases_;
4647
std::unordered_set<std::string> loaded_databases_paths_; // We cannot allow loading the same database twice
4748
// If the segment is leader (handle sync to storage) or follower (passively sync from storage)

0 commit comments

Comments
 (0)