Skip to content

Commit eace969

Browse files
committed
response to the reviewer
Signed-off-by: zhengyu <[email protected]>
1 parent 60b39fc commit eace969

14 files changed

+111
-53
lines changed

be/src/cloud/cloud_storage_engine.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,9 @@ bool CloudStorageEngine::stopped() {
280280

281281
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
282282
SyncRowsetStats* sync_stats,
283-
bool force_use_cache) {
284-
return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, force_use_cache)
283+
bool force_use_cache, bool cache_on_miss) {
284+
return _tablet_mgr
285+
->get_tablet(tablet_id, false, true, sync_stats, force_use_cache, cache_on_miss)
285286
.transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); });
286287
}
287288

be/src/cloud/cloud_storage_engine.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
6363
bool stopped() override;
6464

6565
Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
66-
bool force_use_cache = false) override;
66+
bool force_use_cache = false,
67+
bool cache_on_miss = true) override;
6768

6869
Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
6970

be/src/cloud/cloud_tablet_mgr.cpp

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
161161
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
162162
bool sync_delete_bitmap,
163163
SyncRowsetStats* sync_stats,
164-
bool local_only) {
164+
bool local_only,
165+
bool cache_on_miss) {
165166
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
166167
class Value : public LRUCacheValueBase {
167168
public:
@@ -195,7 +196,7 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
195196
if (sync_stats) {
196197
++sync_stats->tablet_meta_cache_miss;
197198
}
198-
auto load_tablet = [this, &key, warmup_data, sync_delete_bitmap,
199+
auto load_tablet = [this, warmup_data, sync_delete_bitmap,
199200
sync_stats](int64_t tablet_id) -> Result<std::shared_ptr<CloudTablet>> {
200201
TabletMetaSharedPtr tablet_meta;
201202
auto start = std::chrono::steady_clock::now();
@@ -211,7 +212,6 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
211212
}
212213

213214
auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
214-
auto value = std::make_unique<Value>(tablet, *_tablet_map);
215215
// MUST sync stats to let compaction scheduler work correctly
216216
SyncOptions options;
217217
options.warmup_delta_data = warmup_data;
@@ -221,16 +221,7 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
221221
LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st;
222222
return ResultError(st);
223223
}
224-
225-
auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet),
226-
CachePriority::NORMAL);
227-
auto ret =
228-
std::shared_ptr<CloudTablet>(tablet.get(), [this, handle](CloudTablet* tablet) {
229-
set_tablet_access_time_ms(tablet);
230-
_cache->release(handle);
231-
});
232-
_tablet_map->put(std::move(tablet));
233-
return ret;
224+
return tablet;
234225
};
235226

236227
auto load_result = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet));
@@ -239,8 +230,22 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
239230
load_result.error()));
240231
}
241232
auto tablet = load_result.value();
242-
set_tablet_access_time_ms(tablet.get());
243-
return tablet;
233+
if (!cache_on_miss) {
234+
set_tablet_access_time_ms(tablet.get());
235+
return tablet;
236+
}
237+
238+
auto value = std::make_unique<Value>(tablet, *_tablet_map);
239+
auto* insert_handle =
240+
_cache->insert(key, value.release(), 1, sizeof(CloudTablet), CachePriority::NORMAL);
241+
auto ret = std::shared_ptr<CloudTablet>(tablet.get(),
242+
[this, insert_handle](CloudTablet* tablet_ptr) {
243+
set_tablet_access_time_ms(tablet_ptr);
244+
_cache->release(insert_handle);
245+
});
246+
_tablet_map->put(std::move(tablet));
247+
set_tablet_access_time_ms(ret.get());
248+
return ret;
244249
}
245250
if (sync_stats) {
246251
++sync_stats->tablet_meta_cache_hit;

be/src/cloud/cloud_tablet_mgr.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ class CloudTabletMgr {
4747
Result<std::shared_ptr<CloudTablet>> get_tablet(int64_t tablet_id, bool warmup_data = false,
4848
bool sync_delete_bitmap = true,
4949
SyncRowsetStats* sync_stats = nullptr,
50-
bool local_only = false);
50+
bool local_only = false,
51+
bool cache_on_miss = true);
5152

5253
void erase_tablet(int64_t tablet_id);
5354

be/src/common/config.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1161,7 +1161,8 @@ DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
11611161
DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
11621162
DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
11631163
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
1164-
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
1164+
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "180000");
1165+
DEFINE_mInt64(file_cache_background_ttl_info_update_interval_ms, "180000");
11651166
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
11661167
DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000");
11671168
// dump queue only if the queue update specific times through several dump intervals

be/src/common/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,6 +1183,7 @@ DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
11831183
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
11841184
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
11851185
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
1186+
DECLARE_mInt64(file_cache_background_ttl_info_update_interval_ms);
11861187
DECLARE_mInt64(file_cache_background_ttl_gc_batch);
11871188
DECLARE_Int32(file_cache_downloader_thread_num_min);
11881189
DECLARE_Int32(file_cache_downloader_thread_num_max);

be/src/io/cache/block_file_cache.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
285285

286286
_lru_recorder = std::make_unique<LRUQueueRecorder>(this);
287287
_lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
288-
_ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, _meta_store.get());
289288
if (cache_settings.storage == "memory") {
290289
_storage = std::make_unique<MemFileCacheStorage>();
291290
_cache_base_path = "memory";
@@ -384,6 +383,13 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
384383
restore_lru_queues_from_disk(cache_lock);
385384
}
386385
RETURN_IF_ERROR(_storage->init(this));
386+
if (!_ttl_mgr) {
387+
if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
388+
if (auto* meta_store = fs_storage->get_meta_store()) {
389+
_ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
390+
}
391+
}
392+
}
387393
_cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
388394
_cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
389395
_cache_background_evict_in_advance_thread =
@@ -641,7 +647,9 @@ FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
641647
cell->update_atime();
642648
}
643649
}
644-
_ttl_mgr->register_tablet_id(context.tablet_id);
650+
if (_ttl_mgr && context.tablet_id != 0) {
651+
_ttl_mgr->register_tablet_id(context.tablet_id);
652+
}
645653
}
646654

647655
current_pos += current_size;
@@ -2140,6 +2148,21 @@ std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128W
21402148
return offset_to_block;
21412149
}
21422150

2151+
void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, uint64_t expiration_time) {
2152+
SCOPED_CACHE_LOCK(_mutex, this);
2153+
if (auto iter = _files.find(hash); iter != _files.end()) {
2154+
for (auto& [_, cell] : iter->second) {
2155+
if (cell.file_block) {
2156+
auto st = cell.file_block->update_expiration_time(expiration_time);
2157+
if (!st.ok()) {
2158+
LOG(WARNING) << "Failed to update expiration time for block "
2159+
<< cell.file_block->get_info_for_log() << ", error=" << st;
2160+
}
2161+
}
2162+
}
2163+
}
2164+
}
2165+
21432166
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
21442167
SCOPED_CACHE_LOCK(_mutex, this);
21452168
if (auto iter = _files.find(hash); iter != _files.end()) {

be/src/io/cache/block_file_cache.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <optional>
2828
#include <thread>
2929

30+
#include "io/cache/block_file_cache_ttl_mgr.h"
3031
#include "io/cache/cache_lru_dumper.h"
3132
#include "io/cache/file_block.h"
3233
#include "io/cache/file_cache_common.h"
@@ -72,7 +73,6 @@ class LockScopedTimer {
7273
LockScopedTimer cache_lock_timer;
7374

7475
class FSFileCacheStorage;
75-
class BlockFileCacheTtlMgr;
7676

7777
// The BlockFileCache is responsible for the management of the blocks
7878
// The current strategies are lru and ttl.
@@ -207,6 +207,8 @@ class BlockFileCache {
207207
std::string reset_capacity(size_t new_capacity);
208208

209209
std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& hash);
210+
/// Adjust expiration time for every block sharing the specified hash key.
211+
void modify_expiration_time(const UInt128Wrapper& hash, uint64_t expiration_time);
210212
/// For debug and UT
211213
std::string dump_structure(const UInt128Wrapper& hash);
212214
std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t offset);

be/src/io/cache/block_file_cache_ttl_mgr.cpp

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
#include "io/cache/block_file_cache.h"
3030
#include "io/cache/cache_block_meta_store.h"
3131
#include "io/cache/file_block.h"
32+
#include "olap/base_tablet.h"
33+
#include "runtime/exec_env.h"
3234
#include "util/time.h"
3335

3436
namespace doris::io {
@@ -71,7 +73,6 @@ FileBlocks BlockFileCacheTtlMgr::get_file_blocks_from_tablet_id(int64_t tablet_i
7173

7274
while (iterator->valid()) {
7375
BlockMetaKey key = iterator->key();
74-
BlockMeta meta = iterator->value();
7576

7677
// Get all blocks for this hash using get_blocks_by_key
7778
try {
@@ -108,32 +109,45 @@ void BlockFileCacheTtlMgr::run_backgroud_update_ttl_info_map() {
108109
}
109110

110111
for (int64_t tablet_id : tablet_ids_to_process) {
111-
// TODO(zhengyu): Implement actual CloudMetaMgr or CloudTabletMgr integration
112-
// For now, we'll use placeholder values
113-
uint64_t create_time = 0;
112+
uint64_t tablet_ctime = 0;
114113
uint64_t ttl = 0;
115114

116-
// Simulate getting tablet metadata
117-
// This should be replaced with actual CloudMetaMgr::get_tablet_meta call
118-
bool has_ttl = (tablet_id % 10 == 0); // Example condition
119-
if (has_ttl) {
120-
create_time = UnixSeconds(); // Current time as create time
121-
ttl = 3600; // 1 hour TTL
115+
auto res = ExecEnv::get_tablet(tablet_id, nullptr, false, false);
116+
if (!res.has_value()) {
117+
LOG(WARNING) << "Failed to get tablet for tablet_id: " << tablet_id
118+
<< ", err: " << res.error();
119+
continue;
120+
}
121+
122+
auto tablet = res.value();
123+
const auto& tablet_meta = tablet->tablet_meta();
124+
if (tablet_meta != nullptr) {
125+
tablet_ctime = tablet_meta->creation_time();
126+
}
127+
128+
int64_t ttl_seconds = tablet->ttl_seconds();
129+
if (ttl_seconds > 0 && tablet_ctime > 0) {
130+
ttl = static_cast<uint64_t>(ttl_seconds);
122131
}
123132

124133
// Update TTL info map
125134
{
126135
std::lock_guard<std::mutex> lock(_ttl_info_mutex);
127136
if (ttl > 0) {
128-
_ttl_info_map[tablet_id] = TtlInfo {ttl, create_time};
137+
auto old_info_it = _ttl_info_map.find(tablet_id);
138+
bool was_zero_ttl = (old_info_it == _ttl_info_map.end() ||
139+
old_info_it->second.ttl == 0);
140+
_ttl_info_map[tablet_id] = TtlInfo {ttl, tablet_ctime};
129141

130142
// If TTL changed from 0 to non-zero, convert blocks to TTL type
131-
auto old_info_it = _ttl_info_map.find(tablet_id);
132-
if (old_info_it == _ttl_info_map.end() || old_info_it->second.ttl == 0) {
143+
if (was_zero_ttl) {
133144
FileBlocks blocks = get_file_blocks_from_tablet_id(tablet_id);
134145
for (auto& block : blocks) {
135146
if (block->cache_type() != FileCacheType::TTL) {
136-
block->change_cache_type(FileCacheType::TTL);
147+
auto st = block->change_cache_type(FileCacheType::TTL);
148+
if (!st.ok()) {
149+
LOG(WARNING) << "Failed to convert block to TTL cache_type";
150+
}
137151
}
138152
}
139153
}
@@ -144,9 +158,8 @@ void BlockFileCacheTtlMgr::run_backgroud_update_ttl_info_map() {
144158
}
145159
}
146160

147-
// Sleep for configured interval (use existing TTL GC interval)
148-
std::this_thread::sleep_for(
149-
std::chrono::milliseconds(config::file_cache_background_ttl_gc_interval_ms));
161+
std::this_thread::sleep_for(std::chrono::milliseconds(
162+
config::file_cache_background_ttl_info_update_interval_ms));
150163

151164
} catch (const std::exception& e) {
152165
LOG(WARNING) << "Exception in TTL update thread: " << e.what();
@@ -160,7 +173,7 @@ void BlockFileCacheTtlMgr::run_backgroud_expiration_check() {
160173

161174
while (!_stop_background.load(std::memory_order_acquire)) {
162175
try {
163-
std::unordered_map<int64_t, TtlInfo> ttl_info_copy;
176+
std::map<int64_t, TtlInfo> ttl_info_copy;
164177

165178
// Copy TTL info for processing
166179
{
@@ -171,12 +184,15 @@ void BlockFileCacheTtlMgr::run_backgroud_expiration_check() {
171184
uint64_t current_time = UnixSeconds();
172185

173186
for (const auto& [tablet_id, ttl_info] : ttl_info_copy) {
174-
if (ttl_info.create_time + ttl_info.ttl < current_time) {
187+
if (ttl_info.tablet_ctime + ttl_info.ttl < current_time) {
175188
// Tablet has expired, convert TTL blocks back to NORMAL type
176189
FileBlocks blocks = get_file_blocks_from_tablet_id(tablet_id);
177190
for (auto& block : blocks) {
178191
if (block->cache_type() == FileCacheType::TTL) {
179-
block->change_cache_type(FileCacheType::NORMAL);
192+
auto st = block->change_cache_type(FileCacheType::NORMAL);
193+
if (!st.ok()) {
194+
LOG(WARNING) << "Failed to convert block back to NORMAL cache_type";
195+
}
180196
}
181197
}
182198

be/src/io/cache/block_file_cache_ttl_mgr.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <thread>
2727
#include <unordered_set>
2828

29+
#include "io/cache/file_block.h"
2930
#include "io/cache/file_cache_common.h"
3031

3132
namespace doris::io {
@@ -35,7 +36,7 @@ class CacheBlockMetaStore;
3536

3637
struct TtlInfo {
3738
uint64_t ttl;
38-
uint64_t create_time;
39+
uint64_t tablet_ctime;
3940
};
4041

4142
class BlockFileCacheTtlMgr {
@@ -45,16 +46,18 @@ class BlockFileCacheTtlMgr {
4546

4647
void register_tablet_id(int64_t tablet_id);
4748

48-
// Background thread functions
49+
// Background thread to update ttl_info_map
4950
void run_backgroud_update_ttl_info_map();
51+
// Background thread to find expired tablet and evict from ttl queue
5052
void run_backgroud_expiration_check();
5153

5254
private:
5355
FileBlocks get_file_blocks_from_tablet_id(int64_t tablet_id);
5456

5557
private:
56-
std::unordered_set<int64_t> _tablet_id_set;
57-
std::map<int64_t, TtlInfo> _ttl_info_map;
58+
// the set contains all the tablet ids which has cache data
59+
std::unordered_set<int64_t> _tablet_id_set; //TODO(zhengyu): clean up old tablet ids
60+
std::map<int64_t /* tablet_id */, TtlInfo> _ttl_info_map;
5861
BlockFileCache* _mgr;
5962
CacheBlockMetaStore* _meta_store;
6063

0 commit comments

Comments
 (0)