diff --git a/be/src/olap/id_manager.h b/be/src/olap/id_manager.h index 4b9974e0ee3596..cc729b6d5a6777 100644 --- a/be/src/olap/id_manager.h +++ b/be/src/olap/id_manager.h @@ -195,13 +195,13 @@ class IdFileMap { return it->second; } - int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; } + int64_t get_delayed_expired_timestamp() const { return delayed_expired_timestamp; } void set_external_scan_params(QueryContext* query_ctx, int max_file_scanners) { std::call_once(once_flag_for_external, [&] { DCHECK(query_ctx != nullptr); _query_global = query_ctx->get_query_globals(); - _query_options = query_ctx->get_query_options(); + _query_options = query_ctx->query_options(); _file_scan_range_params_map = query_ctx->file_scan_range_params_map; _max_file_scanners = max_file_scanners; }); diff --git a/be/src/runtime/coordinator_context.cpp b/be/src/runtime/coordinator_context.cpp new file mode 100644 index 00000000000000..0be5697b0a5080 --- /dev/null +++ b/be/src/runtime/coordinator_context.cpp @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/coordinator_context.h" + +#include "runtime/query_context.h" + +namespace doris { + +CoordinatorContext::CoordinatorContext(TUniqueId query_id, TQueryOptions query_options, + std::weak_ptr query_ctx) + : _query_id(std::move(query_id)), + _query_options(std::move(query_options)), + _query_ctx(std::move(query_ctx)), + _timeout_second(query_options.execution_timeout) { + _query_watcher.start(); + CHECK(_query_ctx.lock() != nullptr); + _resource_ctx = _query_ctx.lock()->_resource_ctx; +} + +CoordinatorContext::~CoordinatorContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker()); + // query mem tracker consumption is equal to 0, it means that after QueryContext is created, + // it is found that query already exists in _query_ctx_map, and query mem tracker is not used. + // query mem tracker consumption is not equal to 0 after use, because there is memory consumed + // on query mem tracker, released on other trackers. + std::string mem_tracker_msg; + if (query_mem_tracker()->peak_consumption() != 0) { + mem_tracker_msg = fmt::format( + "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " + "PeakUsed={}", + print_id(_query_id), PrettyPrinter::print_bytes(query_mem_tracker()->limit()), + PrettyPrinter::print_bytes(query_mem_tracker()->consumption()), + PrettyPrinter::print_bytes(query_mem_tracker()->peak_consumption())); + } + [[maybe_unused]] uint64_t group_id = 0; + if (_resource_ctx->workload_group()) { + group_id = _resource_ctx->workload_group()->id(); // before remove + } + + _resource_ctx->task_controller()->finish(); + + // the only one msg shows query's end. any other msg should append to it if need. + LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg); + +#ifndef BE_TEST + if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] { + try { + ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id); + } catch (std::exception& e) { + LOG(WARNING) << "Dump trace log failed bacause " << e.what(); + } + } +#endif + _resource_ctx->memory_context()->mem_tracker(); + _merge_controller_handler.reset(); +} + +std::string CoordinatorContext::debug_string() const { + return fmt::format( + "CoordinatorContext(query_id={}): {}", print_id(_query_id), + _merge_controller_handler ? _merge_controller_handler->debug_string() : "null"); +} + +void CoordinatorContext::set_merge_controller_handler( + std::shared_ptr& handler) { + _merge_controller_handler = handler; +} +std::shared_ptr +CoordinatorContext::get_merge_controller_handler() const { + return _merge_controller_handler; +} + +const TQueryOptions& CoordinatorContext::query_options() const { + return _query_options; +} + +std::weak_ptr CoordinatorContext::weak_query_ctx() const { + return _query_ctx; +} + +int CoordinatorContext::execution_timeout() const { + return _query_options.__isset.execution_timeout ? _query_options.execution_timeout + : _query_options.query_timeout; +} + +TUniqueId CoordinatorContext::query_id() const { + return _query_id; +} + +std::shared_ptr CoordinatorContext::resource_ctx() const { + return _resource_ctx; +} + +std::shared_ptr CoordinatorContext::query_mem_tracker() const { + DCHECK(_resource_ctx->memory_context()->mem_tracker() != nullptr); + return _resource_ctx->memory_context()->mem_tracker(); +} + +} // namespace doris diff --git a/be/src/runtime/coordinator_context.h b/be/src/runtime/coordinator_context.h new file mode 100644 index 00000000000000..fb7ac0e5714fc7 --- /dev/null +++ b/be/src/runtime/coordinator_context.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include +#include + +#include "runtime/query_context.h" +#include "runtime_filter/runtime_filter_mgr.h" + +namespace doris { + +class CoordinatorContext; +class ResourceContext; + +// The role of CoordinatorContext is similar to QueryContext, but its lifecycle is longer than QueryContext. +// CoordinatorContext will exist until the entire query ends, rather than being released as the current BE task ends like QueryContext. +// It is mainly used to store runtime states that need coordination between BEs, such as the MergeControllerHandler of RuntimeFilter. +// This way, even if the QueryContext of one BE has been released, other BEs can still access these coordination states through CoordinatorContext to ensure the correctness and consistency of the query. +// QueryContext hold shared_ptr of CoordinatorContext, and CoordinatorContext hold weak_ptr of QueryContext to avoid circular references. +class CoordinatorContext { +public: + CoordinatorContext(TUniqueId query_id, TQueryOptions query_options, + std::weak_ptr query_ctx); + ~CoordinatorContext(); + + std::string debug_string() const; + + void set_merge_controller_handler(std::shared_ptr& handler); + std::shared_ptr get_merge_controller_handler() const; + + const TQueryOptions& query_options() const; + + std::weak_ptr weak_query_ctx() const; + + int execution_timeout() const; + + TUniqueId query_id() const; + + std::shared_ptr resource_ctx() const; + + std::shared_ptr query_mem_tracker() const; + + bool is_timeout(timespec now) const { + if (_timeout_second <= 0) { + return false; + } + return _query_watcher.elapsed_time_seconds(now) > _timeout_second; + } + + int64_t get_remaining_query_time_seconds() const { + timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + if (is_timeout(now)) { + return -1; + } + int64_t elapsed_seconds = _query_watcher.elapsed_time_seconds(now); + return _timeout_second - elapsed_seconds; + } + +private: + TUniqueId _query_id; + TQueryOptions _query_options; + std::weak_ptr _query_ctx; + + std::shared_ptr _resource_ctx; + + // This shared ptr is never used. It is just a reference to hold the object. + // There is a weak ptr in runtime filter manager to reference this object. + std::shared_ptr _merge_controller_handler; + + MonotonicStopWatch _query_watcher; + int _timeout_second; +}; + +} // namespace doris diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 88bb449fc4018e..f7257c7cb36a06 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -64,6 +64,7 @@ #include "io/fs/stream_load_pipe.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/client_cache.h" +#include "runtime/coordinator_context.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/frontend_info.h" @@ -75,7 +76,6 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/stream_load/stream_load_executor.h" #include "runtime/thread_context.h" -#include "runtime/types.h" #include "runtime/workload_group/workload_group.h" #include "runtime/workload_group/workload_group_manager.h" #include "runtime_filter/runtime_filter_consumer.h" @@ -728,6 +728,10 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para query_ctx = QueryContext::create(query_id, _exec_env, params.query_options, params.coord, params.is_nereids, params.current_connect_fe, query_source); + auto coordinator_context = std::make_shared( + query_id, parent.query_options, query_ctx); + _coordinator_context_map.insert(query_id, coordinator_context); + query_ctx->set_coordinator_context(coordinator_context); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker()); RETURN_IF_ERROR(DescriptorTbl::create( &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl))); @@ -755,9 +759,8 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para if (info.__isset.runtime_filter_params) { auto handler = std::make_shared(); - RETURN_IF_ERROR( - handler->init(query_ctx, info.runtime_filter_params)); - query_ctx->set_merge_controller_handler(handler); + RETURN_IF_ERROR(handler->init(info.runtime_filter_params)); + coordinator_context->set_merge_controller_handler(handler); query_ctx->runtime_filter_mgr()->set_runtime_filter_params( info.runtime_filter_params); @@ -787,6 +790,16 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { timespec now; clock_gettime(CLOCK_MONOTONIC, &now); + _coordinator_context_map.apply( + [&](phmap::flat_hash_map>& map) + -> Status { + for (auto& it : map) { + fmt::format_to(debug_string_buffer, "CoordinatorContext: {}\n", + it.second->debug_string()); + } + return Status::OK(); + }); + _pipeline_map.apply([&](phmap::flat_hash_map< std::pair, std::shared_ptr>& map) @@ -804,13 +817,6 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n", print_id(it.first.first), it.second->get_query_ctx()->runtime_filter_mgr()->debug_string()); - - if (it.second->get_query_ctx()->get_merge_controller_handler()) { - fmt::format_to(debug_string_buffer, "{}\n", - it.second->get_query_ctx() - ->get_merge_controller_handler() - ->debug_string()); - } } auto timeout_second = it.second->timeout_second(); @@ -902,6 +908,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, } void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { + _coordinator_context_map.erase(query_id); std::shared_ptr query_ctx = nullptr; { if (auto q_ctx = get_query_ctx(query_id)) { @@ -963,6 +970,20 @@ void FragmentMgr::cancel_worker() { std::unordered_map, BrpcItem> brpc_stub_with_queries; { + _coordinator_context_map.apply( + [&](phmap::flat_hash_map>& map) + -> Status { + for (auto& it : map) { + if (auto coord_ctx = it.second; coord_ctx->is_timeout(now)) { + LOG_WARNING( + "Query {} is timeout, but CoordinatorContext still exist", + print_id(it.first)); + queries_timeout.push_back(it.first); + } + } + return Status::OK(); + }); + std::vector> contexts; _query_ctx_map.apply([&](phmap::flat_hash_map>& map) -> Status { @@ -1311,8 +1332,8 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof"); } - if (auto q_ctx = get_query_ctx(query_id)) { - return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request); + if (auto q_handle = _coordinator_context_map.find(query_id); q_handle != nullptr) { + return q_handle->get_merge_controller_handler()->send_filter_size(q_handle, request); } else { return Status::EndOfFile( "Send filter size failed: Query context (query-id: {}) not found, maybe " @@ -1348,12 +1369,12 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - if (auto q_ctx = get_query_ctx(query_id)) { - SCOPED_ATTACH_TASK(q_ctx.get()); - if (!q_ctx->get_merge_controller_handler()) { + if (auto q_handle = _coordinator_context_map.find(query_id); q_handle != nullptr) { + SCOPED_ATTACH_TASK(q_handle->resource_ctx()); + if (!q_handle->get_merge_controller_handler()) { return Status::InternalError("Merge filter failed: Merge controller handler is null"); } - return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data); + return q_handle->get_merge_controller_handler()->merge(q_handle, request, attach_data); } else { return Status::EndOfFile( "Merge filter size failed: Query context (query-id: {}) already finished", diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 826bca81901e69..b7745d89ec7363 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -53,6 +53,7 @@ namespace pipeline { class PipelineFragmentContext; } // namespace pipeline class QueryContext; +class CoordinatorContext; class ExecEnv; class ThreadPool; class PExecPlanFragmentStartRequest; @@ -209,6 +210,8 @@ class FragmentMgr : public RestMonitorIface { // query id -> QueryContext ConcurrentContextMap, QueryContext> _query_ctx_map; + ConcurrentContextMap, CoordinatorContext> + _coordinator_context_map; CountDownLatch _stop_background_threads_latch; std::shared_ptr _cancel_thread; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2b142f9a9f0abc..c78962dafef016 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -23,7 +23,6 @@ #include #include -#include #include #include #include @@ -32,7 +31,6 @@ #include "common/logging.h" #include "common/status.h" -#include "olap/olap_common.h" #include "pipeline/dependency.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/exec_env.h" @@ -43,10 +41,8 @@ #include "runtime/thread_context.h" #include "runtime/workload_group/workload_group_manager.h" #include "runtime/workload_management/query_task_controller.h" -#include "runtime_filter/runtime_filter_definitions.h" #include "util/mem_info.h" #include "util/uid_util.h" -#include "vec/spill/spill_stream_manager.h" namespace doris { @@ -92,15 +88,13 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe, QuerySource query_source) - : _timeout_second(-1), - _query_id(std::move(query_id)), + : _query_id(std::move(query_id)), _exec_env(exec_env), _is_nereids(is_nereids), _query_options(query_options), _query_source(query_source) { _init_resource_context(); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker()); - _query_watcher.start(); _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", false); _memory_sufficient_dependency = @@ -108,8 +102,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, _runtime_filter_mgr = std::make_unique(true); - _timeout_second = query_options.execution_timeout; - bool is_query_type_valid = query_options.query_type == TQueryType::SELECT || query_options.query_type == TQueryType::LOAD || query_options.query_type == TQueryType::EXTERNAL; @@ -193,53 +185,18 @@ void QueryContext::init_query_task_controller() { QueryContext::~QueryContext() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker()); - // query mem tracker consumption is equal to 0, it means that after QueryContext is created, - // it is found that query already exists in _query_ctx_map, and query mem tracker is not used. - // query mem tracker consumption is not equal to 0 after use, because there is memory consumed - // on query mem tracker, released on other trackers. - std::string mem_tracker_msg; - if (query_mem_tracker()->peak_consumption() != 0) { - mem_tracker_msg = fmt::format( - "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " - "PeakUsed={}", - print_id(_query_id), PrettyPrinter::print_bytes(query_mem_tracker()->limit()), - PrettyPrinter::print_bytes(query_mem_tracker()->consumption()), - PrettyPrinter::print_bytes(query_mem_tracker()->peak_consumption())); - } - [[maybe_unused]] uint64_t group_id = 0; - if (workload_group()) { - group_id = workload_group()->id(); // before remove - } - - _resource_ctx->task_controller()->finish(); - if (enable_profile()) { _report_query_profile(); } -#ifndef BE_TEST - if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] { - try { - ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id); - } catch (std::exception& e) { - LOG(WARNING) << "Dump trace log failed bacause " << e.what(); - } - } -#endif _runtime_filter_mgr.reset(); _execution_dependency.reset(); _runtime_predicates.clear(); file_scan_range_params_map.clear(); obj_pool.clear(); - if (_merge_controller_handler) { - _merge_controller_handler->release_undone_filters(this); - } - _merge_controller_handler.reset(); DorisMetrics::instance()->query_ctx_cnt->increment(-1); ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id); - // the only one msg shows query's end. any other msg should append to it if need. - LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg); } void QueryContext::set_ready_to_execute(Status reason) { @@ -497,4 +454,12 @@ TReportExecStatusParams QueryContext::get_realtime_exec_status() { return exec_status; } +bool QueryContext::is_timeout(timespec now) const { + return _coordinator_context->is_timeout(now); +} + +int64_t QueryContext::get_remaining_query_time_seconds() const { + return _coordinator_context->get_remaining_query_time_seconds(); +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 32458cdb00ec1f..eb070321745a40 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -32,18 +31,18 @@ #include "common/config.h" #include "common/factory_creator.h" #include "common/object_pool.h" +#include "runtime/coordinator_context.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/runtime_predicate.h" #include "runtime/workload_management/resource_context.h" -#include "runtime_filter/runtime_filter_mgr.h" -#include "util/hash_util.hpp" -#include "util/threadpool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "workload_group/workload_group.h" namespace doris { +class CoordinatorContext; + namespace pipeline { class PipelineFragmentContext; class PipelineTask; @@ -100,22 +99,9 @@ class QueryContext : public std::enable_shared_from_this { ExecEnv* exec_env() const { return _exec_env; } - bool is_timeout(timespec now) const { - if (_timeout_second <= 0) { - return false; - } - return _query_watcher.elapsed_time_seconds(now) > _timeout_second; - } + bool is_timeout(timespec now) const; - int64_t get_remaining_query_time_seconds() const { - timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - if (is_timeout(now)) { - return -1; - } - int64_t elapsed_seconds = _query_watcher.elapsed_time_seconds(now); - return _timeout_second - elapsed_seconds; - } + int64_t get_remaining_query_time_seconds() const; void set_ready_to_execute(Status reason); @@ -177,12 +163,6 @@ class QueryContext : public std::enable_shared_from_this { return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0; } - bool ignore_runtime_filter_error() const { - return _query_options.__isset.ignore_runtime_filter_error - ? _query_options.ignore_runtime_filter_error - : false; - } - bool enable_force_spill() const { return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill; } @@ -214,14 +194,6 @@ class QueryContext : public std::enable_shared_from_this { doris::pipeline::TaskScheduler* get_pipe_exec_scheduler(); - void set_merge_controller_handler( - std::shared_ptr& handler) { - _merge_controller_handler = handler; - } - std::shared_ptr get_merge_controller_handler() const { - return _merge_controller_handler; - } - bool is_nereids() const { return _is_nereids; } WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); } @@ -299,13 +271,16 @@ class QueryContext : public std::enable_shared_from_this { void set_first_error_msg(std::string error_msg); std::string get_first_error_msg(); + void set_coordinator_context(std::shared_ptr coordinator_context) { + _coordinator_context = coordinator_context; + } + private: friend class QueryTaskController; + friend class CoordinatorContext; - int _timeout_second; TUniqueId _query_id; ExecEnv* _exec_env = nullptr; - MonotonicStopWatch _query_watcher; bool _is_nereids = false; std::shared_ptr _resource_ctx; @@ -330,10 +305,6 @@ class QueryContext : public std::enable_shared_from_this { // This dependency indicates if memory is sufficient to execute. std::unique_ptr _memory_sufficient_dependency; - // This shared ptr is never used. It is just a reference to hold the object. - // There is a weak ptr in runtime filter manager to reference this object. - std::shared_ptr _merge_controller_handler; - std::map> _fragment_id_to_pipeline_ctx; std::mutex _pipeline_map_write_lock; @@ -377,6 +348,8 @@ class QueryContext : public std::enable_shared_from_this { std::string _load_error_url; std::string _first_error_msg; + std::shared_ptr _coordinator_context; + public: // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile void add_fragment_profile( @@ -392,8 +365,6 @@ class QueryContext : public std::enable_shared_from_this { timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; } QuerySource get_query_source() const { return this->_query_source; } - - const TQueryOptions get_query_options() const { return _query_options; } }; } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h index 00bac84547389a..932d546e268086 100644 --- a/be/src/runtime_filter/runtime_filter_merger.h +++ b/be/src/runtime_filter/runtime_filter_merger.h @@ -35,9 +35,9 @@ class RuntimeFilterMerger : public RuntimeFilter { READY // Collecting all products(_received_producer_num == _expected_producer_num) will transfer to this state, and filter is already available }; - static Status create(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, + static Status create(const TRuntimeFilterDesc* desc, std::shared_ptr* res) { - *res = std::shared_ptr(new RuntimeFilterMerger(query_ctx, desc)); + *res = std::shared_ptr(new RuntimeFilterMerger(desc)); vectorized::VExprContextSPtr build_ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(desc->src_expr, build_ctx)); (*res)->_wrapper = std::make_shared( @@ -104,7 +104,7 @@ class RuntimeFilterMerger : public RuntimeFilter { } private: - RuntimeFilterMerger(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc) + RuntimeFilterMerger(const TRuntimeFilterDesc* desc) : RuntimeFilter(desc), _rf_state(State::WAITING_FOR_PRODUCT) {} std::atomic _rf_state; diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index 48645ffdb8e3b0..2b5430c4614e97 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -32,12 +32,12 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "runtime/coordinator_context.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/query_context.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" -#include "runtime_filter/runtime_filter.h" #include "runtime_filter/runtime_filter_consumer.h" #include "runtime_filter/runtime_filter_merger.h" #include "runtime_filter/runtime_filter_producer.h" @@ -104,7 +104,7 @@ Status LocalMergeContext::register_producer(const QueryContext* query_ctx, std::shared_ptr producer) { std::lock_guard l(mtx); if (!merger) { - RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger)); + RETURN_IF_ERROR(RuntimeFilterMerger::create(desc, &merger)); } producers.emplace_back(producer); merger->set_expected_producer_num(cast_set(producers.size())); @@ -144,7 +144,7 @@ Status RuntimeFilterMgr::register_producer_filter( if (_producer_id_set.contains(key)) { return Status::InvalidArgument("filter {} has been registered", key); } - RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer)); + RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx->query_options(), &desc, producer)); _producer_id_set.insert(key); return Status::OK(); } @@ -169,7 +169,7 @@ Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) { } Status RuntimeFilterMergeControllerEntity::_init_with_desc( - std::shared_ptr query_ctx, const TRuntimeFilterDesc* runtime_filter_desc, + const TRuntimeFilterDesc* runtime_filter_desc, const std::vector&& targetv2_info, const int producer_size) { auto filter_id = runtime_filter_desc->filter_id; GlobalMergeContext* cnt_val; @@ -182,15 +182,13 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( // so we need to copy to cnt_val cnt_val->runtime_filter_desc = *runtime_filter_desc; cnt_val->targetv2_info = targetv2_info; - RETURN_IF_ERROR( - RuntimeFilterMerger::create(query_ctx.get(), runtime_filter_desc, &cnt_val->merger)); + RETURN_IF_ERROR(RuntimeFilterMerger::create(runtime_filter_desc, &cnt_val->merger)); cnt_val->merger->set_expected_producer_num(producer_size); return Status::OK(); } -Status RuntimeFilterMergeControllerEntity::init(std::shared_ptr query_ctx, - const TRuntimeFilterParams& runtime_filter_params) { +Status RuntimeFilterMergeControllerEntity::init(const TRuntimeFilterParams& runtime_filter_params) { _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (runtime_filter_params.__isset.rid_to_runtime_filter) { @@ -207,7 +205,7 @@ Status RuntimeFilterMergeControllerEntity::init(std::shared_ptr qu } RETURN_IF_ERROR(_init_with_desc( - query_ctx, &filterid_to_desc.second, + &filterid_to_desc.second, targetv2_iter == runtime_filter_params.rid_to_target_paramv2.end() ? std::vector {} : targetv2_iter->second, @@ -217,8 +215,9 @@ Status RuntimeFilterMergeControllerEntity::init(std::shared_ptr qu return Status::OK(); } -Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr query_ctx, - const PSendFilterSizeRequest* request) { +Status RuntimeFilterMergeControllerEntity::send_filter_size( + std::shared_ptr coordinator_context, + const PSendFilterSizeRequest* request) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); auto filter_id = request->filter_id(); @@ -238,8 +237,9 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptradd_rf_size(request->filter_size())) { - auto ctx = query_ctx->ignore_runtime_filter_error() ? std::weak_ptr {} - : query_ctx; + auto ctx = coordinator_context->query_options().ignore_runtime_filter_error + ? std::weak_ptr {} + : coordinator_context->weak_query_ctx(); for (auto addr : cnt_val.source_addrs) { std::shared_ptr stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr)); @@ -256,10 +256,10 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr::create_shared(), ctx); auto* pquery_id = closure->request_->mutable_query_id(); - pquery_id->set_hi(query_ctx->query_id().hi); - pquery_id->set_lo(query_ctx->query_id().lo); + pquery_id->set_hi(coordinator_context->query_id().hi); + pquery_id->set_lo(coordinator_context->query_id().lo); closure->cntl_->set_timeout_ms( - get_execution_rpc_timeout_ms(query_ctx->execution_timeout())); + get_execution_rpc_timeout_ms(coordinator_context->execution_timeout())); if (config::execution_ignore_eovercrowded) { closure->cntl_->ignore_eovercrowded(); } @@ -303,9 +303,9 @@ std::string RuntimeFilterMgr::debug_string() { } // merge data -Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr query_ctx, - const PMergeFilterRequest* request, - butil::IOBufAsZeroCopyInputStream* attach_data) { +Status RuntimeFilterMergeControllerEntity::merge( + std::shared_ptr coordinator_context, const PMergeFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* attach_data) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); int64_t merge_time = 0; auto filter_id = request->filter_id(); @@ -328,8 +328,8 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q return Status::OK(); } std::shared_ptr tmp_filter; - RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx.get(), &cnt_val.runtime_filter_desc, - &tmp_filter)); + RETURN_IF_ERROR(RuntimeFilterProducer::create(coordinator_context->query_options(), + &cnt_val.runtime_filter_desc, &tmp_filter)); RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data)); @@ -341,10 +341,11 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q if (is_ready) { return _send_rf_to_target(cnt_val, - query_ctx->ignore_runtime_filter_error() + coordinator_context->query_options().ignore_runtime_filter_error ? std::weak_ptr {} - : query_ctx, - merge_time, request->query_id(), query_ctx->execution_timeout()); + : coordinator_context->weak_query_ctx(), + merge_time, request->query_id(), + coordinator_context->execution_timeout()); } return Status::OK(); } @@ -433,30 +434,6 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext return st; } -void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* query_ctx) { - std::unique_lock guard(_filter_map_mutex); - for (auto& [filter_id, ctx] : _filter_map) { - if (!ctx.done && !ctx.targetv2_info.empty()) { - { - std::lock_guard l(ctx.mtx); - ctx.merger->set_wrapper_state_and_ready_to_apply( - RuntimeFilterWrapper::State::DISABLED, - "rf coordinator's query context released before runtime filter is ready to " - "apply"); - } - auto st = _send_rf_to_target(ctx, std::weak_ptr {}, 0, - UniqueId(query_ctx->query_id()).to_proto(), - query_ctx->execution_timeout()); - if (!st.ok()) { - LOG(WARNING) - << "Failed to send runtime filter to target before query done. filter_id:" - << filter_id << " " << ctx.merger->debug_string() << " reason:" << st; - } - } - } - _filter_map.clear(); -} - std::string RuntimeFilterMergeControllerEntity::debug_string() { std::string result = "RuntimeFilterMergeControllerEntity Info:\n"; std::shared_lock guard(_filter_map_mutex); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 160babf278d119..0ce21b832b6b8c 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -55,6 +55,7 @@ class RuntimeFilterWrapper; class QueryContext; class ExecEnv; class RuntimeProfile; +class CoordinatorContext; struct LocalMergeContext { std::mutex mtx; @@ -138,23 +139,20 @@ class RuntimeFilterMgr { // the class is destroyed with the last fragment_exec. class RuntimeFilterMergeControllerEntity { public: - Status init(std::shared_ptr query_ctx, - const TRuntimeFilterParams& runtime_filter_params); + Status init(const TRuntimeFilterParams& runtime_filter_params); // handle merge rpc - Status merge(std::shared_ptr query_ctx, const PMergeFilterRequest* request, + Status merge(std::shared_ptr coordinator_context, + const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data); - Status send_filter_size(std::shared_ptr query_ctx, + Status send_filter_size(std::shared_ptr coordinator_context, const PSendFilterSizeRequest* request); std::string debug_string(); - void release_undone_filters(QueryContext* query_ctx); - private: - Status _init_with_desc(std::shared_ptr query_ctx, - const TRuntimeFilterDesc* runtime_filter_desc, + Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, const std::vector&& target_info, const int producer_size); diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index 0edf85cd1d9341..3dcf6628deb1ae 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -46,10 +46,10 @@ class RuntimeFilterProducer : public RuntimeFilter { PUBLISHED = 4 // Publish is complete, entering the final state of rf }; - static Status create(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, + static Status create(const TQueryOptions& query_options, const TRuntimeFilterDesc* desc, std::shared_ptr* res) { - *res = std::shared_ptr(new RuntimeFilterProducer(query_ctx, desc)); - RETURN_IF_ERROR((*res)->_init_with_desc(desc, &query_ctx->query_options())); + *res = std::shared_ptr(new RuntimeFilterProducer(desc)); + RETURN_IF_ERROR((*res)->_init_with_desc(desc, &query_options)); return Status::OK(); } @@ -132,7 +132,7 @@ class RuntimeFilterProducer : public RuntimeFilter { } private: - RuntimeFilterProducer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc) + RuntimeFilterProducer(const TRuntimeFilterDesc* desc) : RuntimeFilter(desc), _is_broadcast_join(desc->is_broadcast_join) {} Status _send_to_remote_targets(RuntimeState* state, RuntimeFilter* merger_filter); diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp index e0dbc33e716d69..c4c46d1db1a5b3 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp @@ -89,8 +89,8 @@ TEST_F(RuntimeFilterConsumerHelperTest, basic) { ASSERT_EQ(conjuncts.size(), 0); std::shared_ptr producer; - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( - _query_ctx.get(), runtime_filter_descs.data(), &producer)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + RuntimeFilterProducer::create(_query_options, runtime_filter_descs.data(), &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); helper._consumers[0]->signal(producer.get()); diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 77ee21368c00a1..aa3a5418566228 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -35,7 +35,7 @@ class RuntimeFilterConsumerTest : public RuntimeFilterTest { std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); @@ -120,7 +120,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); std::vector push_exprs; @@ -156,7 +156,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) { std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED); std::vector push_exprs; @@ -221,7 +221,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); std::vector push_exprs; diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp b/be/test/runtime_filter/runtime_filter_merger_test.cpp index 2c62c0de8b039b..5864744d1173d0 100644 --- a/be/test/runtime_filter/runtime_filter_merger_test.cpp +++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp @@ -33,8 +33,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { RuntimeFilterWrapper::State second_expected_state) { std::shared_ptr merger; auto desc = TRuntimeFilterDescBuilder().build(); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(&desc, &merger)); merger->set_expected_producer_num(2); ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); @@ -61,8 +60,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { .set_type(TRuntimeFilterType::IN_OR_BLOOM) .build()) { std::shared_ptr merger; - FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(&desc, &merger)); merger->set_expected_producer_num(1); ASSERT_FALSE(merger->ready()); @@ -81,7 +79,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { std::shared_ptr deserialized_producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &deserialized_producer)); + RuntimeFilterProducer::create(_query_options, &desc, &deserialized_producer)); butil::IOBuf buf; buf.append(data, len); butil::IOBufAsZeroCopyInputStream stream(buf); @@ -98,7 +96,7 @@ TEST_F(RuntimeFilterMergerTest, basic) { TEST_F(RuntimeFilterMergerTest, add_rf_size) { std::shared_ptr merger; auto desc = TRuntimeFilterDescBuilder().build(); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(&desc, &merger)); merger->set_expected_producer_num(2); ASSERT_FALSE(merger->add_rf_size(123)); @@ -117,7 +115,7 @@ TEST_F(RuntimeFilterMergerTest, add_rf_size) { TEST_F(RuntimeFilterMergerTest, invalid_merge) { std::shared_ptr merger; auto desc = TRuntimeFilterDescBuilder().build(); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(&desc, &merger)); merger->set_expected_producer_num(1); ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp index d8222e201d9a5e..ffb714e845a860 100644 --- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -168,7 +168,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMergeControllerEntity) { TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build()) .add_rid_to_target_paramv2(rid, {TRuntimeFilterTargetParamsV2()}) .build(); - EXPECT_FALSE(entity->init(ctx, param).ok()); + EXPECT_FALSE(entity->init(param).ok()); param = TRuntimeFilterParamsBuilder() .add_rid_to_runtime_filter( @@ -177,7 +177,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMergeControllerEntity) { .add_runtime_filter_builder_num(rid, 1) .add_rid_to_target_paramv2(rid, {TRuntimeFilterTargetParamsV2()}) .build(); - EXPECT_TRUE(entity->init(ctx, param).ok()); + EXPECT_TRUE(entity->init(param).ok()); } } diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp b/be/test/runtime_filter/runtime_filter_producer_test.cpp index 50dee8f1903f99..44fa39d787dd18 100644 --- a/be/test/runtime_filter/runtime_filter_producer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp @@ -31,7 +31,7 @@ TEST_F(RuntimeFilterProducerTest, basic) { std::shared_ptr producer; auto desc = TRuntimeFilterDescBuilder().build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); } TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) { @@ -42,7 +42,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) { .set_is_broadcast_join(true) .build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); ASSERT_EQ(producer->_need_sync_filter_size, false); ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); } @@ -53,7 +53,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) { .set_is_broadcast_join(false) .build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); ASSERT_EQ(producer->_need_sync_filter_size, false); ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); } @@ -66,7 +66,7 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size) { .set_is_broadcast_join(false) .build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); ASSERT_EQ(producer->_need_sync_filter_size, true); ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); @@ -85,7 +85,7 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_no_merge) { .set_is_broadcast_join(false) .build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); + RuntimeFilterProducer::create(_query_options, &desc, &producer)); ASSERT_EQ(producer->_need_sync_filter_size, true); ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); diff --git a/be/test/runtime_filter/runtime_filter_test_utils.h b/be/test/runtime_filter/runtime_filter_test_utils.h index 028380938877e6..49bfc77b35414c 100644 --- a/be/test/runtime_filter/runtime_filter_test_utils.h +++ b/be/test/runtime_filter/runtime_filter_test_utils.h @@ -38,6 +38,9 @@ class RuntimeFilterTest : public testing::Test { _query_ctx = QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), _query_options, fe_address, true, fe_address, QuerySource::INTERNAL_FRONTEND); + _coordinator_context = std::make_shared(_query_ctx->query_id(), + _query_options, _query_ctx); + _query_ctx->set_coordinator_context(_coordinator_context); _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( TRuntimeFilterParamsBuilder().build()); @@ -58,6 +61,7 @@ class RuntimeFilterTest : public testing::Test { protected: RuntimeProfile _profile = RuntimeProfile(""); std::shared_ptr _query_ctx; + std::shared_ptr _coordinator_context; TQueryOptions _query_options; const std::string LOCALHOST = BackendOptions::get_localhost(); const int DUMMY_PORT = config::brpc_port;