Skip to content

Commit a43c10f

Browse files
committed
add query handle to extend RuntimeFilterMergeControllerEntity lifecycle
1 parent a0e1306 commit a43c10f

File tree

10 files changed

+151
-114
lines changed

10 files changed

+151
-114
lines changed

be/src/olap/id_manager.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,13 @@ class IdFileMap {
195195
return it->second;
196196
}
197197

198-
int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; }
198+
int64_t get_delayed_expired_timestamp() const { return delayed_expired_timestamp; }
199199

200200
void set_external_scan_params(QueryContext* query_ctx, int max_file_scanners) {
201201
std::call_once(once_flag_for_external, [&] {
202202
DCHECK(query_ctx != nullptr);
203203
_query_global = query_ctx->get_query_globals();
204-
_query_options = query_ctx->get_query_options();
204+
_query_options = query_ctx->query_options();
205205
_file_scan_range_params_map = query_ctx->file_scan_range_params_map;
206206
_max_file_scanners = max_file_scanners;
207207
});

be/src/runtime/fragment_mgr.cpp

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@
6969
#include "runtime/frontend_info.h"
7070
#include "runtime/primitive_type.h"
7171
#include "runtime/query_context.h"
72+
#include "runtime/query_handle.h"
7273
#include "runtime/runtime_query_statistics_mgr.h"
7374
#include "runtime/runtime_state.h"
7475
#include "runtime/stream_load/new_load_stream_mgr.h"
7576
#include "runtime/stream_load/stream_load_context.h"
7677
#include "runtime/stream_load/stream_load_executor.h"
7778
#include "runtime/thread_context.h"
78-
#include "runtime/types.h"
7979
#include "runtime/workload_group/workload_group.h"
8080
#include "runtime/workload_group/workload_group_manager.h"
8181
#include "runtime_filter/runtime_filter_consumer.h"
@@ -728,6 +728,10 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
728728
query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
729729
params.coord, params.is_nereids,
730730
params.current_connect_fe, query_source);
731+
auto query_handle =
732+
std::make_shared<QueryHandle>(query_id, parent.query_options, query_ctx);
733+
_query_handle_map.insert(query_id, query_handle);
734+
query_ctx->set_query_handle(query_handle);
731735
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
732736
RETURN_IF_ERROR(DescriptorTbl::create(
733737
&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
@@ -755,9 +759,8 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
755759
if (info.__isset.runtime_filter_params) {
756760
auto handler =
757761
std::make_shared<RuntimeFilterMergeControllerEntity>();
758-
RETURN_IF_ERROR(
759-
handler->init(query_ctx, info.runtime_filter_params));
760-
query_ctx->set_merge_controller_handler(handler);
762+
RETURN_IF_ERROR(handler->init(info.runtime_filter_params));
763+
query_handle->set_merge_controller_handler(handler);
761764

762765
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
763766
info.runtime_filter_params);
@@ -787,6 +790,15 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
787790
timespec now;
788791
clock_gettime(CLOCK_MONOTONIC, &now);
789792

793+
_query_handle_map.apply(
794+
[&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryHandle>>& map) -> Status {
795+
for (auto& it : map) {
796+
fmt::format_to(debug_string_buffer, "QueryHandle: {}\n",
797+
it.second->debug_string());
798+
}
799+
return Status::OK();
800+
});
801+
790802
_pipeline_map.apply([&](phmap::flat_hash_map<
791803
std::pair<TUniqueId, int>,
792804
std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
@@ -804,13 +816,6 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
804816
debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n",
805817
print_id(it.first.first),
806818
it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());
807-
808-
if (it.second->get_query_ctx()->get_merge_controller_handler()) {
809-
fmt::format_to(debug_string_buffer, "{}\n",
810-
it.second->get_query_ctx()
811-
->get_merge_controller_handler()
812-
->debug_string());
813-
}
814819
}
815820

816821
auto timeout_second = it.second->timeout_second();
@@ -914,6 +919,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
914919
}
915920
query_ctx->cancel(reason);
916921
remove_query_context(query_id);
922+
_query_handle_map.erase(query_id);
917923
LOG(INFO) << "Query " << print_id(query_id)
918924
<< " is cancelled and removed. Reason: " << reason.to_string();
919925
}
@@ -1310,8 +1316,8 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
13101316
return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
13111317
}
13121318

1313-
if (auto q_ctx = get_query_ctx(query_id)) {
1314-
return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request);
1319+
if (auto q_handle = _query_handle_map.find(query_id); q_handle != nullptr) {
1320+
return q_handle->get_merge_controller_handler()->send_filter_size(q_handle, request);
13151321
} else {
13161322
return Status::EndOfFile(
13171323
"Send filter size failed: Query context (query-id: {}) not found, maybe "
@@ -1347,12 +1353,11 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
13471353
TUniqueId query_id;
13481354
query_id.__set_hi(queryid.hi);
13491355
query_id.__set_lo(queryid.lo);
1350-
if (auto q_ctx = get_query_ctx(query_id)) {
1351-
SCOPED_ATTACH_TASK(q_ctx.get());
1352-
if (!q_ctx->get_merge_controller_handler()) {
1356+
if (auto q_handle = _query_handle_map.find(query_id); q_handle != nullptr) {
1357+
if (!q_handle->get_merge_controller_handler()) {
13531358
return Status::InternalError("Merge filter failed: Merge controller handler is null");
13541359
}
1355-
return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data);
1360+
return q_handle->get_merge_controller_handler()->merge(q_handle, request, attach_data);
13561361
} else {
13571362
return Status::EndOfFile(
13581363
"Merge filter size failed: Query context (query-id: {}) already finished",

be/src/runtime/fragment_mgr.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ namespace pipeline {
5353
class PipelineFragmentContext;
5454
} // namespace pipeline
5555
class QueryContext;
56+
class QueryHandle;
5657
class ExecEnv;
5758
class ThreadPool;
5859
class PExecPlanFragmentStartRequest;
@@ -209,6 +210,7 @@ class FragmentMgr : public RestMonitorIface {
209210

210211
// query id -> QueryContext
211212
ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
213+
ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryHandle>, QueryHandle> _query_handle_map;
212214

213215
CountDownLatch _stop_background_threads_latch;
214216
std::shared_ptr<Thread> _cancel_thread;

be/src/runtime/query_context.cpp

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
#include <gen_cpp/Types_types.h>
2424
#include <glog/logging.h>
2525

26-
#include <algorithm>
2726
#include <exception>
2827
#include <memory>
2928
#include <mutex>
@@ -32,7 +31,6 @@
3231

3332
#include "common/logging.h"
3433
#include "common/status.h"
35-
#include "olap/olap_common.h"
3634
#include "pipeline/dependency.h"
3735
#include "pipeline/pipeline_fragment_context.h"
3836
#include "runtime/exec_env.h"
@@ -43,10 +41,8 @@
4341
#include "runtime/thread_context.h"
4442
#include "runtime/workload_group/workload_group_manager.h"
4543
#include "runtime/workload_management/query_task_controller.h"
46-
#include "runtime_filter/runtime_filter_definitions.h"
4744
#include "util/mem_info.h"
4845
#include "util/uid_util.h"
49-
#include "vec/spill/spill_stream_manager.h"
5046

5147
namespace doris {
5248

@@ -106,8 +102,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
106102
_memory_sufficient_dependency =
107103
pipeline::Dependency::create_unique(-1, -1, "MemorySufficientDependency", true);
108104

109-
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(true);
110-
111105
_timeout_second = query_options.execution_timeout;
112106

113107
bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
@@ -231,10 +225,6 @@ QueryContext::~QueryContext() {
231225
_runtime_predicates.clear();
232226
file_scan_range_params_map.clear();
233227
obj_pool.clear();
234-
if (_merge_controller_handler) {
235-
_merge_controller_handler->release_undone_filters(this);
236-
}
237-
_merge_controller_handler.reset();
238228

239229
DorisMetrics::instance()->query_ctx_cnt->increment(-1);
240230
ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);

be/src/runtime/query_context.h

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <gen_cpp/Types_types.h>
2323
#include <glog/logging.h>
2424

25-
#include <atomic>
2625
#include <cstdint>
2726
#include <memory>
2827
#include <mutex>
@@ -34,16 +33,16 @@
3433
#include "common/object_pool.h"
3534
#include "runtime/exec_env.h"
3635
#include "runtime/memory/mem_tracker_limiter.h"
36+
#include "runtime/query_handle.h"
3737
#include "runtime/runtime_predicate.h"
3838
#include "runtime/workload_management/resource_context.h"
39-
#include "runtime_filter/runtime_filter_mgr.h"
40-
#include "util/hash_util.hpp"
41-
#include "util/threadpool.h"
4239
#include "vec/exec/scan/scanner_scheduler.h"
4340
#include "workload_group/workload_group.h"
4441

4542
namespace doris {
4643

44+
class QueryHandle;
45+
4746
namespace pipeline {
4847
class PipelineFragmentContext;
4948
class PipelineTask;
@@ -177,12 +176,6 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
177176
return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
178177
}
179178

180-
bool ignore_runtime_filter_error() const {
181-
return _query_options.__isset.ignore_runtime_filter_error
182-
? _query_options.ignore_runtime_filter_error
183-
: false;
184-
}
185-
186179
bool enable_force_spill() const {
187180
return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill;
188181
}
@@ -214,14 +207,6 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
214207

215208
doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
216209

217-
void set_merge_controller_handler(
218-
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
219-
_merge_controller_handler = handler;
220-
}
221-
std::shared_ptr<RuntimeFilterMergeControllerEntity> get_merge_controller_handler() const {
222-
return _merge_controller_handler;
223-
}
224-
225210
bool is_nereids() const { return _is_nereids; }
226211

227212
WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); }
@@ -299,6 +284,10 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
299284
void set_first_error_msg(std::string error_msg);
300285
std::string get_first_error_msg();
301286

287+
void set_query_handle(std::shared_ptr<QueryHandle> query_handle) {
288+
_query_handle = query_handle;
289+
}
290+
302291
private:
303292
friend class QueryTaskController;
304293

@@ -330,10 +319,6 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
330319
// This dependency indicates if memory is sufficient to execute.
331320
std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency;
332321

333-
// This shared ptr is never used. It is just a reference to hold the object.
334-
// There is a weak ptr in runtime filter manager to reference this object.
335-
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
336-
337322
std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
338323
std::mutex _pipeline_map_write_lock;
339324

@@ -377,6 +362,8 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
377362
std::string _load_error_url;
378363
std::string _first_error_msg;
379364

365+
std::shared_ptr<QueryHandle> _query_handle;
366+
380367
public:
381368
// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
382369
void add_fragment_profile(
@@ -392,8 +379,6 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
392379

393380
timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
394381
QuerySource get_query_source() const { return this->_query_source; }
395-
396-
const TQueryOptions get_query_options() const { return _query_options; }
397382
};
398383

399384
} // namespace doris

be/src/runtime/query_handle.h

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <gen_cpp/PaloInternalService_types.h>
21+
#include <gen_cpp/Types_types.h>
22+
23+
#include <memory>
24+
#include <string>
25+
#include <utility>
26+
27+
#include "runtime/query_context.h"
28+
#include "runtime_filter/runtime_filter_mgr.h"
29+
30+
namespace doris {
31+
32+
class QueryHandle;
33+
34+
// The role of QueryHandle is similar to QueryContext, but its lifecycle is longer than QueryContext.
35+
// QueryHandle will exist until the entire query ends, rather than being released as the current BE task ends like QueryContext.
36+
// It is mainly used to store runtime states that need coordination between BEs, such as the MergeControllerHandler of RuntimeFilter.
37+
// This way, even if the QueryContext of one BE has been released, other BEs can still access these coordination states through QueryHandle to ensure the correctness and consistency of the query.
38+
// QueryContext holds a shared_ptr to QueryHandle, and QueryHandle holds a weak_ptr to QueryContext, to avoid circular references.
39+
class QueryHandle {
40+
public:
41+
QueryHandle(TUniqueId query_id, TQueryOptions query_options,
42+
std::weak_ptr<QueryContext> query_ctx)
43+
: _query_id(std::move(query_id)),
44+
_query_options(std::move(query_options)),
45+
_query_ctx(std::move(query_ctx)) {}
46+
~QueryHandle() = default;
47+
48+
std::string debug_string() const {
49+
return fmt::format(
50+
"QueryHandle(query_id={}): {}", print_id(_query_id),
51+
_merge_controller_handler ? _merge_controller_handler->debug_string() : "null");
52+
}
53+
54+
void set_merge_controller_handler(
55+
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
56+
_merge_controller_handler = handler;
57+
}
58+
std::shared_ptr<RuntimeFilterMergeControllerEntity> get_merge_controller_handler() const {
59+
return _merge_controller_handler;
60+
}
61+
62+
const TQueryOptions& query_options() const { return _query_options; }
63+
64+
std::weak_ptr<QueryContext> weak_query_ctx() const { return _query_ctx; }
65+
66+
int execution_timeout() const {
67+
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
68+
: _query_options.query_timeout;
69+
}
70+
71+
TUniqueId query_id() const { return _query_id; }
72+
73+
private:
74+
TUniqueId _query_id;
75+
TQueryOptions _query_options;
76+
std::weak_ptr<QueryContext> _query_ctx;
77+
// This shared ptr is never used. It is just a reference to hold the object.
78+
// There is a weak ptr in runtime filter manager to reference this object.
79+
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
80+
};
81+
82+
} // namespace doris

be/src/runtime_filter/runtime_filter_merger.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ class RuntimeFilterMerger : public RuntimeFilter {
3535
READY // Collecting all products(_received_producer_num == _expected_producer_num) will transfer to this state, and filter is already available
3636
};
3737

38-
static Status create(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc,
38+
static Status create(const TRuntimeFilterDesc* desc,
3939
std::shared_ptr<RuntimeFilterMerger>* res) {
40-
*res = std::shared_ptr<RuntimeFilterMerger>(new RuntimeFilterMerger(query_ctx, desc));
40+
*res = std::shared_ptr<RuntimeFilterMerger>(new RuntimeFilterMerger(desc));
4141
vectorized::VExprContextSPtr build_ctx;
4242
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(desc->src_expr, build_ctx));
4343
(*res)->_wrapper = std::make_shared<RuntimeFilterWrapper>(
@@ -104,7 +104,7 @@ class RuntimeFilterMerger : public RuntimeFilter {
104104
}
105105

106106
private:
107-
RuntimeFilterMerger(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc)
107+
RuntimeFilterMerger(const TRuntimeFilterDesc* desc)
108108
: RuntimeFilter(desc), _rf_state(State::WAITING_FOR_PRODUCT) {}
109109

110110
std::atomic<State> _rf_state;

0 commit comments

Comments
 (0)