Skip to content

Commit d4fbd2a

Browse files
committed
add beut
Signed-off-by: zhengyu <[email protected]>
1 parent dab1863 commit d4fbd2a

File tree

7 files changed

+396
-10
lines changed

7 files changed

+396
-10
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,7 @@ DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
11631163
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
11641164
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "180000");
11651165
DEFINE_mInt64(file_cache_background_ttl_info_update_interval_ms, "180000");
1166+
DEFINE_mInt64(file_cache_background_tablet_id_flush_interval_ms, "1000");
11661167
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
11671168
DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000");
11681169
// dump a queue only if it has been updated a specific number of times across several dump intervals

be/src/common/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,6 +1184,7 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
11841184
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
11851185
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
11861186
DECLARE_mInt64(file_cache_background_ttl_info_update_interval_ms);
1187+
DECLARE_mInt64(file_cache_background_tablet_id_flush_interval_ms);
11871188
DECLARE_mInt64(file_cache_background_ttl_gc_batch);
11881189
DECLARE_Int32(file_cache_downloader_thread_num_min);
11891190
DECLARE_Int32(file_cache_downloader_thread_num_max);

be/src/io/cache/block_file_cache_ttl_mgr.cpp

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
#include "io/cache/block_file_cache_ttl_mgr.h"
1919

20-
#include <atomic>
2120
#include <chrono>
2221
#include <memory>
2322
#include <mutex>
2423
#include <thread>
2524
#include <unordered_set>
25+
#include <vector>
2626

2727
#include "common/config.h"
2828
#include "common/logging.h"
@@ -42,6 +42,8 @@ BlockFileCacheTtlMgr::BlockFileCacheTtlMgr(BlockFileCache* mgr, CacheBlockMetaSt
4242
std::thread(&BlockFileCacheTtlMgr::run_backgroud_update_ttl_info_map, this);
4343
_expiration_check_thread =
4444
std::thread(&BlockFileCacheTtlMgr::run_backgroud_expiration_check, this);
45+
_tablet_id_flush_thread =
46+
std::thread(&BlockFileCacheTtlMgr::run_background_tablet_id_flush, this);
4547
}
4648

4749
BlockFileCacheTtlMgr::~BlockFileCacheTtlMgr() {
@@ -54,11 +56,65 @@ BlockFileCacheTtlMgr::~BlockFileCacheTtlMgr() {
5456
if (_expiration_check_thread.joinable()) {
5557
_expiration_check_thread.join();
5658
}
59+
60+
if (_tablet_id_flush_thread.joinable()) {
61+
_tablet_id_flush_thread.join();
62+
}
5763
}
5864

5965
void BlockFileCacheTtlMgr::register_tablet_id(int64_t tablet_id) {
60-
std::lock_guard<std::mutex> lock(_tablet_id_mutex);
61-
_tablet_id_set.insert(tablet_id);
66+
_tablet_id_queue.enqueue(tablet_id);
67+
}
68+
69+
void BlockFileCacheTtlMgr::run_background_tablet_id_flush() {
70+
Thread::set_self_name("ttl_mgr_flush");
71+
72+
static constexpr size_t kBatchSize = 1024;
73+
std::vector<int64_t> pending;
74+
pending.reserve(kBatchSize);
75+
76+
auto flush_pending = [this](std::vector<int64_t>* items) {
77+
if (items->empty()) {
78+
return;
79+
}
80+
std::lock_guard<std::mutex> lock(_tablet_id_mutex);
81+
_tablet_id_set.insert(items->begin(), items->end());
82+
items->clear();
83+
};
84+
85+
while (!_stop_background.load(std::memory_order_acquire)) {
86+
bool drained = false;
87+
int64_t tablet_id = 0;
88+
while (_tablet_id_queue.try_dequeue(tablet_id)) {
89+
drained = true;
90+
pending.push_back(tablet_id);
91+
if (pending.size() >= kBatchSize) {
92+
flush_pending(&pending);
93+
}
94+
}
95+
96+
flush_pending(&pending);
97+
98+
if (!drained) {
99+
std::this_thread::sleep_for(std::chrono::milliseconds(
100+
config::file_cache_background_tablet_id_flush_interval_ms));
101+
}
102+
}
103+
104+
// Drain remaining items before exit
105+
int64_t tablet_id = 0;
106+
while (_tablet_id_queue.try_dequeue(tablet_id)) {
107+
pending.push_back(tablet_id);
108+
if (pending.size() >= kBatchSize) {
109+
std::lock_guard<std::mutex> lock(_tablet_id_mutex);
110+
_tablet_id_set.insert(pending.begin(), pending.end());
111+
pending.clear();
112+
}
113+
}
114+
if (!pending.empty()) {
115+
std::lock_guard<std::mutex> lock(_tablet_id_mutex);
116+
_tablet_id_set.insert(pending.begin(), pending.end());
117+
}
62118
}
63119

64120
FileBlocks BlockFileCacheTtlMgr::get_file_blocks_from_tablet_id(int64_t tablet_id) {
@@ -101,8 +157,6 @@ void BlockFileCacheTtlMgr::run_backgroud_update_ttl_info_map() {
101157
while (!_stop_background.load(std::memory_order_acquire)) {
102158
try {
103159
std::unordered_set<int64_t> tablet_ids_to_process;
104-
105-
// Copy tablet IDs to process
106160
{
107161
std::lock_guard<std::mutex> lock(_tablet_id_mutex);
108162
tablet_ids_to_process = _tablet_id_set;
@@ -202,7 +256,6 @@ void BlockFileCacheTtlMgr::run_backgroud_expiration_check() {
202256
}
203257
}
204258

205-
// Sleep for configured interval (use existing TTL GC interval)
206259
std::this_thread::sleep_for(
207260
std::chrono::milliseconds(config::file_cache_background_ttl_gc_interval_ms));
208261

be/src/io/cache/block_file_cache_ttl_mgr.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
#pragma once
2121

22+
#include <concurrentqueue.h>
23+
2224
#include <atomic>
2325
#include <map>
2426
#include <mutex>
25-
#include <set>
2627
#include <thread>
2728
#include <unordered_set>
2829

@@ -50,22 +51,27 @@ class BlockFileCacheTtlMgr {
5051
void run_backgroud_update_ttl_info_map();
5152
// Background thread that finds expired tablets and evicts them from the TTL queue
5253
void run_backgroud_expiration_check();
54+
// Background thread to drain the concurrent tablet-id queue into the dedup set
55+
void run_background_tablet_id_flush();
5356

5457
private:
5558
FileBlocks get_file_blocks_from_tablet_id(int64_t tablet_id);
5659

5760
private:
58-
// the set contains all the tablet ids which has cache data
59-
std::unordered_set<int64_t> _tablet_id_set; //TODO(zhengyu): clean up old tablet ids
61+
// Tablet ids waiting to be deduplicated + set of unique ids known to have cached data
62+
moodycamel::ConcurrentQueue<int64_t> _tablet_id_queue;
63+
std::unordered_set<int64_t> _tablet_id_set; // TODO(zhengyu): clean up old tablet ids
64+
std::mutex _tablet_id_mutex;
65+
6066
std::map<int64_t /* tablet_id */, TtlInfo> _ttl_info_map;
6167
BlockFileCache* _mgr;
6268
CacheBlockMetaStore* _meta_store;
6369

6470
std::atomic<bool> _stop_background;
6571
std::thread _update_ttl_thread;
6672
std::thread _expiration_check_thread;
73+
std::thread _tablet_id_flush_thread;
6774

68-
std::mutex _tablet_id_mutex;
6975
std::mutex _ttl_info_mutex;
7076
};
7177

be/src/io/cache/cache_block_meta_store.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ size_t CacheBlockMetaStore::get_write_queue_size() const {
7474
}
7575

7676
Status CacheBlockMetaStore::init() {
77+
if (_initialized.load(std::memory_order_acquire)) {
78+
return Status::OK();
79+
}
80+
7781
std::filesystem::create_directories(_db_path);
7882

7983
_options.create_if_missing = true;
@@ -119,6 +123,7 @@ Status CacheBlockMetaStore::init() {
119123
}
120124

121125
_write_thread = std::thread(&CacheBlockMetaStore::async_write_worker, this);
126+
_initialized.store(true, std::memory_order_release);
122127

123128
return Status::OK();
124129
}

be/src/io/cache/cache_block_meta_store.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class CacheBlockMetaStore {
118118
std::unique_ptr<rocksdb::DB> _db;
119119
rocksdb::Options _options;
120120
std::unique_ptr<rocksdb::ColumnFamilyHandle> _file_cache_meta_cf_handle;
121+
std::atomic<bool> _initialized {false};
121122

122123
enum class OperationType { PUT, DELETE };
123124
struct WriteOperation {

0 commit comments

Comments
 (0)