diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index 80082da89efd70..431fc153a4d814 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -326,11 +326,21 @@ Status DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow:: auto arrow_offsets_array = concrete_array->offsets(); auto* arrow_offsets = dynamic_cast(arrow_offsets_array.get()); auto prev_size = offsets_data.back(); - auto arrow_nested_start_offset = arrow_offsets->Value(start); - auto arrow_nested_end_offset = arrow_offsets->Value(end); + const auto* base_offsets_ptr = reinterpret_cast(arrow_offsets->raw_values()); + const size_t offset_element_size = sizeof(int32_t); + int32_t arrow_nested_start_offset = 0; + int32_t arrow_nested_end_offset = 0; + const uint8_t* start_offset_ptr = base_offsets_ptr + start * offset_element_size; + const uint8_t* end_offset_ptr = base_offsets_ptr + end * offset_element_size; + memcpy(&arrow_nested_start_offset, start_offset_ptr, offset_element_size); + memcpy(&arrow_nested_end_offset, end_offset_ptr, offset_element_size); + for (auto i = start + 1; i < end + 1; ++i) { + int32_t current_offset = 0; + const uint8_t* current_offset_ptr = base_offsets_ptr + i * offset_element_size; + memcpy(¤t_offset, current_offset_ptr, offset_element_size); // convert to doris offset, start from offsets.back() - offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - arrow_nested_start_offset); + offsets_data.emplace_back(prev_size + current_offset - arrow_nested_start_offset); } return nested_serde->read_column_from_arrow( column_array.get_data(), concrete_array->values().get(), arrow_nested_start_offset, diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp index 6f467d830763e9..f44c5d38c408f7 100644 --- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp @@ -397,8 +397,13 @@ Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column, type->unit()); } } + const auto* base_ptr = reinterpret_cast(concrete_array->raw_values()); + const size_t element_size = sizeof(int64_t); for (auto value_i = start; value_i < end; ++value_i) { - auto utc_epoch = static_cast(concrete_array->Value(value_i)); + int64_t date_value = 0; + const uint8_t* raw_byte_ptr = base_ptr + value_i * element_size; + memcpy(&date_value, raw_byte_ptr, element_size); + auto utc_epoch = static_cast(date_value); DateV2Value v; // convert second diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp index d6e77bc9b79880..620ac6360739cf 100644 --- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp @@ -114,9 +114,15 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow: const cctz::time_zone& ctz) const { auto& col_data = static_cast(column).get_data(); const auto* concrete_array = dynamic_cast(arrow_array); + const auto* base_ptr = reinterpret_cast(concrete_array->raw_values()); + const size_t element_size = sizeof(int32_t); for (auto value_i = start; value_i < end; ++value_i) { + int32_t date_value = 0; + const uint8_t* raw_byte_ptr = base_ptr + value_i * element_size; + memcpy(&date_value, raw_byte_ptr, element_size); + DateV2Value v; - v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold); + v.get_date_from_daynr(date_value + date_threshold); col_data.emplace_back(binary_cast, UInt32>(v)); } return Status::OK(); diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp index 0af0c5acf7d178..464bf73abf8792 100644 --- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp @@ -337,8 +337,10 @@ Status DataTypeDecimalSerDe::read_column_from_arrow(IColumn& column, } else if constexpr (T == TYPE_DECIMAL32 || T == TYPE_DECIMAL64 || T == TYPE_DECIMAL128I) { const auto* concrete_array = dynamic_cast(arrow_array); for (auto value_i = start; value_i < end; ++value_i) { - column_data.emplace_back( - *reinterpret_cast(concrete_array->Value(value_i))); + const auto* value = concrete_array->Value(value_i); + FieldType decimal_value = FieldType {}; + memcpy(&decimal_value, value, sizeof(FieldType)); + column_data.emplace_back(decimal_value); } } else if constexpr (T == TYPE_DECIMAL256) { const auto* concrete_array = dynamic_cast(arrow_array); diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index d483ed4c647167..d35f20635120fc 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -219,10 +219,17 @@ Status DataTypeNumberSerDe::read_column_from_arrow(IColumn& column, const auto* concrete_array = dynamic_cast(arrow_array); std::shared_ptr buffer = concrete_array->value_data(); + const auto* offsets_data = concrete_array->value_offsets()->data(); + const size_t offset_size = sizeof(int32_t); for (size_t offset_i = start; offset_i < end; ++offset_i) { if (!concrete_array->IsNull(offset_i)) { - const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i); - const auto raw_data_len = concrete_array->value_length(offset_i); + int32_t start_offset = 0; + int32_t end_offset = 0; + memcpy(&start_offset, offsets_data + offset_i * offset_size, offset_size); + memcpy(&end_offset, offsets_data + (offset_i + 1) * offset_size, offset_size); + + const auto* raw_data = buffer->data() + start_offset; + const auto raw_data_len = end_offset - start_offset; if (raw_data_len == 0) { col_data.emplace_back(Int128()); // Int128() is NULL diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp b/be/src/vec/data_types/serde/data_type_string_serde.cpp index 1b8e9cf88e972f..78edfdb97d7d37 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_string_serde.cpp @@ -262,12 +262,21 @@ Status DataTypeStringSerDeBase::read_column_from_arrow( arrow_array->type_id() == arrow::Type::BINARY) { const auto* concrete_array = dynamic_cast(arrow_array); std::shared_ptr buffer = concrete_array->value_data(); + const uint8_t* offsets_data = concrete_array->value_offsets()->data(); + const size_t offset_size = sizeof(int32_t); for (auto offset_i = start; offset_i < end; ++offset_i) { if (!concrete_array->IsNull(offset_i)) { - const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i); + int32_t start_offset = 0; + int32_t end_offset = 0; + memcpy(&start_offset, offsets_data + offset_i * offset_size, offset_size); + memcpy(&end_offset, offsets_data + (offset_i + 1) * offset_size, offset_size); + + int32_t length = end_offset - start_offset; + const auto* raw_data = buffer->data() + start_offset; + assert_cast(column).insert_data( - (char*)raw_data, concrete_array->value_length(offset_i)); + reinterpret_cast(raw_data), length); } else { assert_cast(column).insert_default(); } @@ -451,4 +460,4 @@ template class DataTypeStringSerDeBase; template class DataTypeStringSerDeBase; template class DataTypeStringSerDeBase; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/test/vec/data_types/serde/data_type_serde_datetime_v2_test.cpp b/be/test/vec/data_types/serde/data_type_serde_datetime_v2_test.cpp index 57e0dd76a7bedc..6226d5cfd4f8f0 100644 --- a/be/test/vec/data_types/serde/data_type_serde_datetime_v2_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_datetime_v2_test.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include #include #include @@ -221,4 +223,66 @@ TEST_F(DataTypeDateTimeV2SerDeTest, serdes) { test_func(*serde_time_v2_0, column_time_v2_0); } -} // namespace doris::vectorized \ No newline at end of file +// Run with UBSan enabled to catch misalignment errors. +TEST_F(DataTypeDateTimeV2SerDeTest, ArrowMemNotAlignedDate) { + // 1.Prepare the data. + std::vector dates = {0, 365, 1000, 5000, 10000}; + const int64_t num_elements = dates.size(); + const int64_t element_size = sizeof(int32_t); + + // 2.Create an unaligned memory buffer. + std::vector data_storage(num_elements * element_size + 10); + uint8_t* unaligned_data = data_storage.data() + 1; + + // 3.Copy data to unaligned memory + for (size_t i = 0; i < dates.size(); ++i) { + memcpy(unaligned_data + i * element_size, &dates[i], element_size); + } + + // 4. Create Arrow array with unaligned memory + auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size); + auto arr = std::make_shared(num_elements, unaligned_buffer); + const auto* raw_values_ptr = arr->raw_values(); + uintptr_t address = reinterpret_cast(raw_values_ptr); + EXPECT_EQ(address % 4, 1); + + // 5.Test read_column_from_arrow + cctz::time_zone tz; + auto st = serde_date_v2->read_column_from_arrow(*column_date_v2, arr.get(), 0, 1, tz); + EXPECT_TRUE(st.ok()); +} + +// Run with UBSan enabled to catch misalignment errors. +TEST_F(DataTypeDateTimeV2SerDeTest, ArrowMemNotAlignedDateTime) { + // 1.Prepare the data. + std::vector timestamps = {0, 86400000000, 31536000000000, 100000000000, 500000000000}; + const int64_t num_elements = timestamps.size(); + const int64_t element_size = sizeof(int64_t); + + // 2.Create an unaligned memory buffer. + std::vector data_storage(num_elements * element_size + 10); + uint8_t* unaligned_data = data_storage.data() + 1; + + // 3. Copy data to unaligned memory + for (size_t i = 0; i < timestamps.size(); ++i) { + memcpy(unaligned_data + i * element_size, ×tamps[i], element_size); + } + + // 4. Create Arrow array with unaligned memory + auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size); + auto timestamp_type = arrow::timestamp(arrow::TimeUnit::MICRO); + auto arr = + std::make_shared(timestamp_type, num_elements, unaligned_buffer); + + const auto* raw_values_ptr = arr->raw_values(); + uintptr_t address = reinterpret_cast(raw_values_ptr); + EXPECT_EQ(address % 4, 1); + + // 5.Test read_column_from_arrow + cctz::time_zone tz; + auto st = + serde_datetime_v2_6->read_column_from_arrow(*column_datetime_v2_6, arr.get(), 0, 1, tz); + EXPECT_TRUE(st.ok()); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/data_types/serde/data_type_serde_decimal_test.cpp b/be/test/vec/data_types/serde/data_type_serde_decimal_test.cpp index 994a59aacbcda2..697272ef8471f4 100644 --- a/be/test/vec/data_types/serde/data_type_serde_decimal_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_decimal_test.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include #include #include @@ -291,4 +293,48 @@ TEST_F(DataTypeDecimalSerDeTest, serdes) { test_func(*serde_decimal128v2, column_decimal128_v2); } -} // namespace doris::vectorized \ No newline at end of file + +// Run with UBSan enabled to catch misalignment errors. +TEST_F(DataTypeDecimalSerDeTest, ArrowMemNotAligned) { + // 1.Prepare the data. + arrow::Decimal128Builder builder(arrow::decimal(38, 30)); + std::vector decimal_strings = {"12345.67", "89.10", "1112.13", "1415.16", + "1718.19"}; + + for (const auto& str : decimal_strings) { + EXPECT_TRUE(builder.Append(arrow::Decimal128(str)).ok()); + } + + std::shared_ptr aligned_array; + EXPECT_TRUE(builder.Finish(&aligned_array).ok()); + auto decimal_array = std::static_pointer_cast(aligned_array); + + // 2.Create an unaligned memory buffer. + const int64_t num_elements = decimal_array->length(); + const int64_t element_size = decimal_array->byte_width(); + + std::vector data_storage(num_elements * element_size + 10); + uint8_t* unaligned_data = data_storage.data() + 1; + + // 3. Copy data to unaligned memory + const uint8_t* original_data = decimal_array->raw_values(); + memcpy(unaligned_data, original_data, num_elements * element_size); + + // 4. Create Arrow array with unaligned memory + auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size); + + auto arr = std::make_shared(arrow::decimal(38, 30), num_elements, + unaligned_buffer); + + const auto* raw_values_ptr = arr->raw_values(); + uintptr_t address = reinterpret_cast(raw_values_ptr); + EXPECT_EQ(address % 4, 1); + + // 5.Test read_column_from_arrow + cctz::time_zone tz; + auto st = serde_decimal128v3_2->read_column_from_arrow(*column_decimal128v3_2, arr.get(), 0, 1, + tz); + EXPECT_TRUE(st.ok()); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/data_types/serde/data_type_serde_number_test.cpp b/be/test/vec/data_types/serde/data_type_serde_number_test.cpp index b0a1a58a9e5b63..360671eaa97b67 100644 --- a/be/test/vec/data_types/serde/data_type_serde_number_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_number_test.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include #include #include @@ -291,4 +293,53 @@ TEST_F(DataTypeNumberSerDeTest, serdes) { test_func(*serde_int128, column_int128); test_func(*serde_uint8, column_uint8); } -} // namespace doris::vectorized \ No newline at end of file + +// Run with UBSan enabled to catch misalignment errors. +TEST_F(DataTypeNumberSerDeTest, ArrowMemNotAligned) { + // 1.Prepare the data. + std::vector strings = {"9223372036854775807", "9223372036854775806", + "9223372036854775805", "9223372036854775804", + "9223372036854775803"}; + + int32_t total_length = 0; + std::vector offsets = {0}; + for (const auto& str : strings) { + total_length += static_cast(str.length()); + offsets.push_back(total_length); + } + + // 2.Create an unaligned memory buffer. + std::vector value_storage(total_length + 10); + std::vector offset_storage((strings.size() + 1) * sizeof(int32_t) + 10); + + uint8_t* unaligned_value_data = value_storage.data() + 1; + uint8_t* unaligned_offset_data = offset_storage.data() + 1; + + // 3. Copy data to unaligned memory + int32_t current_pos = 0; + for (size_t i = 0; i < strings.size(); ++i) { + memcpy(unaligned_value_data + current_pos, strings[i].data(), strings[i].length()); + current_pos += strings[i].length(); + } + + for (size_t i = 0; i < offsets.size(); ++i) { + memcpy(unaligned_offset_data + i * sizeof(int32_t), &offsets[i], sizeof(int32_t)); + } + + // 4. Create Arrow array with unaligned memory + auto value_buffer = arrow::Buffer::Wrap(unaligned_value_data, total_length); + auto offset_buffer = + arrow::Buffer::Wrap(unaligned_offset_data, offsets.size() * sizeof(int32_t)); + auto arr = std::make_shared(strings.size(), offset_buffer, value_buffer); + + const auto* offsets_ptr = arr->raw_value_offsets(); + uintptr_t address = reinterpret_cast(offsets_ptr); + EXPECT_EQ((reinterpret_cast(address) % 4), 1); + + // 5.Test read_column_from_arrow + cctz::time_zone tz; + auto st = serde_int128->read_column_from_arrow(*column_int128, arr.get(), 0, 1, tz); + EXPECT_TRUE(st.ok()); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/data_types/serde/data_type_serde_string_test.cpp b/be/test/vec/data_types/serde/data_type_serde_string_test.cpp index 125d870d5b8ea4..06e8f219c7a9ee 100644 --- a/be/test/vec/data_types/serde/data_type_serde_string_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_string_test.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include #include #include @@ -32,12 +34,14 @@ #include "runtime/types.h" #include "testutil/test_util.h" #include "vec/columns/column.h" +#include "vec/columns/column_array.h" #include "vec/common/assert_cast.h" #include "vec/core/field.h" #include "vec/core/types.h" #include "vec/data_types/common_data_type_serder_test.h" #include "vec/data_types/common_data_type_test.h" #include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_string.h" @@ -195,4 +199,127 @@ TEST_F(DataTypeStringSerDeTest, serdes) { test_func(*serde_str, column_str32); } -} // namespace doris::vectorized \ No newline at end of file +// Run with UBSan enabled to catch misalignment errors. +TEST_F(DataTypeStringSerDeTest, ArrowMemNotAligned) { + // 1.Prepare the data. + std::vector strings = {"hello", "world!", "test", "unaligned", "memory"}; + + int32_t total_length = 0; + std::vector offsets = {0}; + for (const auto& str : strings) { + total_length += static_cast(str.length()); + offsets.push_back(total_length); + } + + // 2.Create an unaligned memory buffer. + std::vector value_storage(total_length + 10); + std::vector offset_storage((strings.size() + 1) * sizeof(int32_t) + 10); + + uint8_t* unaligned_value_data = value_storage.data() + 1; + uint8_t* unaligned_offset_data = offset_storage.data() + 1; + + // 3. Copy data to unaligned memory + int32_t current_pos = 0; + for (size_t i = 0; i < strings.size(); ++i) { + memcpy(unaligned_value_data + current_pos, strings[i].data(), strings[i].length()); + current_pos += strings[i].length(); + } + + for (size_t i = 0; i < offsets.size(); ++i) { + memcpy(unaligned_offset_data + i * sizeof(int32_t), &offsets[i], sizeof(int32_t)); + } + + // 4. Create Arrow array with unaligned memory + auto value_buffer = arrow::Buffer::Wrap(unaligned_value_data, total_length); + auto offset_buffer = + arrow::Buffer::Wrap(unaligned_offset_data, offsets.size() * sizeof(int32_t)); + auto arr = std::make_shared(strings.size(), offset_buffer, value_buffer); + + const auto* offsets_ptr = arr->raw_value_offsets(); + uintptr_t address = reinterpret_cast(offsets_ptr); + EXPECT_EQ((reinterpret_cast(address) % 4), 1); + + // 5.Test read_column_from_arrow + cctz::time_zone tz; + auto st = serde_str->read_column_from_arrow(*column_str32, arr.get(), 0, 1, tz); + EXPECT_TRUE(st.ok()); +} + +// Run with UBSan enabled to catch misalignment errors. +TEST_F(DataTypeStringSerDeTest, ArrowMemNotAlignedNestedArr) { + // 1.Prepare the data. + std::vector string_data = {"hello", "world", "test", "a", "b", "c"}; + std::vector string_offsets = {0}; + int32_t current_offset = 0; + for (const auto& str : string_data) { + current_offset += static_cast(str.length()); + string_offsets.push_back(current_offset); + } + + std::vector list_offsets = {0, 2, 3, 6, 6}; + std::vector value_data; + for (const auto& str : string_data) { + value_data.insert(value_data.end(), str.begin(), str.end()); + } + + const int64_t num_lists = list_offsets.size() - 1; + const int64_t offset_element_size = sizeof(int32_t); + + // 2.Create an unaligned memory buffer. + std::vector list_offset_storage(list_offsets.size() * offset_element_size + 10); + uint8_t* unaligned_list_offsets = list_offset_storage.data() + 1; + + std::vector string_offset_storage(string_offsets.size() * offset_element_size + 10); + uint8_t* unaligned_string_offsets = string_offset_storage.data() + 1; + + std::vector value_storage(value_data.size() + 10); + uint8_t* unaligned_values = value_storage.data() + 1; + + // 3. Copy data to unaligned memory + for (size_t i = 0; i < list_offsets.size(); ++i) { + memcpy(unaligned_list_offsets + i * offset_element_size, &list_offsets[i], + offset_element_size); + } + + for (size_t i = 0; i < string_offsets.size(); ++i) { + memcpy(unaligned_string_offsets + i * offset_element_size, &string_offsets[i], + offset_element_size); + } + + memcpy(unaligned_values, value_data.data(), value_data.size()); + + // 4. Create Arrow array with unaligned memory + auto value_buffer = arrow::Buffer::Wrap(unaligned_values, value_data.size()); + auto string_offsets_buffer = + arrow::Buffer::Wrap(unaligned_string_offsets, string_offsets.size() * sizeof(int32_t)); + auto string_array = std::make_shared(string_offsets.size() - 1, + string_offsets_buffer, value_buffer); + auto list_offsets_buffer = + arrow::Buffer::Wrap(unaligned_list_offsets, list_offsets.size() * offset_element_size); + auto list_offsets_array = + std::make_shared(list_offsets.size(), list_offsets_buffer); + + auto arr = std::make_shared(arrow::list(arrow::utf8()), num_lists, + list_offsets_buffer, string_array); + + const auto* concrete_array = dynamic_cast(arr.get()); + auto arrow_offsets_array = concrete_array->offsets(); + auto* arrow_offsets = dynamic_cast(arrow_offsets_array.get()); + + const auto* offsets_ptr = arrow_offsets->raw_values(); + uintptr_t offsets_address = reinterpret_cast(offsets_ptr); + EXPECT_EQ(offsets_address % 4, 1); + + const auto* values_ptr = string_array->value_data()->data(); + uintptr_t values_address = reinterpret_cast(values_ptr); + EXPECT_EQ(values_address % 4, 1); + + // 5.Test read_column_from_arrow + auto ser_col = ColumnArray::create(ColumnString::create(), ColumnOffset64::create()); + cctz::time_zone tz; + auto serde_list = std::make_shared(serde_str); + auto st = serde_list->read_column_from_arrow(*ser_col, arr.get(), 0, 1, tz); + EXPECT_TRUE(st.ok()); +} + +} // namespace doris::vectorized