Skip to content

Commit f9dd819

Browse files
Chen768959 authored and Your Name committed
[fix](datatype)Fix unaligned memory access in read_column_from_arrow (#58002)
Fix memory alignment issues in `read_column_from_arrow` by using `memcpy`. This PR addresses potential memory alignment problems when reading data from Arrow arrays in the `read_column_from_arrow` method. Instead of directly casting potentially unaligned pointers, we now use `memcpy` to safely copy data into properly aligned memory, preventing misaligned-address runtime errors. Follow-up to #55274.
1 parent d51a156 commit f9dd819

10 files changed

+343
-16
lines changed

be/src/vec/data_types/serde/data_type_array_serde.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,11 +326,21 @@ Status DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::
326326
auto arrow_offsets_array = concrete_array->offsets();
327327
auto* arrow_offsets = dynamic_cast<arrow::Int32Array*>(arrow_offsets_array.get());
328328
auto prev_size = offsets_data.back();
329-
auto arrow_nested_start_offset = arrow_offsets->Value(start);
330-
auto arrow_nested_end_offset = arrow_offsets->Value(end);
329+
const auto* base_offsets_ptr = reinterpret_cast<const uint8_t*>(arrow_offsets->raw_values());
330+
const size_t offset_element_size = sizeof(int32_t);
331+
int32_t arrow_nested_start_offset = 0;
332+
int32_t arrow_nested_end_offset = 0;
333+
const uint8_t* start_offset_ptr = base_offsets_ptr + start * offset_element_size;
334+
const uint8_t* end_offset_ptr = base_offsets_ptr + end * offset_element_size;
335+
memcpy(&arrow_nested_start_offset, start_offset_ptr, offset_element_size);
336+
memcpy(&arrow_nested_end_offset, end_offset_ptr, offset_element_size);
337+
331338
for (auto i = start + 1; i < end + 1; ++i) {
339+
int32_t current_offset = 0;
340+
const uint8_t* current_offset_ptr = base_offsets_ptr + i * offset_element_size;
341+
memcpy(&current_offset, current_offset_ptr, offset_element_size);
332342
// convert to doris offset, start from offsets.back()
333-
offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - arrow_nested_start_offset);
343+
offsets_data.emplace_back(prev_size + current_offset - arrow_nested_start_offset);
334344
}
335345
return nested_serde->read_column_from_arrow(
336346
column_array.get_data(), concrete_array->values().get(), arrow_nested_start_offset,

be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,13 @@ Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
397397
type->unit());
398398
}
399399
}
400+
const auto* base_ptr = reinterpret_cast<const uint8_t*>(concrete_array->raw_values());
401+
const size_t element_size = sizeof(int64_t);
400402
for (auto value_i = start; value_i < end; ++value_i) {
401-
auto utc_epoch = static_cast<UInt64>(concrete_array->Value(value_i));
403+
int64_t date_value = 0;
404+
const uint8_t* raw_byte_ptr = base_ptr + value_i * element_size;
405+
memcpy(&date_value, raw_byte_ptr, element_size);
406+
auto utc_epoch = static_cast<UInt64>(date_value);
402407

403408
DateV2Value<DateTimeV2ValueType> v;
404409
// convert second

be/src/vec/data_types/serde/data_type_datev2_serde.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,15 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow:
114114
const cctz::time_zone& ctz) const {
115115
auto& col_data = static_cast<ColumnDateV2&>(column).get_data();
116116
const auto* concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array);
117+
const auto* base_ptr = reinterpret_cast<const uint8_t*>(concrete_array->raw_values());
118+
const size_t element_size = sizeof(int32_t);
117119
for (auto value_i = start; value_i < end; ++value_i) {
120+
int32_t date_value = 0;
121+
const uint8_t* raw_byte_ptr = base_ptr + value_i * element_size;
122+
memcpy(&date_value, raw_byte_ptr, element_size);
123+
118124
DateV2Value<DateV2ValueType> v;
119-
v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
125+
v.get_date_from_daynr(date_value + date_threshold);
120126
col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, UInt32>(v));
121127
}
122128
return Status::OK();

be/src/vec/data_types/serde/data_type_decimal_serde.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,10 @@ Status DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
337337
} else if constexpr (T == TYPE_DECIMAL32 || T == TYPE_DECIMAL64 || T == TYPE_DECIMAL128I) {
338338
const auto* concrete_array = dynamic_cast<const arrow::DecimalArray*>(arrow_array);
339339
for (auto value_i = start; value_i < end; ++value_i) {
340-
column_data.emplace_back(
341-
*reinterpret_cast<const FieldType*>(concrete_array->Value(value_i)));
340+
const auto* value = concrete_array->Value(value_i);
341+
FieldType decimal_value = FieldType {};
342+
memcpy(&decimal_value, value, sizeof(FieldType));
343+
column_data.emplace_back(decimal_value);
342344
}
343345
} else if constexpr (T == TYPE_DECIMAL256) {
344346
const auto* concrete_array = dynamic_cast<const arrow::Decimal256Array*>(arrow_array);

be/src/vec/data_types/serde/data_type_number_serde.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,17 @@ Status DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
219219
const auto* concrete_array = dynamic_cast<const arrow::StringArray*>(arrow_array);
220220
std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
221221

222+
const auto* offsets_data = concrete_array->value_offsets()->data();
223+
const size_t offset_size = sizeof(int32_t);
222224
for (size_t offset_i = start; offset_i < end; ++offset_i) {
223225
if (!concrete_array->IsNull(offset_i)) {
224-
const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i);
225-
const auto raw_data_len = concrete_array->value_length(offset_i);
226+
int32_t start_offset = 0;
227+
int32_t end_offset = 0;
228+
memcpy(&start_offset, offsets_data + offset_i * offset_size, offset_size);
229+
memcpy(&end_offset, offsets_data + (offset_i + 1) * offset_size, offset_size);
230+
231+
const auto* raw_data = buffer->data() + start_offset;
232+
const auto raw_data_len = end_offset - start_offset;
226233

227234
if (raw_data_len == 0) {
228235
col_data.emplace_back(Int128()); // Int128() is NULL

be/src/vec/data_types/serde/data_type_string_serde.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,21 @@ Status DataTypeStringSerDeBase<ColumnType>::read_column_from_arrow(
262262
arrow_array->type_id() == arrow::Type::BINARY) {
263263
const auto* concrete_array = dynamic_cast<const arrow::BinaryArray*>(arrow_array);
264264
std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
265+
const uint8_t* offsets_data = concrete_array->value_offsets()->data();
266+
const size_t offset_size = sizeof(int32_t);
265267

266268
for (auto offset_i = start; offset_i < end; ++offset_i) {
267269
if (!concrete_array->IsNull(offset_i)) {
268-
const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i);
270+
int32_t start_offset = 0;
271+
int32_t end_offset = 0;
272+
memcpy(&start_offset, offsets_data + offset_i * offset_size, offset_size);
273+
memcpy(&end_offset, offsets_data + (offset_i + 1) * offset_size, offset_size);
274+
275+
int32_t length = end_offset - start_offset;
276+
const auto* raw_data = buffer->data() + start_offset;
277+
269278
assert_cast<ColumnType&>(column).insert_data(
270-
(char*)raw_data, concrete_array->value_length(offset_i));
279+
reinterpret_cast<const char*>(raw_data), length);
271280
} else {
272281
assert_cast<ColumnType&>(column).insert_default();
273282
}
@@ -450,4 +459,4 @@ template class DataTypeStringSerDeBase<ColumnString>;
450459
template class DataTypeStringSerDeBase<ColumnString64>;
451460
template class DataTypeStringSerDeBase<ColumnFixedLengthObject>;
452461

453-
} // namespace doris::vectorized
462+
} // namespace doris::vectorized

be/test/vec/data_types/serde/data_type_serde_datetime_v2_test.cpp

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include <arrow/api.h>
19+
#include <cctz/time_zone.h>
1820
#include <gtest/gtest-message.h>
1921
#include <gtest/gtest-test-part.h>
2022
#include <gtest/gtest.h>
@@ -221,4 +223,66 @@ TEST_F(DataTypeDateTimeV2SerDeTest, serdes) {
221223
test_func(*serde_time_v2_0, column_time_v2_0);
222224
}
223225

224-
} // namespace doris::vectorized
226+
// Run with UBSan enabled to catch misalignment errors.
227+
TEST_F(DataTypeDateTimeV2SerDeTest, ArrowMemNotAlignedDate) {
228+
// 1.Prepare the data.
229+
std::vector<int32_t> dates = {0, 365, 1000, 5000, 10000};
230+
const int64_t num_elements = dates.size();
231+
const int64_t element_size = sizeof(int32_t);
232+
233+
// 2.Create an unaligned memory buffer.
234+
std::vector<uint8_t> data_storage(num_elements * element_size + 10);
235+
uint8_t* unaligned_data = data_storage.data() + 1;
236+
237+
// 3.Copy data to unaligned memory
238+
for (size_t i = 0; i < dates.size(); ++i) {
239+
memcpy(unaligned_data + i * element_size, &dates[i], element_size);
240+
}
241+
242+
// 4. Create Arrow array with unaligned memory
243+
auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size);
244+
auto arr = std::make_shared<arrow::Date32Array>(num_elements, unaligned_buffer);
245+
const auto* raw_values_ptr = arr->raw_values();
246+
uintptr_t address = reinterpret_cast<uintptr_t>(raw_values_ptr);
247+
EXPECT_EQ(address % 4, 1);
248+
249+
// 5.Test read_column_from_arrow
250+
cctz::time_zone tz;
251+
auto st = serde_date_v2->read_column_from_arrow(*column_date_v2, arr.get(), 0, 1, tz);
252+
EXPECT_TRUE(st.ok());
253+
}
254+
255+
// Run with UBSan enabled to catch misalignment errors.
256+
TEST_F(DataTypeDateTimeV2SerDeTest, ArrowMemNotAlignedDateTime) {
257+
// 1.Prepare the data.
258+
std::vector<int64_t> timestamps = {0, 86400000000, 31536000000000, 100000000000, 500000000000};
259+
const int64_t num_elements = timestamps.size();
260+
const int64_t element_size = sizeof(int64_t);
261+
262+
// 2.Create an unaligned memory buffer.
263+
std::vector<uint8_t> data_storage(num_elements * element_size + 10);
264+
uint8_t* unaligned_data = data_storage.data() + 1;
265+
266+
// 3. Copy data to unaligned memory
267+
for (size_t i = 0; i < timestamps.size(); ++i) {
268+
memcpy(unaligned_data + i * element_size, &timestamps[i], element_size);
269+
}
270+
271+
// 4. Create Arrow array with unaligned memory
272+
auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size);
273+
auto timestamp_type = arrow::timestamp(arrow::TimeUnit::MICRO);
274+
auto arr =
275+
std::make_shared<arrow::TimestampArray>(timestamp_type, num_elements, unaligned_buffer);
276+
277+
const auto* raw_values_ptr = arr->raw_values();
278+
uintptr_t address = reinterpret_cast<uintptr_t>(raw_values_ptr);
279+
EXPECT_EQ(address % 4, 1);
280+
281+
// 5.Test read_column_from_arrow
282+
cctz::time_zone tz;
283+
auto st =
284+
serde_datetime_v2_6->read_column_from_arrow(*column_datetime_v2_6, arr.get(), 0, 1, tz);
285+
EXPECT_TRUE(st.ok());
286+
}
287+
288+
} // namespace doris::vectorized

be/test/vec/data_types/serde/data_type_serde_decimal_test.cpp

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include <arrow/api.h>
19+
#include <cctz/time_zone.h>
1820
#include <gtest/gtest-message.h>
1921
#include <gtest/gtest-test-part.h>
2022
#include <gtest/gtest.h>
@@ -291,4 +293,48 @@ TEST_F(DataTypeDecimalSerDeTest, serdes) {
291293

292294
test_func(*serde_decimal128v2, column_decimal128_v2);
293295
}
294-
} // namespace doris::vectorized
296+
297+
// Run with UBSan enabled to catch misalignment errors.
298+
TEST_F(DataTypeDecimalSerDeTest, ArrowMemNotAligned) {
299+
// 1.Prepare the data.
300+
arrow::Decimal128Builder builder(arrow::decimal(38, 30));
301+
std::vector<std::string> decimal_strings = {"12345.67", "89.10", "1112.13", "1415.16",
302+
"1718.19"};
303+
304+
for (const auto& str : decimal_strings) {
305+
EXPECT_TRUE(builder.Append(arrow::Decimal128(str)).ok());
306+
}
307+
308+
std::shared_ptr<arrow::Array> aligned_array;
309+
EXPECT_TRUE(builder.Finish(&aligned_array).ok());
310+
auto decimal_array = std::static_pointer_cast<arrow::DecimalArray>(aligned_array);
311+
312+
// 2.Create an unaligned memory buffer.
313+
const int64_t num_elements = decimal_array->length();
314+
const int64_t element_size = decimal_array->byte_width();
315+
316+
std::vector<uint8_t> data_storage(num_elements * element_size + 10);
317+
uint8_t* unaligned_data = data_storage.data() + 1;
318+
319+
// 3. Copy data to unaligned memory
320+
const uint8_t* original_data = decimal_array->raw_values();
321+
memcpy(unaligned_data, original_data, num_elements * element_size);
322+
323+
// 4. Create Arrow array with unaligned memory
324+
auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size);
325+
326+
auto arr = std::make_shared<arrow::DecimalArray>(arrow::decimal(38, 30), num_elements,
327+
unaligned_buffer);
328+
329+
const auto* raw_values_ptr = arr->raw_values();
330+
uintptr_t address = reinterpret_cast<uintptr_t>(raw_values_ptr);
331+
EXPECT_EQ(address % 4, 1);
332+
333+
// 5.Test read_column_from_arrow
334+
cctz::time_zone tz;
335+
auto st = serde_decimal128v3_2->read_column_from_arrow(*column_decimal128v3_2, arr.get(), 0, 1,
336+
tz);
337+
EXPECT_TRUE(st.ok());
338+
}
339+
340+
} // namespace doris::vectorized

be/test/vec/data_types/serde/data_type_serde_number_test.cpp

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include <arrow/api.h>
19+
#include <cctz/time_zone.h>
1820
#include <gtest/gtest-message.h>
1921
#include <gtest/gtest-test-part.h>
2022
#include <gtest/gtest.h>
@@ -291,4 +293,53 @@ TEST_F(DataTypeNumberSerDeTest, serdes) {
291293
test_func(*serde_int128, column_int128);
292294
test_func(*serde_uint8, column_uint8);
293295
}
294-
} // namespace doris::vectorized
296+
297+
// Run with UBSan enabled to catch misalignment errors.
298+
TEST_F(DataTypeNumberSerDeTest, ArrowMemNotAligned) {
299+
// 1.Prepare the data.
300+
std::vector<std::string> strings = {"9223372036854775807", "9223372036854775806",
301+
"9223372036854775805", "9223372036854775804",
302+
"9223372036854775803"};
303+
304+
int32_t total_length = 0;
305+
std::vector<int32_t> offsets = {0};
306+
for (const auto& str : strings) {
307+
total_length += static_cast<int32_t>(str.length());
308+
offsets.push_back(total_length);
309+
}
310+
311+
// 2.Create an unaligned memory buffer.
312+
std::vector<uint8_t> value_storage(total_length + 10);
313+
std::vector<uint8_t> offset_storage((strings.size() + 1) * sizeof(int32_t) + 10);
314+
315+
uint8_t* unaligned_value_data = value_storage.data() + 1;
316+
uint8_t* unaligned_offset_data = offset_storage.data() + 1;
317+
318+
// 3. Copy data to unaligned memory
319+
int32_t current_pos = 0;
320+
for (size_t i = 0; i < strings.size(); ++i) {
321+
memcpy(unaligned_value_data + current_pos, strings[i].data(), strings[i].length());
322+
current_pos += strings[i].length();
323+
}
324+
325+
for (size_t i = 0; i < offsets.size(); ++i) {
326+
memcpy(unaligned_offset_data + i * sizeof(int32_t), &offsets[i], sizeof(int32_t));
327+
}
328+
329+
// 4. Create Arrow array with unaligned memory
330+
auto value_buffer = arrow::Buffer::Wrap(unaligned_value_data, total_length);
331+
auto offset_buffer =
332+
arrow::Buffer::Wrap(unaligned_offset_data, offsets.size() * sizeof(int32_t));
333+
auto arr = std::make_shared<arrow::StringArray>(strings.size(), offset_buffer, value_buffer);
334+
335+
const auto* offsets_ptr = arr->raw_value_offsets();
336+
uintptr_t address = reinterpret_cast<uintptr_t>(offsets_ptr);
337+
EXPECT_EQ((reinterpret_cast<uintptr_t>(address) % 4), 1);
338+
339+
// 5.Test read_column_from_arrow
340+
cctz::time_zone tz;
341+
auto st = serde_int128->read_column_from_arrow(*column_int128, arr.get(), 0, 1, tz);
342+
EXPECT_TRUE(st.ok());
343+
}
344+
345+
} // namespace doris::vectorized

0 commit comments

Comments
 (0)