Skip to content

Commit a2a9195

Browse files
Merge branch 'master' into ttl-management-rebased
2 parents 1db6407 + a3ae7cb commit a2a9195

File tree

730 files changed

+15355
-6378
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

730 files changed

+15355
-6378
lines changed

be/src/agent/cgroup_cpu_ctl.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ Status CgroupCpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids)
292292
std::string wg_path = query_path + unused_wg_id;
293293
int ret = rmdir(wg_path.c_str());
294294
if (ret < 0) {
295-
LOG(WARNING) << "rmdir failed, path=" << wg_path;
295+
LOG(WARNING) << "remove cgroup path failed, path=" << wg_path << ", error=" << ret;
296296
failed_count++;
297297
}
298298
}
@@ -317,7 +317,8 @@ Status CgroupV1CpuCtl::init() {
317317
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
318318
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
319319
if (ret != 0) {
320-
LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
320+
LOG(WARNING) << "cgroup v1 make workload group dir failed, path="
321+
<< _cgroup_v1_cpu_tg_path << ", error=" << ret;
321322
return Status::InternalError<false>("cgroup v1 mkdir workload group failed, path={}",
322323
_cgroup_v1_cpu_tg_path);
323324
}
@@ -382,6 +383,8 @@ Status CgroupV2CpuCtl::init() {
382383
if (access(_cgroup_v2_query_wg_path.c_str(), F_OK) != 0) {
383384
int ret = mkdir(_cgroup_v2_query_wg_path.c_str(), S_IRWXU);
384385
if (ret != 0) {
386+
LOG(WARNING) << "cgroup v2 make workload group dir failed, path="
387+
<< _cgroup_v2_query_wg_path << ", error=" << ret;
385388
return Status::InternalError<false>("cgroup v2 mkdir wg failed, path={}",
386389
_cgroup_v2_query_wg_path);
387390
}

be/src/agent/task_worker_pool.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ Status _submit_task(const TAgentTaskRequest& task,
419419
// TODO(plat1ko): check task request member
420420

421421
// Set the receiving time of task so that we can determine whether it is timed out later
422+
// There is a call path task_worker_pool <- agent_server <- backend_service <- BackendService,
423+
// and the argument it uses, BackendService_submit_tasks_args.tasks, is not const, so modifying it is OK.
422424
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
423425
auto st = submit_op(task);
424426
if (!st.ok()) [[unlikely]] {
@@ -617,7 +619,7 @@ Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
617619
});
618620
}
619621

620-
Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskRequest& task) {
622+
Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(TAgentTaskRequest& task) {
621623
const TTaskType::type task_type = task.task_type;
622624
int64_t signature = task.signature;
623625
std::string type_str;
@@ -657,7 +659,7 @@ Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskReq
657659
} while (false);
658660

659661
// Set the receiving time of task so that we can determine whether it is timed out later
660-
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
662+
task.__set_recv_time(time(nullptr));
661663

662664
LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
663665
return Status::OK();
@@ -1926,6 +1928,9 @@ void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
19261928
<< " push_type=" << push_req.push_type;
19271929
std::vector<TTabletInfo> tablet_infos;
19281930

1931+
// There is a call path task_worker_pool <- agent_server <- backend_service <- BackendService,
1932+
// and the argument it uses, BackendService_submit_tasks_args.tasks, is not const.
1933+
// push_req will be modified, so modifying it is OK.
19291934
EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos);
19301935
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
19311936
auto status = engine_task.execute();

be/src/agent/task_worker_pool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
8888

8989
Status submit_task(const TAgentTaskRequest& task) override;
9090

91-
Status submit_high_prior_and_cancel_low(const TAgentTaskRequest& task);
91+
Status submit_high_prior_and_cancel_low(TAgentTaskRequest& task);
9292

9393
private:
9494
void normal_loop();

be/src/cloud/cloud_backend_service.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "runtime/stream_load/stream_load_context.h"
3333
#include "runtime/stream_load/stream_load_recorder.h"
3434
#include "util/brpc_client_cache.h" // BrpcClientCache
35+
#include "util/stack_util.h"
3536
#include "util/thrift_server.h"
3637

3738
namespace doris {
@@ -213,8 +214,12 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
213214
}
214215
brpc::Controller cntl;
215216
PGetFileCacheMetaRequest brpc_request;
216-
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
217-
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
217+
std::stringstream ss;
218+
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), [&](int64_t tablet_id) {
219+
brpc_request.add_tablet_ids(tablet_id);
220+
ss << tablet_id << ",";
221+
});
222+
VLOG_DEBUG << "tablets set: " << ss.str() << " stack: " << get_stack_trace();
218223
PGetFileCacheMetaResponse brpc_response;
219224

220225
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);

be/src/cloud/cloud_meta_mgr.cpp

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
388388
static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
389389
static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
390390

391+
// This applies only to the current file: every req is non-const but is passed as a const type.
391392
const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
392393

393394
int retry_times = 0;
@@ -944,6 +945,17 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
944945
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
945946
*delete_bitmap = *new_delete_bitmap;
946947
}
948+
949+
if (read_version == 2 && config::delete_bitmap_store_write_version == 1) {
950+
return Status::InternalError(
951+
"please set delete_bitmap_store_read_version to 1 or 3 because "
952+
"delete_bitmap_store_write_version is 1");
953+
} else if (read_version == 1 && config::delete_bitmap_store_write_version == 2) {
954+
return Status::InternalError(
955+
"please set delete_bitmap_store_read_version to 2 or 3 because "
956+
"delete_bitmap_store_write_version is 2");
957+
}
958+
947959
int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version());
948960
// When there are many delete bitmaps that need to be synchronized, it
949961
// may take a longer time, especially when loading the tablet for the
@@ -1637,6 +1649,7 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc
16371649
if (next_visible_version > 0) {
16381650
req.set_next_visible_version(next_visible_version);
16391651
}
1652+
req.set_store_version(store_version);
16401653

16411654
bool write_v1 = store_version == 1 || store_version == 3;
16421655
bool write_v2 = store_version == 2 || store_version == 3;
@@ -1784,6 +1797,10 @@ Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(
17841797
const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
17851798
std::map<std::string, int64_t>& rowset_to_versions, int64_t pre_rowset_agg_start_version,
17861799
int64_t pre_rowset_agg_end_version) {
1800+
if (config::delete_bitmap_store_write_version == 2) {
1801+
VLOG_DEBUG << "no need to agg delete bitmap v1 in ms because use v2";
1802+
return Status::OK();
1803+
}
17871804
LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id()
17881805
<< ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size();
17891806
UpdateDeleteBitmapRequest req;
@@ -1915,12 +1932,12 @@ void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lo
19151932
}
19161933
}
19171934

1918-
void CloudMetaMgr::check_table_size_correctness(const RowsetMeta& rs_meta) {
1935+
void CloudMetaMgr::check_table_size_correctness(RowsetMeta& rs_meta) {
19191936
if (!config::enable_table_size_correctness_check) {
19201937
return;
19211938
}
19221939
int64_t total_segment_size = get_segment_file_size(rs_meta);
1923-
int64_t total_inverted_index_size = get_inverted_index_file_szie(rs_meta);
1940+
int64_t total_inverted_index_size = get_inverted_index_file_size(rs_meta);
19241941
if (rs_meta.data_disk_size() != total_segment_size ||
19251942
rs_meta.index_disk_size() != total_inverted_index_size ||
19261943
rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) {
@@ -1939,9 +1956,9 @@ void CloudMetaMgr::check_table_size_correctness(const RowsetMeta& rs_meta) {
19391956
}
19401957
}
19411958

1942-
int64_t CloudMetaMgr::get_segment_file_size(const RowsetMeta& rs_meta) {
1959+
int64_t CloudMetaMgr::get_segment_file_size(RowsetMeta& rs_meta) {
19431960
int64_t total_segment_size = 0;
1944-
const auto fs = const_cast<RowsetMeta&>(rs_meta).fs();
1961+
const auto fs = rs_meta.fs();
19451962
if (!fs) {
19461963
LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id();
19471964
}
@@ -1966,9 +1983,9 @@ int64_t CloudMetaMgr::get_segment_file_size(const RowsetMeta& rs_meta) {
19661983
return total_segment_size;
19671984
}
19681985

1969-
int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
1986+
int64_t CloudMetaMgr::get_inverted_index_file_size(RowsetMeta& rs_meta) {
19701987
int64_t total_inverted_index_size = 0;
1971-
const auto fs = const_cast<RowsetMeta&>(rs_meta).fs();
1988+
const auto fs = rs_meta.fs();
19721989
if (!fs) {
19731990
LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id();
19741991
}

be/src/cloud/cloud_meta_mgr.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,9 @@ class CloudMetaMgr {
191191
GetDeleteBitmapResponse& res,
192192
int64_t bytes_threadhold);
193193

194-
void check_table_size_correctness(const RowsetMeta& rs_meta);
195-
int64_t get_segment_file_size(const RowsetMeta& rs_meta);
196-
int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta);
194+
void check_table_size_correctness(RowsetMeta& rs_meta);
195+
int64_t get_segment_file_size(RowsetMeta& rs_meta);
196+
int64_t get_inverted_index_file_size(RowsetMeta& rs_meta);
197197
};
198198

199199
} // namespace cloud

be/src/cloud/cloud_tablet.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "olap/tablet_schema.h"
6262
#include "olap/txn_manager.h"
6363
#include "util/debug_points.h"
64+
#include "util/stack_util.h"
6465
#include "vec/common/schema_util.h"
6566

6667
namespace doris {
@@ -382,6 +383,8 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
382383
return;
383384
}
384385

386+
VLOG_DEBUG << "add_rowsets tablet_id=" << tablet_id() << " stack: " << get_stack_trace();
387+
385388
auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
386389
for (auto& rs : rowsets) {
387390
if (version_overlap || warmup_delta_data) {

be/src/cloud/cloud_tablet_mgr.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "common/status.h"
3030
#include "olap/lru_cache.h"
3131
#include "runtime/memory/cache_policy.h"
32+
#include "util/stack_util.h"
3233

3334
namespace doris {
3435
uint64_t g_tablet_report_inactive_duration_ms = 0;
@@ -175,6 +176,8 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
175176
TabletMap& tablet_map;
176177
};
177178

179+
VLOG_DEBUG << "get_tablet tablet_id=" << tablet_id << " stack: " << get_stack_trace();
180+
178181
auto tablet_id_str = std::to_string(tablet_id);
179182
CacheKey key(tablet_id_str);
180183
auto* handle = _cache->lookup(key);

be/src/cloud/cloud_warm_up_manager.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "runtime/client_cache.h"
4040
#include "runtime/exec_env.h"
4141
#include "util/brpc_client_cache.h" // BrpcClientCache
42+
#include "util/stack_util.h"
4243
#include "util/thrift_rpc_helper.h"
4344
#include "util/time.h"
4445

@@ -210,6 +211,7 @@ void CloudWarmUpManager::handle_jobs() {
210211
std::make_shared<bthread::CountdownEvent>(0);
211212

212213
for (int64_t tablet_id : cur_job->tablet_ids) {
214+
VLOG_DEBUG << "Warm up tablet " << tablet_id << " stack: " << get_stack_trace();
213215
if (_cur_job_id == 0) { // The job is canceled
214216
break;
215217
}

be/src/common/config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ DEFINE_Validator(task_executor_initial_max_concurrency_per_task, [](const int co
317317
return true;
318318
});
319319
// Enable task executor in internal table scan.
320-
DEFINE_Bool(enable_task_executor_in_internal_table, "false");
320+
DEFINE_Bool(enable_task_executor_in_internal_table, "true");
321321
// Enable task executor in external table scan.
322322
DEFINE_Bool(enable_task_executor_in_external_table, "true");
323323

0 commit comments

Comments
 (0)