Skip to content

Commit cb4fc02

Browse files
committed
update
1 parent 0408f4a commit cb4fc02

File tree

8 files changed

+71
-64
lines changed

8 files changed

+71
-64
lines changed

be/src/runtime/query_handle.cpp renamed to be/src/runtime/coordinator_context.cpp

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,56 +15,59 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#include "runtime/query_handle.h"
18+
#include "runtime/coordinator_context.h"
1919

2020
#include "runtime/query_context.h"
2121

2222
namespace doris {
2323

24-
QueryHandle::QueryHandle(TUniqueId query_id, TQueryOptions query_options,
25-
std::weak_ptr<QueryContext> query_ctx)
24+
CoordinatorContext::CoordinatorContext(TUniqueId query_id, TQueryOptions query_options,
25+
std::weak_ptr<QueryContext> query_ctx)
2626
: _query_id(std::move(query_id)),
2727
_query_options(std::move(query_options)),
2828
_query_ctx(std::move(query_ctx)) {
2929
CHECK(_query_ctx.lock() != nullptr);
3030
_resource_ctx = _query_ctx.lock()->_resource_ctx;
3131
}
3232

33-
QueryHandle::~QueryHandle() = default;
33+
CoordinatorContext::~CoordinatorContext() {
34+
SCOPED_ATTACH_TASK(_resource_ctx);
35+
_merge_controller_handler.reset();
36+
}
3437

35-
std::string QueryHandle::debug_string() const {
38+
std::string CoordinatorContext::debug_string() const {
3639
return fmt::format(
37-
"QueryHandle(query_id={}): {}", print_id(_query_id),
40+
"CoordinatorContext(query_id={}): {}", print_id(_query_id),
3841
_merge_controller_handler ? _merge_controller_handler->debug_string() : "null");
3942
}
4043

41-
void QueryHandle::set_merge_controller_handler(
44+
void CoordinatorContext::set_merge_controller_handler(
4245
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
4346
_merge_controller_handler = handler;
4447
}
45-
std::shared_ptr<RuntimeFilterMergeControllerEntity> QueryHandle::get_merge_controller_handler()
46-
const {
48+
std::shared_ptr<RuntimeFilterMergeControllerEntity>
49+
CoordinatorContext::get_merge_controller_handler() const {
4750
return _merge_controller_handler;
4851
}
4952

50-
const TQueryOptions& QueryHandle::query_options() const {
53+
const TQueryOptions& CoordinatorContext::query_options() const {
5154
return _query_options;
5255
}
5356

54-
std::weak_ptr<QueryContext> QueryHandle::weak_query_ctx() const {
57+
std::weak_ptr<QueryContext> CoordinatorContext::weak_query_ctx() const {
5558
return _query_ctx;
5659
}
5760

58-
int QueryHandle::execution_timeout() const {
61+
int CoordinatorContext::execution_timeout() const {
5962
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
6063
: _query_options.query_timeout;
6164
}
6265

63-
TUniqueId QueryHandle::query_id() const {
66+
TUniqueId CoordinatorContext::query_id() const {
6467
return _query_id;
6568
}
6669

67-
std::shared_ptr<ResourceContext> QueryHandle::resource_ctx() const {
70+
std::shared_ptr<ResourceContext> CoordinatorContext::resource_ctx() const {
6871
return _resource_ctx;
6972
}
7073

be/src/runtime/query_handle.h renamed to be/src/runtime/coordinator_context.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,25 @@
2222

2323
#include <memory>
2424
#include <string>
25-
#include <utility>
2625

2726
#include "runtime/query_context.h"
2827
#include "runtime_filter/runtime_filter_mgr.h"
2928

3029
namespace doris {
3130

32-
class QueryHandle;
31+
class CoordinatorContext;
32+
class ResourceContext;
3333

34-
// The role of QueryHandle is similar to QueryContext, but its lifecycle is longer than QueryContext.
35-
// QueryHandle will exist until the entire query ends, rather than being released as the current BE task ends like QueryContext.
34+
// The role of CoordinatorContext is similar to QueryContext, but its lifecycle is longer than QueryContext.
35+
// CoordinatorContext will exist until the entire query ends, rather than being released as the current BE task ends like QueryContext.
3636
// It is mainly used to store runtime states that need coordination between BEs, such as the MergeControllerHandler of RuntimeFilter.
37-
// This way, even if the QueryContext of one BE has been released, other BEs can still access these coordination states through QueryHandle to ensure the correctness and consistency of the query.
38-
// QueryContext hold shared_ptr of QueryHandle, and QueryHandle hold weak_ptr of QueryContext to avoid circular references.
39-
class QueryHandle {
37+
// This way, even if the QueryContext of one BE has been released, other BEs can still access these coordination states through CoordinatorContext to ensure the correctness and consistency of the query.
38+
// QueryContext hold shared_ptr of CoordinatorContext, and CoordinatorContext hold weak_ptr of QueryContext to avoid circular references.
39+
class CoordinatorContext {
4040
public:
41-
QueryHandle(TUniqueId query_id, TQueryOptions query_options,
42-
std::weak_ptr<QueryContext> query_ctx);
43-
~QueryHandle();
41+
CoordinatorContext(TUniqueId query_id, TQueryOptions query_options,
42+
std::weak_ptr<QueryContext> query_ctx);
43+
~CoordinatorContext();
4444

4545
std::string debug_string() const;
4646

be/src/runtime/fragment_mgr.cpp

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@
6464
#include "io/fs/stream_load_pipe.h"
6565
#include "pipeline/pipeline_fragment_context.h"
6666
#include "runtime/client_cache.h"
67+
#include "runtime/coordinator_context.h"
6768
#include "runtime/descriptors.h"
6869
#include "runtime/exec_env.h"
6970
#include "runtime/frontend_info.h"
7071
#include "runtime/primitive_type.h"
7172
#include "runtime/query_context.h"
72-
#include "runtime/query_handle.h"
7373
#include "runtime/runtime_query_statistics_mgr.h"
7474
#include "runtime/runtime_state.h"
7575
#include "runtime/stream_load/new_load_stream_mgr.h"
@@ -728,10 +728,10 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
728728
query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
729729
params.coord, params.is_nereids,
730730
params.current_connect_fe, query_source);
731-
auto query_handle = std::make_shared<QueryHandle>(
731+
auto coordinator_context = std::make_shared<CoordinatorContext>(
732732
query_id, parent.query_options, query_ctx);
733-
_query_handle_map.insert(query_id, query_handle);
734-
query_ctx->set_query_handle(query_handle);
733+
_coordinator_context_map.insert(query_id, coordinator_context);
734+
query_ctx->set_coordinator_context(coordinator_context);
735735
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
736736
RETURN_IF_ERROR(DescriptorTbl::create(
737737
&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
@@ -760,7 +760,7 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
760760
auto handler =
761761
std::make_shared<RuntimeFilterMergeControllerEntity>();
762762
RETURN_IF_ERROR(handler->init(info.runtime_filter_params));
763-
query_handle->set_merge_controller_handler(handler);
763+
coordinator_context->set_merge_controller_handler(handler);
764764

765765
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
766766
info.runtime_filter_params);
@@ -790,10 +790,11 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
790790
timespec now;
791791
clock_gettime(CLOCK_MONOTONIC, &now);
792792

793-
_query_handle_map.apply(
794-
[&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryHandle>>& map) -> Status {
793+
_coordinator_context_map.apply(
794+
[&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<CoordinatorContext>>& map)
795+
-> Status {
795796
for (auto& it : map) {
796-
fmt::format_to(debug_string_buffer, "QueryHandle: {}\n",
797+
fmt::format_to(debug_string_buffer, "CoordinatorContext: {}\n",
797798
it.second->debug_string());
798799
}
799800
return Status::OK();
@@ -920,7 +921,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
920921
SCOPED_ATTACH_TASK(query_ctx->resource_ctx());
921922
query_ctx->cancel(reason);
922923
remove_query_context(query_id);
923-
_query_handle_map.erase(query_id);
924+
_coordinator_context_map.erase(query_id);
924925
LOG(INFO) << "Query " << print_id(query_id)
925926
<< " is cancelled and removed. Reason: " << reason.to_string();
926927
}
@@ -1317,7 +1318,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
13171318
return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
13181319
}
13191320

1320-
if (auto q_handle = _query_handle_map.find(query_id); q_handle != nullptr) {
1321+
if (auto q_handle = _coordinator_context_map.find(query_id); q_handle != nullptr) {
13211322
return q_handle->get_merge_controller_handler()->send_filter_size(q_handle, request);
13221323
} else {
13231324
return Status::EndOfFile(
@@ -1354,7 +1355,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
13541355
TUniqueId query_id;
13551356
query_id.__set_hi(queryid.hi);
13561357
query_id.__set_lo(queryid.lo);
1357-
if (auto q_handle = _query_handle_map.find(query_id); q_handle != nullptr) {
1358+
if (auto q_handle = _coordinator_context_map.find(query_id); q_handle != nullptr) {
13581359
SCOPED_ATTACH_TASK(q_handle->resource_ctx());
13591360
if (!q_handle->get_merge_controller_handler()) {
13601361
return Status::InternalError("Merge filter failed: Merge controller handler is null");

be/src/runtime/fragment_mgr.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ namespace pipeline {
5353
class PipelineFragmentContext;
5454
} // namespace pipeline
5555
class QueryContext;
56-
class QueryHandle;
56+
class CoordinatorContext;
5757
class ExecEnv;
5858
class ThreadPool;
5959
class PExecPlanFragmentStartRequest;
@@ -210,7 +210,8 @@ class FragmentMgr : public RestMonitorIface {
210210

211211
// query id -> QueryContext
212212
ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
213-
ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryHandle>, QueryHandle> _query_handle_map;
213+
ConcurrentContextMap<TUniqueId, std::shared_ptr<CoordinatorContext>, CoordinatorContext>
214+
_coordinator_context_map;
214215

215216
CountDownLatch _stop_background_threads_latch;
216217
std::shared_ptr<Thread> _cancel_thread;

be/src/runtime/query_context.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@
3131
#include "common/config.h"
3232
#include "common/factory_creator.h"
3333
#include "common/object_pool.h"
34+
#include "runtime/coordinator_context.h"
3435
#include "runtime/exec_env.h"
3536
#include "runtime/memory/mem_tracker_limiter.h"
36-
#include "runtime/query_handle.h"
3737
#include "runtime/runtime_predicate.h"
3838
#include "runtime/workload_management/resource_context.h"
3939
#include "vec/exec/scan/scanner_scheduler.h"
4040
#include "workload_group/workload_group.h"
4141

4242
namespace doris {
4343

44-
class QueryHandle;
44+
class CoordinatorContext;
4545

4646
namespace pipeline {
4747
class PipelineFragmentContext;
@@ -284,13 +284,13 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
284284
void set_first_error_msg(std::string error_msg);
285285
std::string get_first_error_msg();
286286

287-
void set_query_handle(std::shared_ptr<QueryHandle> query_handle) {
288-
_query_handle = query_handle;
287+
void set_coordinator_context(std::shared_ptr<CoordinatorContext> coordinator_context) {
288+
_coordinator_context = coordinator_context;
289289
}
290290

291291
private:
292292
friend class QueryTaskController;
293-
friend class QueryHandle;
293+
friend class CoordinatorContext;
294294

295295
int _timeout_second;
296296
TUniqueId _query_id;
@@ -363,7 +363,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
363363
std::string _load_error_url;
364364
std::string _first_error_msg;
365365

366-
std::shared_ptr<QueryHandle> _query_handle;
366+
std::shared_ptr<CoordinatorContext> _coordinator_context;
367367

368368
public:
369369
// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile

be/src/runtime_filter/runtime_filter_mgr.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
#include "common/config.h"
3333
#include "common/logging.h"
3434
#include "common/status.h"
35+
#include "runtime/coordinator_context.h"
3536
#include "runtime/exec_env.h"
3637
#include "runtime/memory/mem_tracker.h"
3738
#include "runtime/query_context.h"
38-
#include "runtime/query_handle.h"
3939
#include "runtime/runtime_state.h"
4040
#include "runtime/thread_context.h"
4141
#include "runtime_filter/runtime_filter_consumer.h"
@@ -216,7 +216,8 @@ Status RuntimeFilterMergeControllerEntity::init(const TRuntimeFilterParams& runt
216216
}
217217

218218
Status RuntimeFilterMergeControllerEntity::send_filter_size(
219-
std::shared_ptr<QueryHandle> query_handle, const PSendFilterSizeRequest* request) {
219+
std::shared_ptr<CoordinatorContext> coordinator_context,
220+
const PSendFilterSizeRequest* request) {
220221
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
221222

222223
auto filter_id = request->filter_id();
@@ -236,9 +237,9 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(
236237
Status st = Status::OK();
237238
// After all runtime filters' size are collected, we should send response to all producers.
238239
if (cnt_val.merger->add_rf_size(request->filter_size())) {
239-
auto ctx = query_handle->query_options().ignore_runtime_filter_error
240+
auto ctx = coordinator_context->query_options().ignore_runtime_filter_error
240241
? std::weak_ptr<QueryContext> {}
241-
: query_handle->weak_query_ctx();
242+
: coordinator_context->weak_query_ctx();
242243
for (auto addr : cnt_val.source_addrs) {
243244
std::shared_ptr<PBackendService_Stub> stub(
244245
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
@@ -255,10 +256,10 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(
255256
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
256257

257258
auto* pquery_id = closure->request_->mutable_query_id();
258-
pquery_id->set_hi(query_handle->query_id().hi);
259-
pquery_id->set_lo(query_handle->query_id().lo);
259+
pquery_id->set_hi(coordinator_context->query_id().hi);
260+
pquery_id->set_lo(coordinator_context->query_id().lo);
260261
closure->cntl_->set_timeout_ms(
261-
get_execution_rpc_timeout_ms(query_handle->execution_timeout()));
262+
get_execution_rpc_timeout_ms(coordinator_context->execution_timeout()));
262263
if (config::execution_ignore_eovercrowded) {
263264
closure->cntl_->ignore_eovercrowded();
264265
}
@@ -302,9 +303,9 @@ std::string RuntimeFilterMgr::debug_string() {
302303
}
303304

304305
// merge data
305-
Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryHandle> query_handle,
306-
const PMergeFilterRequest* request,
307-
butil::IOBufAsZeroCopyInputStream* attach_data) {
306+
Status RuntimeFilterMergeControllerEntity::merge(
307+
std::shared_ptr<CoordinatorContext> coordinator_context, const PMergeFilterRequest* request,
308+
butil::IOBufAsZeroCopyInputStream* attach_data) {
308309
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
309310
int64_t merge_time = 0;
310311
auto filter_id = request->filter_id();
@@ -327,7 +328,7 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryHandle> qu
327328
return Status::OK();
328329
}
329330
std::shared_ptr<RuntimeFilterProducer> tmp_filter;
330-
RETURN_IF_ERROR(RuntimeFilterProducer::create(query_handle->query_options(),
331+
RETURN_IF_ERROR(RuntimeFilterProducer::create(coordinator_context->query_options(),
331332
&cnt_val.runtime_filter_desc, &tmp_filter));
332333

333334
RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));
@@ -340,11 +341,11 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryHandle> qu
340341

341342
if (is_ready) {
342343
return _send_rf_to_target(cnt_val,
343-
query_handle->query_options().ignore_runtime_filter_error
344+
coordinator_context->query_options().ignore_runtime_filter_error
344345
? std::weak_ptr<QueryContext> {}
345-
: query_handle->weak_query_ctx(),
346+
: coordinator_context->weak_query_ctx(),
346347
merge_time, request->query_id(),
347-
query_handle->execution_timeout());
348+
coordinator_context->execution_timeout());
348349
}
349350
return Status::OK();
350351
}

be/src/runtime_filter/runtime_filter_mgr.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class RuntimeFilterWrapper;
5555
class QueryContext;
5656
class ExecEnv;
5757
class RuntimeProfile;
58-
class QueryHandle;
58+
class CoordinatorContext;
5959

6060
struct LocalMergeContext {
6161
std::mutex mtx;
@@ -142,10 +142,11 @@ class RuntimeFilterMergeControllerEntity {
142142
Status init(const TRuntimeFilterParams& runtime_filter_params);
143143

144144
// handle merge rpc
145-
Status merge(std::shared_ptr<QueryHandle> query_handle, const PMergeFilterRequest* request,
145+
Status merge(std::shared_ptr<CoordinatorContext> coordinator_context,
146+
const PMergeFilterRequest* request,
146147
butil::IOBufAsZeroCopyInputStream* attach_data);
147148

148-
Status send_filter_size(std::shared_ptr<QueryHandle> query_handle,
149+
Status send_filter_size(std::shared_ptr<CoordinatorContext> coordinator_context,
149150
const PSendFilterSizeRequest* request);
150151

151152
std::string debug_string();

be/test/runtime_filter/runtime_filter_test_utils.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ class RuntimeFilterTest : public testing::Test {
3838
_query_ctx =
3939
QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), _query_options,
4040
fe_address, true, fe_address, QuerySource::INTERNAL_FRONTEND);
41-
_query_handle =
42-
std::make_shared<QueryHandle>(_query_ctx->query_id(), _query_options, _query_ctx);
43-
_query_ctx->set_query_handle(_query_handle);
41+
_coordinator_context = std::make_shared<CoordinatorContext>(_query_ctx->query_id(),
42+
_query_options, _query_ctx);
43+
_query_ctx->set_coordinator_context(_coordinator_context);
4444
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
4545
TRuntimeFilterParamsBuilder().build());
4646

@@ -61,7 +61,7 @@ class RuntimeFilterTest : public testing::Test {
6161
protected:
6262
RuntimeProfile _profile = RuntimeProfile("");
6363
std::shared_ptr<QueryContext> _query_ctx;
64-
std::shared_ptr<QueryHandle> _query_handle;
64+
std::shared_ptr<CoordinatorContext> _coordinator_context;
6565
TQueryOptions _query_options;
6666
const std::string LOCALHOST = BackendOptions::get_localhost();
6767
const int DUMMY_PORT = config::brpc_port;

0 commit comments

Comments
 (0)