Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 17 additions & 26 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ void SegmentIterator::_initialize_predicate_results() {
// Initialize from _col_predicates
for (auto* pred : _col_predicates) {
int cid = pred->column_id();
_column_predicate_inverted_index_status[cid][pred] = false;
_column_predicate_index_exec_status[cid][pred] = false;
}

_calculate_expr_in_remaining_conjunct_root();
Expand Down Expand Up @@ -667,9 +667,8 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
for (auto it = _common_expr_ctxs_push_down.begin();
it != _common_expr_ctxs_push_down.end();) {
if ((*it)->all_expr_inverted_index_evaluated()) {
const auto* result =
(*it)->get_inverted_index_context()->get_inverted_index_result_for_expr(
(*it)->root().get());
const auto* result = (*it)->get_index_context()->get_index_result_for_expr(
(*it)->root().get());
if (result != nullptr) {
_row_bitmap &= *result->get_data_bitmap();
auto root = (*it)->root();
Expand Down Expand Up @@ -1215,7 +1214,7 @@ Status SegmentIterator::_apply_inverted_index_on_column_predicate(
return Status::OK();
}
if (!pred->is_runtime_filter()) {
_column_predicate_inverted_index_status[pred->column_id()][pred] = true;
_column_predicate_index_exec_status[pred->column_id()][pred] = true;
}
}
return Status::OK();
Expand Down Expand Up @@ -1320,8 +1319,8 @@ bool SegmentIterator::_check_all_conditions_passed_inverted_index_for_column(Col
if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_common_expr_pushdown) {
return false;
}
auto pred_it = _column_predicate_inverted_index_status.find(cid);
if (pred_it != _column_predicate_inverted_index_status.end()) {
auto pred_it = _column_predicate_index_exec_status.find(cid);
if (pred_it != _column_predicate_index_exec_status.end()) {
const auto& pred_map = pred_it->second;
bool pred_passed = std::all_of(pred_map.begin(), pred_map.end(),
[](const auto& pred_entry) { return pred_entry.second; });
Expand All @@ -1332,8 +1331,8 @@ bool SegmentIterator::_check_all_conditions_passed_inverted_index_for_column(Col
}
}

auto expr_it = _common_expr_inverted_index_status.find(cid);
if (expr_it != _common_expr_inverted_index_status.end()) {
auto expr_it = _common_expr_index_exec_status.find(cid);
if (expr_it != _common_expr_index_exec_status.end()) {
const auto& expr_map = expr_it->second;
return std::all_of(expr_map.begin(), expr_map.end(),
[](const auto& expr_entry) { return expr_entry.second; });
Expand Down Expand Up @@ -2794,7 +2793,7 @@ void SegmentIterator::_output_index_result_column_for_expr(uint16_t* sel_rowid_i
}
for (auto& expr_ctx : _common_expr_ctxs_push_down) {
for (auto& inverted_index_result_bitmap_for_expr :
expr_ctx->get_inverted_index_context()->get_inverted_index_result_bitmap()) {
expr_ctx->get_index_context()->get_index_result_bitmap()) {
const auto* expr = inverted_index_result_bitmap_for_expr.first;
const auto& result_bitmap = inverted_index_result_bitmap_for_expr.second;
const auto& index_result_bitmap = result_bitmap.get_data_bitmap();
Expand Down Expand Up @@ -2832,11 +2831,11 @@ void SegmentIterator::_output_index_result_column_for_expr(uint16_t* sel_rowid_i
DCHECK(block->rows() == vec_match_pred.size());

if (null_map_column) {
expr_ctx->get_inverted_index_context()->set_inverted_index_result_column_for_expr(
expr_ctx->get_index_context()->set_index_result_column_for_expr(
expr, vectorized::ColumnNullable::create(std::move(index_result_column),
std::move(null_map_column)));
} else {
expr_ctx->get_inverted_index_context()->set_inverted_index_result_column_for_expr(
expr_ctx->get_index_context()->set_index_result_column_for_expr(
expr, std::move(index_result_column));
}
}
Expand Down Expand Up @@ -2891,14 +2890,14 @@ Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* bl
}

Status SegmentIterator::_construct_compound_expr_context() {
auto inverted_index_context = std::make_shared<vectorized::InvertedIndexContext>(
auto inverted_index_context = std::make_shared<vectorized::IndexExecContext>(
_schema->column_ids(), _index_iterators, _storage_name_and_type,
_common_expr_inverted_index_status, _score_runtime);
_common_expr_index_exec_status, _score_runtime);
for (const auto& expr_ctx : _opts.common_expr_ctxs_push_down) {
vectorized::VExprContextSPtr context;
// _ann_range_search_runtime will do deep copy.
RETURN_IF_ERROR(expr_ctx->clone(_opts.runtime_state, context));
context->set_inverted_index_context(inverted_index_context);
context->set_index_context(inverted_index_context);
_common_expr_ctxs_push_down.emplace_back(context);
}
return Status::OK();
Expand Down Expand Up @@ -2940,19 +2939,11 @@ void SegmentIterator::_calculate_expr_in_remaining_conjunct_root() {
if (vir_child->is_slot_ref()) {
auto* inner_slot_ref =
assert_cast<vectorized::VSlotRef*>(vir_child.get());
_common_expr_inverted_index_status[_schema->column_id(
_common_expr_index_exec_status[_schema->column_id(
inner_slot_ref->column_id())][expr.get()] = false;
_common_expr_to_slotref_map[root_expr_ctx.get()]
[inner_slot_ref->column_id()] =
expr.get();
// Print debug info for virtual slot expansion
LOG(INFO) << fmt::format(
"common_expr_ctx_ptr: {}, expr_ptr: {}, "
"virtual_slotref_ptr: {}, inner_slotref_ptr: {}, "
"column_id: {}",
fmt::ptr(root_expr_ctx.get()), fmt::ptr(expr.get()),
fmt::ptr(child.get()), fmt::ptr(vir_child.get()),
inner_slot_ref->column_id());
}

if (!vir_child->children().empty()) {
Expand All @@ -2964,8 +2955,8 @@ void SegmentIterator::_calculate_expr_in_remaining_conjunct_root() {
}
if (child->is_slot_ref()) {
auto* column_slot_ref = assert_cast<vectorized::VSlotRef*>(child.get());
_common_expr_inverted_index_status[_schema->column_id(
column_slot_ref->column_id())][expr.get()] = false;
_common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())]
[expr.get()] = false;
_common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] =
expr.get();
}
Expand Down
13 changes: 9 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,21 @@ class SegmentIterator : public RowwiseIterator {
* a boolean value to indicate whether the column has been read by the index.
*/
std::unordered_map<ColumnId, std::unordered_map<ColumnPredicate*, bool>>
_column_predicate_inverted_index_status;
_column_predicate_index_exec_status;

/*
* Maps a column id to the common exprs that reference that column.
* For each expr, the boolean value indicates whether it has been evaluated
* by the index on that column.
*/
std::unordered_map<ColumnId, std::unordered_map<const vectorized::VExpr*, bool>>
_common_expr_inverted_index_status;
_common_expr_index_exec_status;

/*
* Maps each common expr context to its slot ref map.
* The slot ref map is keyed by the column id of a slot ref and yields the
* expr recorded for that slot ref.
*/
std::unordered_map<vectorized::VExprContext*, std::unordered_map<ColumnId, vectorized::VExpr*>>
_common_expr_to_slotref_map;

vectorized::ScoreRuntimeSPtr _score_runtime;

Expand All @@ -513,8 +520,6 @@ class SegmentIterator : public RowwiseIterator {
bool _find_condition_cache = false;
std::shared_ptr<std::vector<bool>> _condition_cache;
static constexpr int CONDITION_CACHE_OFFSET = 2048;
std::unordered_map<vectorized::VExprContext*, std::unordered_map<ColumnId, vectorized::VExpr*>>
_common_expr_to_slotref_map;
};

} // namespace segment_v2
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ ThreadPool::~ThreadPool() {
CHECK_EQ(1, _tokens.size()) << absl::Substitute(
"Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
shutdown();
VLOG_DEBUG << fmt::format("Thread pool {} destroyed", _name);
}

Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) {
Expand Down Expand Up @@ -332,6 +333,7 @@ Status ThreadPool::init() {
}

void ThreadPool::shutdown() {
VLOG_DEBUG << fmt::format("Shutting down thread pool {}", _name);
// Why access to doris_metrics is safe here?
// Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
// The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
Expand Down
20 changes: 8 additions & 12 deletions be/src/vec/exprs/vcompound_pred.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ class VCompoundPred : public VectorizedFnCall {
all_pass = false;
continue;
}
auto inverted_index_context = context->get_inverted_index_context();
if (inverted_index_context->has_inverted_index_result_for_expr(child.get())) {
auto inverted_index_context = context->get_index_context();
if (inverted_index_context->has_index_result_for_expr(child.get())) {
const auto* index_result =
inverted_index_context->get_inverted_index_result_for_expr(child.get());
inverted_index_context->get_index_result_for_expr(child.get());
if (res.is_empty()) {
res = *index_result;
} else {
Expand All @@ -103,11 +103,9 @@ class VCompoundPred : public VectorizedFnCall {
all_pass = false;
continue;
}
if (context->get_inverted_index_context()->has_inverted_index_result_for_expr(
child.get())) {
if (context->get_index_context()->has_index_result_for_expr(child.get())) {
const auto* index_result =
context->get_inverted_index_context()
->get_inverted_index_result_for_expr(child.get());
context->get_index_context()->get_index_result_for_expr(child.get());
if (res.is_empty()) {
res = *index_result;
} else {
Expand All @@ -132,11 +130,9 @@ class VCompoundPred : public VectorizedFnCall {
return st;
}

if (context->get_inverted_index_context()->has_inverted_index_result_for_expr(
child.get())) {
if (context->get_index_context()->has_index_result_for_expr(child.get())) {
const auto* index_result =
context->get_inverted_index_context()->get_inverted_index_result_for_expr(
child.get());
context->get_index_context()->get_index_result_for_expr(child.get());
roaring::Roaring full_result;
full_result.addRange(0, segment_num_rows);
res = index_result->op_not(&full_result);
Expand All @@ -151,7 +147,7 @@ class VCompoundPred : public VectorizedFnCall {
}

if (all_pass && !res.is_empty()) {
context->get_inverted_index_context()->set_inverted_index_result_for_expr(this, res);
context->get_index_context()->set_index_result_for_expr(this, res);
}
return Status::OK();
}
Expand Down
25 changes: 11 additions & 14 deletions be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ Status VExpr::_evaluate_inverted_index(VExprContext* context, const FunctionBase
column_ids.reserve(estimated_size);
children_exprs.reserve(estimated_size);

auto index_context = context->get_inverted_index_context();
auto index_context = context->get_index_context();

// If the child is a cast expr, we need to ensure that the target data type is the same as the
// storage data type, or that both are string types.
Expand All @@ -855,8 +855,8 @@ Status VExpr::_evaluate_inverted_index(VExprContext* context, const FunctionBase
auto* column_slot_ref = assert_cast<VSlotRef*>(cast_expr->get_child(0).get());
auto column_id = column_slot_ref->column_id();
const auto* storage_name_type =
context->get_inverted_index_context()
->get_storage_name_and_type_by_column_id(column_id);
context->get_index_context()->get_storage_name_and_type_by_column_id(
column_id);
auto storage_type = remove_nullable(storage_name_type->second);
auto target_type = remove_nullable(cast_expr->get_target_type());
auto origin_primitive_type = storage_type->get_primitive_type();
Expand Down Expand Up @@ -898,16 +898,14 @@ Status VExpr::_evaluate_inverted_index(VExprContext* context, const FunctionBase
if (child->is_slot_ref()) {
auto* column_slot_ref = assert_cast<VSlotRef*>(child.get());
auto column_id = column_slot_ref->column_id();
auto* iter =
context->get_inverted_index_context()->get_inverted_index_iterator_by_column_id(
column_id);
auto* iter = context->get_index_context()->get_inverted_index_iterator_by_column_id(
column_id);
// column does not have an inverted index
if (iter == nullptr) {
continue;
}
const auto* storage_name_type =
context->get_inverted_index_context()->get_storage_name_and_type_by_column_id(
column_id);
context->get_index_context()->get_storage_name_and_type_by_column_id(column_id);
if (storage_name_type == nullptr) {
auto err_msg = fmt::format(
"storage_name_type cannot be found for column {} while in {} "
Expand Down Expand Up @@ -943,9 +941,9 @@ Status VExpr::_evaluate_inverted_index(VExprContext* context, const FunctionBase
return res;
}
if (!result_bitmap.is_empty()) {
index_context->set_inverted_index_result_for_expr(this, result_bitmap);
index_context->set_index_result_for_expr(this, result_bitmap);
for (int column_id : column_ids) {
index_context->set_true_for_inverted_index_status(this, column_id);
index_context->set_true_for_index_status(this, column_id);
}
}
return Status::OK();
Expand All @@ -970,11 +968,10 @@ size_t VExpr::estimate_memory(const size_t rows) {
}

bool VExpr::fast_execute(VExprContext* context, ColumnPtr& result_column) const {
if (context->get_inverted_index_context() &&
context->get_inverted_index_context()->get_inverted_index_result_column().contains(this)) {
if (context->get_index_context() &&
context->get_index_context()->get_index_result_column().contains(this)) {
// prepare a column to save result
result_column =
context->get_inverted_index_context()->get_inverted_index_result_column()[this];
result_column = context->get_index_context()->get_index_result_column()[this];
if (_data_type->is_nullable()) {
result_column = make_nullable(result_column);
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/exprs/vexpr_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ Status VExprContext::evaluate_inverted_index(uint32_t segment_num_rows) {
}

bool VExprContext::all_expr_inverted_index_evaluated() {
return _inverted_index_context->has_inverted_index_result_for_expr(_root.get());
return _index_context->has_index_result_for_expr(_root.get());
}

Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block, size_t column_to_keep) {
Expand Down Expand Up @@ -503,8 +503,7 @@ Status VExprContext::evaluate_ann_range_search(
return Status::OK();
}
const VExpr* slot_ref_expr_addr = slot_ref_map.find(cid)->second;
_inverted_index_context->set_true_for_inverted_index_status(slot_ref_expr_addr,
idx_to_cid[cid]);
_index_context->set_true_for_index_status(slot_ref_expr_addr, idx_to_cid[cid]);

VLOG_DEBUG << fmt::format(
"Evaluate ann range search for expr {}, src_col_idx {}, cid {}, row_bitmap "
Expand Down
Loading
Loading