Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,7 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
bool has_valid_value = false;
        // iterate through the object; simdjson::ondemand will parse on the fly
size_t key_index = 0;

for (auto field : *value) {
std::string_view key = field.unescaped_key();
StringRef name_ref(key.data(), key.size());
Expand Down Expand Up @@ -1627,7 +1628,7 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
}
simdjson::ondemand::value val = field.value();
auto* column_ptr = block.get_by_position(column_index).column->assume_mutable().get();
RETURN_IF_ERROR(_simdjson_write_data_to_column(
RETURN_IF_ERROR(_simdjson_write_data_to_column<false>(
val, slot_descs[column_index]->type(), column_ptr,
slot_descs[column_index]->col_name(), _serdes[column_index], valid));
if (!(*valid)) {
Expand Down Expand Up @@ -1718,6 +1719,7 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
return Status::OK();
}

template <bool use_string_cache>
Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
const TypeDescriptor& type_desc,
vectorized::IColumn* column_ptr,
Expand Down Expand Up @@ -1753,7 +1755,20 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&

if (_is_load || !type_desc.is_complex_type()) {
if (value.type() == simdjson::ondemand::json_type::string) {
std::string_view value_string = value.get_string();
std::string_view value_string;
if constexpr (use_string_cache) {
const auto cache_key = value.raw_json().value();
if (_cached_string_values.contains(cache_key)) {
value_string = _cached_string_values[cache_key];
} else {
value_string = value.get_string();
_cached_string_values.emplace(cache_key, value_string);
}
} else {
DCHECK(_cached_string_values.empty());
value_string = value.get_string();
}

Slice slice {value_string.data(), value_string.size()};
RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
_serde_options));
Expand Down Expand Up @@ -1812,7 +1827,7 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&
has_value[sub_column_idx] = true;

const auto& sub_col_type = type_desc.children[sub_column_idx];
RETURN_IF_ERROR(_simdjson_write_data_to_column(
RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
sub.value(), sub_col_type, sub_column_ptr, column_name + "." + sub_key,
sub_serdes[sub_column_idx], valid));
}
Expand Down Expand Up @@ -1869,7 +1884,8 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&
sub_serdes[0], _serde_options, valid));

simdjson::ondemand::value field_value = member_value.value();
RETURN_IF_ERROR(_simdjson_write_data_to_column(

RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
field_value, type_desc.children[1],
map_column_ptr->get_values_ptr()->assume_mutable()->get_ptr(),
column_name + ".value", sub_serdes[1], valid));
Expand All @@ -1892,9 +1908,10 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&

int field_count = 0;
for (simdjson::ondemand::value sub_value : array_value) {
RETURN_IF_ERROR(_simdjson_write_data_to_column(
RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
sub_value, type_desc.children[0], array_column_ptr->get_data().get_ptr(),
column_name + ".element", sub_serdes[0], valid));

field_count++;
}
auto& offsets = array_column_ptr->get_offsets();
Expand Down Expand Up @@ -2085,6 +2102,9 @@ Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
Block& block, bool* valid) {
// write by jsonpath
bool has_valid_value = false;

Defer clear_defer([this]() { _cached_string_values.clear(); });

for (size_t i = 0; i < slot_descs.size(); i++) {
auto* slot_desc = slot_descs[i];
if (!slot_desc->is_materialized()) {
Expand Down Expand Up @@ -2119,9 +2139,9 @@ Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
return Status::OK();
}
} else {
RETURN_IF_ERROR(_simdjson_write_data_to_column(json_value, slot_desc->type(),
column_ptr, slot_desc->col_name(),
_serdes[i], valid));
RETURN_IF_ERROR(_simdjson_write_data_to_column<true>(json_value, slot_desc->type(),
column_ptr, slot_desc->col_name(),
_serdes[i], valid));
if (!(*valid)) {
return Status::OK();
}
Expand Down
20 changes: 20 additions & 0 deletions be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <vector>
Expand Down Expand Up @@ -176,6 +177,7 @@ class NewJsonReader : public GenericReader {
Status _simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
const std::vector<SlotDescriptor*>& slot_descs, bool* valid);

template <bool use_string_cache>
Status _simdjson_write_data_to_column(simdjson::ondemand::value& value,
const TypeDescriptor& type_desc,
vectorized::IColumn* column_ptr,
Expand Down Expand Up @@ -291,6 +293,24 @@ class NewJsonReader : public GenericReader {
// column to default value string map
std::unordered_map<std::string, std::string> _col_default_value_map;

// From document of simdjson:
// ```
// Important: a value should be consumed once. Calling get_string() twice on the same value is an error.
// ```
// We should cache the string_views to avoid multiple get_string() calls.
struct StringViewHash {
size_t operator()(const std::string_view& str) const {
return std::hash<int64_t>()(reinterpret_cast<int64_t>(str.data()));
}
};
struct StringViewEqual {
bool operator()(const std::string_view& lhs, const std::string_view& rhs) const {
return lhs.data() == rhs.data() && lhs.size() == rhs.size();
}
};
std::unordered_map<std::string_view, std::string_view, StringViewHash, StringViewEqual>
_cached_string_values;

int32_t skip_bitmap_col_idx {-1};

//Used to indicate whether it is a stream load. When loading, only data will be inserted into columnString.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,13 @@
1000 7395.231067
2000 \N

-- !test_select2 --
22 7291 7291 7291 \N I am a long string here. I am a long string here. I am a long string here. I am another long string here 4. I am another long string here 4. I am another long string here 4. I am another long string here 4. I am another long string here 4. I am another long string here 4. I am another long string here 4. I am another long string here 4. \N
7291.703724 2000 2000 2000 \N I am a long string here. I am a long string here. I am a long string here. I am another long string here 3. I am another long string here 3. I am another long string here 3. I am another long string here 3. I am another long string here 3. I am another long string here 3. I am another long string here 3. I am another long string here 3. \N
7395.231067 1000 1000 1000 \N I am a long string here. I am a long string here. I am a long string here. I am another long string here 2. I am another long string here 2. I am another long string here 2. I am another long string here 2. I am another long string here 2. I am another long string here 2. I am another long string here 2. I am another long string here 2. \N

-- !test_select3 --
22 7291 I am a long string here. I am another long string here 4.
7291.703724 2000 I am a long string here. I am another long string here 3.
7395.231067 1000 I am a long string here. I am another long string here 2.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"k1": "7395.231067",
"k2": "1000",
"k3": "I am a long string here.",
"k4": "I am another long string here 2."
},
{
"k1": "7291.703724",
"k2": "2000",
"k3": "I am a long string here.",
"k4": "I am another long string here 3."
},
{
"k1": "22",
"k2": "7291",
"k3": "I am a long string here.",
"k4": "I am another long string here 4."
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ suite("test_load_json_with_jsonpath", "p0") {
"""
}

def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
def load_array_data = { table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, column_sep, file_name ->
// load the json data
streamLoad {
Expand Down Expand Up @@ -82,17 +82,98 @@ suite("test_load_json_with_jsonpath", "p0") {

create_test_table.call()

load_array_data.call('false', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')

check_data_correct(testTable)

// test new json load, should be deleted after new_load_scan ready
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call()
load_array_data.call('true', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
check_data_correct(testTable)

} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}

// case2 with duplicate json paths
try {
sql "DROP TABLE IF EXISTS tbl_test_load_json_with_jsonpath2"

sql """
CREATE TABLE IF NOT EXISTS tbl_test_load_json_with_jsonpath2 (
`k1` varchar(128) NULL COMMENT "",
`k2` int NULL COMMENT "",
`k22` int NULL COMMENT "",
`k222` int NULL COMMENT "",
`k2222` int NULL COMMENT "",
`k3` STRING NULL COMMENT "",
`k33` STRING NULL COMMENT "",
`k333` STRING NULL COMMENT "",
`k3333` STRING NULL COMMENT "",
`k4` STRING NULL COMMENT "",
`k44` STRING NULL COMMENT "",
`k444` STRING NULL COMMENT "",
`k4444` STRING NULL COMMENT "",
`k44444` STRING NULL COMMENT "",
`k444444` STRING NULL COMMENT "",
`k4444444` STRING NULL COMMENT "",
`k44444444` STRING NULL COMMENT "",
) ENGINE=OLAP
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2"
)
"""

load_array_data.call(
'tbl_test_load_json_with_jsonpath2',
'true',
'',
'json',
'',
'["$.k1", "$.k2", "$.k2", "$.k2", "$.k3", "$.k3", "$.k3","$.k3", "$.k4", "$.k4", "$.k4", "$.k4", "$.k4", "$.k4", "$.k4", "$.k4"]',
'', '', '', '', 'test_load_with_jsonpath2.json')

qt_test_select2 "select * from tbl_test_load_json_with_jsonpath2 order by k1"

} finally {

}

// case3 without json paths
try {
sql "DROP TABLE IF EXISTS tbl_test_load_json_with_jsonpath3"

sql """
CREATE TABLE IF NOT EXISTS tbl_test_load_json_with_jsonpath3 (
`k1` varchar(128) NULL COMMENT "",
`k2` int NULL COMMENT "",
`k3` STRING NULL COMMENT "",
`k4` STRING NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2"
)
"""

load_array_data.call(
'tbl_test_load_json_with_jsonpath3',
'true',
'',
'json',
'',
'[]',
'', '', '', '', 'test_load_with_jsonpath2.json')

qt_test_select3 "select * from tbl_test_load_json_with_jsonpath3 order by k1"

} finally {

}
}
Loading