Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/olap/id_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ class IdFileMap {
return it->second;
}

int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; }
int64_t get_delayed_expired_timestamp() const { return delayed_expired_timestamp; }

void set_external_scan_params(QueryContext* query_ctx, int max_file_scanners) {
std::call_once(once_flag_for_external, [&] {
DCHECK(query_ctx != nullptr);
_query_global = query_ctx->get_query_globals();
_query_options = query_ctx->get_query_options();
_query_options = query_ctx->query_options();
_file_scan_range_params_map = query_ctx->file_scan_range_params_map;
_max_file_scanners = max_file_scanners;
});
Expand Down
114 changes: 114 additions & 0 deletions be/src/runtime/coordinator_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/coordinator_context.h"

#include "runtime/query_context.h"

namespace doris {

CoordinatorContext::CoordinatorContext(TUniqueId query_id, TQueryOptions query_options,
std::weak_ptr<QueryContext> query_ctx)
: _query_id(std::move(query_id)),
_query_options(std::move(query_options)),
_query_ctx(std::move(query_ctx)),
_timeout_second(query_options.execution_timeout) {
_query_watcher.start();
CHECK(_query_ctx.lock() != nullptr);
_resource_ctx = _query_ctx.lock()->_resource_ctx;
}

CoordinatorContext::~CoordinatorContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
// query mem tracker consumption is equal to 0, it means that after QueryContext is created,
// it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
// query mem tracker consumption is not equal to 0 after use, because there is memory consumed
// on query mem tracker, released on other trackers.
std::string mem_tracker_msg;
if (query_mem_tracker()->peak_consumption() != 0) {
mem_tracker_msg = fmt::format(
"deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
"PeakUsed={}",
print_id(_query_id), PrettyPrinter::print_bytes(query_mem_tracker()->limit()),
PrettyPrinter::print_bytes(query_mem_tracker()->consumption()),
PrettyPrinter::print_bytes(query_mem_tracker()->peak_consumption()));
}
[[maybe_unused]] uint64_t group_id = 0;
if (_resource_ctx->workload_group()) {
group_id = _resource_ctx->workload_group()->id(); // before remove
}

_resource_ctx->task_controller()->finish();

// the only one msg shows query's end. any other msg should append to it if need.
LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);

#ifndef BE_TEST
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
try {
ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
} catch (std::exception& e) {
LOG(WARNING) << "Dump trace log failed bacause " << e.what();
}
}
#endif
_resource_ctx->memory_context()->mem_tracker();
_merge_controller_handler.reset();
}

std::string CoordinatorContext::debug_string() const {
return fmt::format(
"CoordinatorContext(query_id={}): {}", print_id(_query_id),
_merge_controller_handler ? _merge_controller_handler->debug_string() : "null");
}

void CoordinatorContext::set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
}
std::shared_ptr<RuntimeFilterMergeControllerEntity>
CoordinatorContext::get_merge_controller_handler() const {
return _merge_controller_handler;
}

const TQueryOptions& CoordinatorContext::query_options() const {
return _query_options;
}

std::weak_ptr<QueryContext> CoordinatorContext::weak_query_ctx() const {
return _query_ctx;
}

int CoordinatorContext::execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
: _query_options.query_timeout;
}

TUniqueId CoordinatorContext::query_id() const {
return _query_id;
}

std::shared_ptr<ResourceContext> CoordinatorContext::resource_ctx() const {
return _resource_ctx;
}

std::shared_ptr<MemTrackerLimiter> CoordinatorContext::query_mem_tracker() const {
DCHECK(_resource_ctx->memory_context()->mem_tracker() != nullptr);
return _resource_ctx->memory_context()->mem_tracker();
}

} // namespace doris
94 changes: 94 additions & 0 deletions be/src/runtime/coordinator_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>

#include <memory>
#include <string>

#include "runtime/query_context.h"
#include "runtime_filter/runtime_filter_mgr.h"

namespace doris {

class CoordinatorContext;
class ResourceContext;

// The role of CoordinatorContext is similar to QueryContext, but its lifecycle is longer than QueryContext.
// CoordinatorContext will exist until the entire query ends, rather than being released as the current BE task ends like QueryContext.
// It is mainly used to store runtime states that need coordination between BEs, such as the MergeControllerHandler of RuntimeFilter.
// This way, even if the QueryContext of one BE has been released, other BEs can still access these coordination states through CoordinatorContext to ensure the correctness and consistency of the query.
// QueryContext hold shared_ptr of CoordinatorContext, and CoordinatorContext hold weak_ptr of QueryContext to avoid circular references.
class CoordinatorContext {
public:
CoordinatorContext(TUniqueId query_id, TQueryOptions query_options,
std::weak_ptr<QueryContext> query_ctx);
~CoordinatorContext();

std::string debug_string() const;

void set_merge_controller_handler(std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler);
std::shared_ptr<RuntimeFilterMergeControllerEntity> get_merge_controller_handler() const;

const TQueryOptions& query_options() const;

std::weak_ptr<QueryContext> weak_query_ctx() const;

int execution_timeout() const;

TUniqueId query_id() const;

std::shared_ptr<ResourceContext> resource_ctx() const;

std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const;

bool is_timeout(timespec now) const {
if (_timeout_second <= 0) {
return false;
}
return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
}

int64_t get_remaining_query_time_seconds() const {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if (is_timeout(now)) {
return -1;
}
int64_t elapsed_seconds = _query_watcher.elapsed_time_seconds(now);
return _timeout_second - elapsed_seconds;
}

private:
TUniqueId _query_id;
TQueryOptions _query_options;
std::weak_ptr<QueryContext> _query_ctx;

std::shared_ptr<ResourceContext> _resource_ctx;

// This shared ptr is never used. It is just a reference to hold the object.
// There is a weak ptr in runtime filter manager to reference this object.
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;

MonotonicStopWatch _query_watcher;
int _timeout_second;
};

} // namespace doris
55 changes: 38 additions & 17 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "io/fs/stream_load_pipe.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/client_cache.h"
#include "runtime/coordinator_context.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/frontend_info.h"
Expand All @@ -75,7 +76,6 @@
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "runtime_filter/runtime_filter_consumer.h"
Expand Down Expand Up @@ -728,6 +728,10 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
params.coord, params.is_nereids,
params.current_connect_fe, query_source);
auto coordinator_context = std::make_shared<CoordinatorContext>(
query_id, parent.query_options, query_ctx);
_coordinator_context_map.insert(query_id, coordinator_context);
query_ctx->set_coordinator_context(coordinator_context);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
RETURN_IF_ERROR(DescriptorTbl::create(
&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
Expand Down Expand Up @@ -755,9 +759,8 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
if (info.__isset.runtime_filter_params) {
auto handler =
std::make_shared<RuntimeFilterMergeControllerEntity>();
RETURN_IF_ERROR(
handler->init(query_ctx, info.runtime_filter_params));
query_ctx->set_merge_controller_handler(handler);
RETURN_IF_ERROR(handler->init(info.runtime_filter_params));
coordinator_context->set_merge_controller_handler(handler);

query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
info.runtime_filter_params);
Expand Down Expand Up @@ -787,6 +790,16 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);

_coordinator_context_map.apply(
[&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<CoordinatorContext>>& map)
-> Status {
for (auto& it : map) {
fmt::format_to(debug_string_buffer, "CoordinatorContext: {}\n",
it.second->debug_string());
}
return Status::OK();
});

_pipeline_map.apply([&](phmap::flat_hash_map<
std::pair<TUniqueId, int>,
std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
Expand All @@ -804,13 +817,6 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n",
print_id(it.first.first),
it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());

if (it.second->get_query_ctx()->get_merge_controller_handler()) {
fmt::format_to(debug_string_buffer, "{}\n",
it.second->get_query_ctx()
->get_merge_controller_handler()
->debug_string());
}
}

auto timeout_second = it.second->timeout_second();
Expand Down Expand Up @@ -902,6 +908,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}

void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
_coordinator_context_map.erase(query_id);
std::shared_ptr<QueryContext> query_ctx = nullptr;
{
if (auto q_ctx = get_query_ctx(query_id)) {
Expand Down Expand Up @@ -963,6 +970,20 @@ void FragmentMgr::cancel_worker() {

std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
{
_coordinator_context_map.apply(
[&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<CoordinatorContext>>& map)
-> Status {
for (auto& it : map) {
if (auto coord_ctx = it.second; coord_ctx->is_timeout(now)) {
LOG_WARNING(
"Query {} is timeout, but CoordinatorContext still exist",
print_id(it.first));
queries_timeout.push_back(it.first);
}
}
return Status::OK();
});

std::vector<std::shared_ptr<QueryContext>> contexts;
_query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>&
map) -> Status {
Expand Down Expand Up @@ -1311,8 +1332,8 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
}

if (auto q_ctx = get_query_ctx(query_id)) {
return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request);
if (auto q_handle = _coordinator_context_map.find(query_id); q_handle != nullptr) {
return q_handle->get_merge_controller_handler()->send_filter_size(q_handle, request);
} else {
return Status::EndOfFile(
"Send filter size failed: Query context (query-id: {}) not found, maybe "
Expand Down Expand Up @@ -1348,12 +1369,12 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
if (auto q_ctx = get_query_ctx(query_id)) {
SCOPED_ATTACH_TASK(q_ctx.get());
if (!q_ctx->get_merge_controller_handler()) {
if (auto q_handle = _coordinator_context_map.find(query_id); q_handle != nullptr) {
SCOPED_ATTACH_TASK(q_handle->resource_ctx());
if (!q_handle->get_merge_controller_handler()) {
return Status::InternalError("Merge filter failed: Merge controller handler is null");
}
return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data);
return q_handle->get_merge_controller_handler()->merge(q_handle, request, attach_data);
} else {
return Status::EndOfFile(
"Merge filter size failed: Query context (query-id: {}) already finished",
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace pipeline {
class PipelineFragmentContext;
} // namespace pipeline
class QueryContext;
class CoordinatorContext;
class ExecEnv;
class ThreadPool;
class PExecPlanFragmentStartRequest;
Expand Down Expand Up @@ -209,6 +210,8 @@ class FragmentMgr : public RestMonitorIface {

// query id -> QueryContext
ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
ConcurrentContextMap<TUniqueId, std::shared_ptr<CoordinatorContext>, CoordinatorContext>
_coordinator_context_map;

CountDownLatch _stop_background_threads_latch;
std::shared_ptr<Thread> _cancel_thread;
Expand Down
Loading
Loading