Skip to content

Commit d149e20

Browse files
Chen768959 authored and morningman committed
[fix](datatype)Fix for unaligned memory in arrow MapArray parsing. (#58248)
1. Fix unaligned memory access when parsing an Arrow MapArray. 2. Add a unit test for Arrow Map with unaligned memory. 3. Add a unit test for Arrow Struct with unaligned memory. Follow-up to #55274.
1 parent 893d725 commit d149e20

File tree

3 files changed

+357
-3
lines changed

3 files changed

+357
-3
lines changed

be/src/vec/data_types/serde/data_type_map_serde.cpp

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -394,11 +394,21 @@ Status DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const arrow::Ar
394394
auto arrow_offsets_array = concrete_map->offsets();
395395
auto* arrow_offsets = dynamic_cast<arrow::Int32Array*>(arrow_offsets_array.get());
396396
auto prev_size = offsets_data.back();
397-
auto arrow_nested_start_offset = arrow_offsets->Value(start);
398-
auto arrow_nested_end_offset = arrow_offsets->Value(end);
397+
398+
const auto* base_offsets_ptr = reinterpret_cast<const uint8_t*>(arrow_offsets->raw_values());
399+
const size_t offset_element_size = sizeof(int32_t);
400+
int32_t arrow_nested_start_offset = 0;
401+
int32_t arrow_nested_end_offset = 0;
402+
const uint8_t* start_offset_ptr = base_offsets_ptr + start * offset_element_size;
403+
const uint8_t* end_offset_ptr = base_offsets_ptr + end * offset_element_size;
404+
memcpy(&arrow_nested_start_offset, start_offset_ptr, offset_element_size);
405+
memcpy(&arrow_nested_end_offset, end_offset_ptr, offset_element_size);
399406
for (int64_t i = start + 1; i < end + 1; ++i) {
407+
int32_t current_offset = 0;
408+
const uint8_t* current_offset_ptr = base_offsets_ptr + i * offset_element_size;
409+
memcpy(&current_offset, current_offset_ptr, offset_element_size);
400410
// convert to doris offset, start from offsets.back()
401-
offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - arrow_nested_start_offset);
411+
offsets_data.emplace_back(prev_size + current_offset - arrow_nested_start_offset);
402412
}
403413
RETURN_IF_ERROR(key_serde->read_column_from_arrow(
404414
column_map.get_keys(), concrete_map->keys().get(), arrow_nested_start_offset,
Lines changed: 182 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,182 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <arrow/api.h>
19+
#include <cctz/time_zone.h>
20+
#include <gtest/gtest-message.h>
21+
#include <gtest/gtest-test-part.h>
22+
#include <gtest/gtest.h>
23+
#include <lz4/lz4.h>
24+
#include <streamvbyte.h>
25+
26+
#include <cstddef>
27+
#include <iostream>
28+
#include <limits>
29+
#include <type_traits>
30+
31+
#include "agent/be_exec_version_manager.h"
32+
#include "olap/olap_common.h"
33+
#include "runtime/define_primitive_type.h"
34+
#include "runtime/types.h"
35+
#include "testutil/test_util.h"
36+
#include "vec/columns/column.h"
37+
#include "vec/columns/column_map.h"
38+
#include "vec/common/assert_cast.h"
39+
#include "vec/core/field.h"
40+
#include "vec/core/types.h"
41+
#include "vec/data_types/common_data_type_serder_test.h"
42+
#include "vec/data_types/common_data_type_test.h"
43+
#include "vec/data_types/data_type.h"
44+
#include "vec/data_types/data_type_array.h"
45+
#include "vec/data_types/data_type_factory.hpp"
46+
#include "vec/data_types/data_type_map.h"
47+
#include "vec/data_types/data_type_nullable.h"
48+
#include "vec/data_types/data_type_string.h"
49+
50+
namespace doris::vectorized {
51+
static auto serde_str_key = std::make_shared<DataTypeStringSerDe>();
52+
static auto serde_str_value = std::make_shared<DataTypeStringSerDe>();
53+
54+
class DataTypeMapSerDeTest : public ::testing::Test {
55+
protected:
56+
static void SetUpTestSuite() {}
57+
};
58+
59+
// Run with UBSan enabled to catch misalignment errors.
60+
TEST_F(DataTypeMapSerDeTest, ArrowMemNotAligned) {
61+
// 1.Prepare the data.
62+
std::vector<std::string> key_data = {"key1", "key2", "key3", "key4", "key5", "key6"};
63+
std::vector<std::string> value_data = {"val1", "val2", "val3", "val4", "val5", "val6"};
64+
65+
std::vector<int32_t> key_offsets = {0};
66+
std::vector<int32_t> value_offsets = {0};
67+
68+
int32_t current_key_offset = 0;
69+
for (const auto& key : key_data) {
70+
current_key_offset += static_cast<int32_t>(key.length());
71+
key_offsets.push_back(current_key_offset);
72+
}
73+
74+
int32_t current_value_offset = 0;
75+
for (const auto& value : value_data) {
76+
current_value_offset += static_cast<int32_t>(value.length());
77+
value_offsets.push_back(current_value_offset);
78+
}
79+
80+
std::vector<int32_t> map_offsets = {0, 2, 3, 6, 6};
81+
std::vector<int8_t> validity_bitmap = {0x0B};
82+
83+
std::vector<uint8_t> key_value_data;
84+
for (const auto& key : key_data) {
85+
key_value_data.insert(key_value_data.end(), key.begin(), key.end());
86+
}
87+
88+
std::vector<uint8_t> value_value_data;
89+
for (const auto& value : value_data) {
90+
value_value_data.insert(value_value_data.end(), value.begin(), value.end());
91+
}
92+
93+
const int64_t num_maps = map_offsets.size() - 1;
94+
const int64_t offset_element_size = sizeof(int32_t);
95+
96+
// 2.Create an unaligned memory buffer.
97+
std::vector<uint8_t> map_offset_storage(map_offsets.size() * offset_element_size + 10);
98+
uint8_t* unaligned_map_offsets = map_offset_storage.data() + 1;
99+
100+
std::vector<uint8_t> key_offset_storage(key_offsets.size() * offset_element_size + 10);
101+
uint8_t* unaligned_key_offsets = key_offset_storage.data() + 1;
102+
103+
std::vector<uint8_t> value_offset_storage(value_offsets.size() * offset_element_size + 10);
104+
uint8_t* unaligned_value_offsets = value_offset_storage.data() + 1;
105+
106+
std::vector<uint8_t> key_value_storage(key_value_data.size() + 10);
107+
uint8_t* unaligned_key_values = key_value_storage.data() + 1;
108+
109+
std::vector<uint8_t> value_value_storage(value_value_data.size() + 10);
110+
uint8_t* unaligned_value_values = value_value_storage.data() + 1;
111+
112+
std::vector<uint8_t> validity_storage(validity_bitmap.size() + 10);
113+
uint8_t* unaligned_validity = validity_storage.data() + 1;
114+
115+
// 3. Copy data to unaligned memory
116+
for (size_t i = 0; i < map_offsets.size(); ++i) {
117+
memcpy(unaligned_map_offsets + i * offset_element_size, &map_offsets[i],
118+
offset_element_size);
119+
}
120+
121+
for (size_t i = 0; i < key_offsets.size(); ++i) {
122+
memcpy(unaligned_key_offsets + i * offset_element_size, &key_offsets[i],
123+
offset_element_size);
124+
}
125+
126+
for (size_t i = 0; i < value_offsets.size(); ++i) {
127+
memcpy(unaligned_value_offsets + i * offset_element_size, &value_offsets[i],
128+
offset_element_size);
129+
}
130+
131+
memcpy(unaligned_key_values, key_value_data.data(), key_value_data.size());
132+
memcpy(unaligned_value_values, value_value_data.data(), value_value_data.size());
133+
memcpy(unaligned_validity, validity_bitmap.data(), validity_bitmap.size());
134+
135+
// 4. Create Arrow array with unaligned memory
136+
auto key_value_buffer = arrow::Buffer::Wrap(unaligned_key_values, key_value_data.size());
137+
auto key_offsets_buffer =
138+
arrow::Buffer::Wrap(unaligned_key_offsets, key_offsets.size() * sizeof(int32_t));
139+
auto key_array = std::make_shared<arrow::StringArray>(key_offsets.size() - 1,
140+
key_offsets_buffer, key_value_buffer);
141+
142+
auto value_value_buffer = arrow::Buffer::Wrap(unaligned_value_values, value_value_data.size());
143+
auto value_offsets_buffer =
144+
arrow::Buffer::Wrap(unaligned_value_offsets, value_offsets.size() * sizeof(int32_t));
145+
auto value_array = std::make_shared<arrow::StringArray>(
146+
value_offsets.size() - 1, value_offsets_buffer, value_value_buffer);
147+
148+
auto map_offsets_buffer =
149+
arrow::Buffer::Wrap(unaligned_map_offsets, map_offsets.size() * offset_element_size);
150+
auto validity_buffer = arrow::Buffer::Wrap(unaligned_validity, validity_bitmap.size());
151+
152+
auto map_type = arrow::map(arrow::utf8(), arrow::utf8());
153+
154+
auto arr = std::make_shared<arrow::MapArray>(map_type, num_maps, map_offsets_buffer, key_array,
155+
value_array, validity_buffer);
156+
157+
const auto* concrete_array = dynamic_cast<const arrow::MapArray*>(arr.get());
158+
auto arrow_offsets_array = concrete_array->offsets();
159+
auto* arrow_offsets = dynamic_cast<arrow::Int32Array*>(arrow_offsets_array.get());
160+
161+
const auto* offsets_ptr = arrow_offsets->raw_values();
162+
uintptr_t offsets_address = reinterpret_cast<uintptr_t>(offsets_ptr);
163+
EXPECT_EQ(offsets_address % 4, 1);
164+
165+
const auto* keys_ptr = key_array->value_data()->data();
166+
uintptr_t keys_address = reinterpret_cast<uintptr_t>(keys_ptr);
167+
EXPECT_EQ(keys_address % 4, 1);
168+
169+
const auto* values_ptr = value_array->value_data()->data();
170+
uintptr_t values_address = reinterpret_cast<uintptr_t>(values_ptr);
171+
EXPECT_EQ(values_address % 4, 1);
172+
173+
// 5.Test read_column_from_arrow
174+
auto ser_col = ColumnMap::create(ColumnString::create(), ColumnString::create(),
175+
ColumnOffset64::create());
176+
cctz::time_zone tz;
177+
auto serde_map = std::make_shared<DataTypeMapSerDe>(serde_str_key, serde_str_value);
178+
auto st = serde_map->read_column_from_arrow(*ser_col, arr.get(), 0, 1, tz);
179+
EXPECT_TRUE(st.ok());
180+
}
181+
182+
} // namespace doris::vectorized
Lines changed: 162 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,162 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <arrow/api.h>
19+
#include <cctz/time_zone.h>
20+
#include <gtest/gtest-message.h>
21+
#include <gtest/gtest-test-part.h>
22+
#include <gtest/gtest.h>
23+
#include <lz4/lz4.h>
24+
#include <streamvbyte.h>
25+
26+
#include <cstddef>
27+
#include <iostream>
28+
#include <limits>
29+
#include <type_traits>
30+
31+
#include "agent/be_exec_version_manager.h"
32+
#include "olap/olap_common.h"
33+
#include "runtime/define_primitive_type.h"
34+
#include "runtime/types.h"
35+
#include "testutil/test_util.h"
36+
#include "vec/columns/column.h"
37+
#include "vec/columns/column_struct.h"
38+
#include "vec/common/assert_cast.h"
39+
#include "vec/core/field.h"
40+
#include "vec/core/types.h"
41+
#include "vec/data_types/common_data_type_serder_test.h"
42+
#include "vec/data_types/common_data_type_test.h"
43+
#include "vec/data_types/data_type.h"
44+
#include "vec/data_types/data_type_array.h"
45+
#include "vec/data_types/data_type_factory.hpp"
46+
#include "vec/data_types/data_type_nullable.h"
47+
#include "vec/data_types/data_type_number.h"
48+
#include "vec/data_types/data_type_string.h"
49+
#include "vec/data_types/data_type_struct.h"
50+
51+
namespace doris::vectorized {
52+
static auto serde_int32 = std::make_shared<DataTypeNumberSerDe<TYPE_INT>>();
53+
static auto serde_str = std::make_shared<DataTypeStringSerDe>();
54+
55+
class DataTypeStructSerDeTest : public ::testing::Test {
56+
protected:
57+
static void SetUpTestSuite() {}
58+
};
59+
60+
// Run with UBSan enabled to catch misalignment errors.
61+
TEST_F(DataTypeStructSerDeTest, ArrowMemNotAligned) {
62+
// 1.Prepare the data.
63+
std::vector<int32_t> int_data = {1, 2, 3, 4, 5, 6};
64+
std::vector<std::string> string_data = {"hello", "world", "test", "data", "arrow", "struct"};
65+
66+
std::vector<int32_t> string_offsets = {0};
67+
int32_t current_string_offset = 0;
68+
for (const auto& str : string_data) {
69+
current_string_offset += static_cast<int32_t>(str.length());
70+
string_offsets.push_back(current_string_offset);
71+
}
72+
73+
std::vector<uint8_t> string_value_data;
74+
for (const auto& str : string_data) {
75+
string_value_data.insert(string_value_data.end(), str.begin(), str.end());
76+
}
77+
78+
std::vector<int8_t> validity_bitmap = {0x3F};
79+
80+
const int64_t num_elements = int_data.size();
81+
const int64_t int_element_size = sizeof(int32_t);
82+
const int64_t offset_element_size = sizeof(int32_t);
83+
84+
// 2.Create an unaligned memory buffer.
85+
std::vector<uint8_t> int_storage(int_data.size() * int_element_size + 10);
86+
uint8_t* unaligned_ints = int_storage.data() + 1;
87+
88+
std::vector<uint8_t> string_offset_storage(string_offsets.size() * offset_element_size + 10);
89+
uint8_t* unaligned_string_offsets = string_offset_storage.data() + 1;
90+
91+
std::vector<uint8_t> string_value_storage(string_value_data.size() + 10);
92+
uint8_t* unaligned_string_values = string_value_storage.data() + 1;
93+
94+
std::vector<uint8_t> validity_storage(validity_bitmap.size() + 10);
95+
uint8_t* unaligned_validity = validity_storage.data() + 1;
96+
97+
// 3. Copy data to unaligned memory
98+
for (size_t i = 0; i < int_data.size(); ++i) {
99+
memcpy(unaligned_ints + i * int_element_size, &int_data[i], int_element_size);
100+
}
101+
102+
for (size_t i = 0; i < string_offsets.size(); ++i) {
103+
memcpy(unaligned_string_offsets + i * offset_element_size, &string_offsets[i],
104+
offset_element_size);
105+
}
106+
107+
memcpy(unaligned_string_values, string_value_data.data(), string_value_data.size());
108+
memcpy(unaligned_validity, validity_bitmap.data(), validity_bitmap.size());
109+
110+
// 4. Create Arrow array with unaligned memory
111+
auto int_buffer = arrow::Buffer::Wrap(unaligned_ints, int_data.size() * int_element_size);
112+
auto int_array = std::make_shared<arrow::Int32Array>(num_elements, int_buffer, nullptr, 0);
113+
114+
auto string_value_buffer =
115+
arrow::Buffer::Wrap(unaligned_string_values, string_value_data.size());
116+
auto string_offsets_buffer = arrow::Buffer::Wrap(unaligned_string_offsets,
117+
string_offsets.size() * offset_element_size);
118+
auto string_array = std::make_shared<arrow::StringArray>(num_elements, string_offsets_buffer,
119+
string_value_buffer, nullptr, 0);
120+
121+
auto validity_buffer = arrow::Buffer::Wrap(unaligned_validity, validity_bitmap.size());
122+
123+
auto field_int = arrow::field("int_field", arrow::int32());
124+
auto field_string = arrow::field("string_field", arrow::utf8());
125+
126+
auto struct_type = arrow::struct_({field_int, field_string});
127+
128+
arrow::ArrayVector field_arrays = {int_array, string_array};
129+
130+
auto arr = std::make_shared<arrow::StructArray>(struct_type, num_elements, field_arrays,
131+
validity_buffer);
132+
133+
const auto* concrete_array = dynamic_cast<const arrow::StructArray*>(arr.get());
134+
135+
const auto* int_field_array =
136+
dynamic_cast<const arrow::Int32Array*>(concrete_array->field(0).get());
137+
const auto* ints_ptr = int_field_array->raw_values();
138+
uintptr_t ints_address = reinterpret_cast<uintptr_t>(ints_ptr);
139+
EXPECT_EQ(ints_address % 4, 1);
140+
141+
const auto* string_field_array =
142+
dynamic_cast<const arrow::StringArray*>(concrete_array->field(1).get());
143+
const auto* string_values_ptr = string_field_array->value_data()->data();
144+
uintptr_t string_values_address = reinterpret_cast<uintptr_t>(string_values_ptr);
145+
EXPECT_EQ(string_values_address % 4, 1);
146+
147+
// 5.Test read_column_from_arrow
148+
std::vector<ColumnPtr> vector_columns;
149+
vector_columns.emplace_back(ColumnInt32::create());
150+
vector_columns.emplace_back(ColumnString::create());
151+
auto ser_col = ColumnStruct::create(vector_columns);
152+
cctz::time_zone tz;
153+
DataTypeSerDeSPtrs elem_serdes = {serde_int32, serde_str};
154+
Strings field_names = {"int_field", "string_field"};
155+
156+
auto serde_struct = std::make_shared<DataTypeStructSerDe>(elem_serdes, field_names);
157+
158+
auto st = serde_struct->read_column_from_arrow(*ser_col, arr.get(), 0, num_elements, tz);
159+
EXPECT_TRUE(st.ok());
160+
}
161+
162+
} // namespace doris::vectorized

0 commit comments

Comments
 (0)