Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions be/src/vec/data_types/serde/data_type_array_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,21 @@ Status DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::
auto arrow_offsets_array = concrete_array->offsets();
auto* arrow_offsets = dynamic_cast<arrow::Int32Array*>(arrow_offsets_array.get());
auto prev_size = offsets_data.back();
auto arrow_nested_start_offset = arrow_offsets->Value(start);
auto arrow_nested_end_offset = arrow_offsets->Value(end);
const auto* base_offsets_ptr = reinterpret_cast<const uint8_t*>(arrow_offsets->raw_values());
const size_t offset_element_size = sizeof(int32_t);
int32_t arrow_nested_start_offset = 0;
int32_t arrow_nested_end_offset = 0;
const uint8_t* start_offset_ptr = base_offsets_ptr + start * offset_element_size;
const uint8_t* end_offset_ptr = base_offsets_ptr + end * offset_element_size;
memcpy(&arrow_nested_start_offset, start_offset_ptr, offset_element_size);
memcpy(&arrow_nested_end_offset, end_offset_ptr, offset_element_size);

for (auto i = start + 1; i < end + 1; ++i) {
int32_t current_offset = 0;
const uint8_t* current_offset_ptr = base_offsets_ptr + i * offset_element_size;
memcpy(&current_offset, current_offset_ptr, offset_element_size);
// convert to doris offset, start from offsets.back()
offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - arrow_nested_start_offset);
offsets_data.emplace_back(prev_size + current_offset - arrow_nested_start_offset);
}
return nested_serde->read_column_from_arrow(
column_array.get_data(), concrete_array->values().get(), arrow_nested_start_offset,
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,13 @@ Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
type->unit());
}
}
const auto* base_ptr = reinterpret_cast<const uint8_t*>(concrete_array->raw_values());
const size_t element_size = sizeof(int64_t);
for (auto value_i = start; value_i < end; ++value_i) {
auto utc_epoch = static_cast<UInt64>(concrete_array->Value(value_i));
int64_t date_value = 0;
const uint8_t* raw_byte_ptr = base_ptr + value_i * element_size;
memcpy(&date_value, raw_byte_ptr, element_size);
auto utc_epoch = static_cast<UInt64>(date_value);

DateV2Value<DateTimeV2ValueType> v;
// convert second
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/data_types/serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,15 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow:
const cctz::time_zone& ctz) const {
auto& col_data = static_cast<ColumnDateV2&>(column).get_data();
const auto* concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array);
const auto* base_ptr = reinterpret_cast<const uint8_t*>(concrete_array->raw_values());
const size_t element_size = sizeof(int32_t);
for (auto value_i = start; value_i < end; ++value_i) {
int32_t date_value = 0;
const uint8_t* raw_byte_ptr = base_ptr + value_i * element_size;
memcpy(&date_value, raw_byte_ptr, element_size);

DateV2Value<DateV2ValueType> v;
v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
v.get_date_from_daynr(date_value + date_threshold);
col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, UInt32>(v));
}
return Status::OK();
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/data_types/serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,10 @@ Status DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
} else if constexpr (T == TYPE_DECIMAL32 || T == TYPE_DECIMAL64 || T == TYPE_DECIMAL128I) {
const auto* concrete_array = dynamic_cast<const arrow::DecimalArray*>(arrow_array);
for (auto value_i = start; value_i < end; ++value_i) {
column_data.emplace_back(
*reinterpret_cast<const FieldType*>(concrete_array->Value(value_i)));
const auto* value = concrete_array->Value(value_i);
FieldType decimal_value = FieldType {};
memcpy(&decimal_value, value, sizeof(FieldType));
column_data.emplace_back(decimal_value);
}
} else if constexpr (T == TYPE_DECIMAL256) {
const auto* concrete_array = dynamic_cast<const arrow::Decimal256Array*>(arrow_array);
Expand Down
11 changes: 9 additions & 2 deletions be/src/vec/data_types/serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,17 @@ Status DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
const auto* concrete_array = dynamic_cast<const arrow::StringArray*>(arrow_array);
std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();

const auto* offsets_data = concrete_array->value_offsets()->data();
const size_t offset_size = sizeof(int32_t);
for (size_t offset_i = start; offset_i < end; ++offset_i) {
if (!concrete_array->IsNull(offset_i)) {
const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i);
const auto raw_data_len = concrete_array->value_length(offset_i);
int32_t start_offset = 0;
int32_t end_offset = 0;
memcpy(&start_offset, offsets_data + offset_i * offset_size, offset_size);
memcpy(&end_offset, offsets_data + (offset_i + 1) * offset_size, offset_size);

const auto* raw_data = buffer->data() + start_offset;
const auto raw_data_len = end_offset - start_offset;

if (raw_data_len == 0) {
col_data.emplace_back(Int128()); // Int128() is NULL
Expand Down
15 changes: 12 additions & 3 deletions be/src/vec/data_types/serde/data_type_string_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,21 @@ Status DataTypeStringSerDeBase<ColumnType>::read_column_from_arrow(
arrow_array->type_id() == arrow::Type::BINARY) {
const auto* concrete_array = dynamic_cast<const arrow::BinaryArray*>(arrow_array);
std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
const uint8_t* offsets_data = concrete_array->value_offsets()->data();
const size_t offset_size = sizeof(int32_t);

for (auto offset_i = start; offset_i < end; ++offset_i) {
if (!concrete_array->IsNull(offset_i)) {
const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i);
int32_t start_offset = 0;
int32_t end_offset = 0;
memcpy(&start_offset, offsets_data + offset_i * offset_size, offset_size);
memcpy(&end_offset, offsets_data + (offset_i + 1) * offset_size, offset_size);

int32_t length = end_offset - start_offset;
const auto* raw_data = buffer->data() + start_offset;

assert_cast<ColumnType&>(column).insert_data(
(char*)raw_data, concrete_array->value_length(offset_i));
reinterpret_cast<const char*>(raw_data), length);
} else {
assert_cast<ColumnType&>(column).insert_default();
}
Expand Down Expand Up @@ -451,4 +460,4 @@ template class DataTypeStringSerDeBase<ColumnString>;
template class DataTypeStringSerDeBase<ColumnString64>;
template class DataTypeStringSerDeBase<ColumnFixedLengthObject>;

} // namespace doris::vectorized
} // namespace doris::vectorized
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/api.h>
#include <cctz/time_zone.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -221,4 +223,66 @@ TEST_F(DataTypeDateTimeV2SerDeTest, serdes) {
test_func(*serde_time_v2_0, column_time_v2_0);
}

} // namespace doris::vectorized
// Run with UBSan enabled to catch misalignment errors.
TEST_F(DataTypeDateTimeV2SerDeTest, ArrowMemNotAlignedDate) {
// 1.Prepare the data.
std::vector<int32_t> dates = {0, 365, 1000, 5000, 10000};
const int64_t num_elements = dates.size();
const int64_t element_size = sizeof(int32_t);

// 2.Create an unaligned memory buffer.
std::vector<uint8_t> data_storage(num_elements * element_size + 10);
uint8_t* unaligned_data = data_storage.data() + 1;

// 3.Copy data to unaligned memory
for (size_t i = 0; i < dates.size(); ++i) {
memcpy(unaligned_data + i * element_size, &dates[i], element_size);
}

// 4. Create Arrow array with unaligned memory
auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size);
auto arr = std::make_shared<arrow::Date32Array>(num_elements, unaligned_buffer);
const auto* raw_values_ptr = arr->raw_values();
uintptr_t address = reinterpret_cast<uintptr_t>(raw_values_ptr);
EXPECT_EQ(address % 4, 1);

// 5.Test read_column_from_arrow
cctz::time_zone tz;
auto st = serde_date_v2->read_column_from_arrow(*column_date_v2, arr.get(), 0, 1, tz);
EXPECT_TRUE(st.ok());
}

// Run with UBSan enabled to catch misalignment errors.
TEST_F(DataTypeDateTimeV2SerDeTest, ArrowMemNotAlignedDateTime) {
// 1.Prepare the data.
std::vector<int64_t> timestamps = {0, 86400000000, 31536000000000, 100000000000, 500000000000};
const int64_t num_elements = timestamps.size();
const int64_t element_size = sizeof(int64_t);

// 2.Create an unaligned memory buffer.
std::vector<uint8_t> data_storage(num_elements * element_size + 10);
uint8_t* unaligned_data = data_storage.data() + 1;

// 3. Copy data to unaligned memory
for (size_t i = 0; i < timestamps.size(); ++i) {
memcpy(unaligned_data + i * element_size, &timestamps[i], element_size);
}

// 4. Create Arrow array with unaligned memory
auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size);
auto timestamp_type = arrow::timestamp(arrow::TimeUnit::MICRO);
auto arr =
std::make_shared<arrow::TimestampArray>(timestamp_type, num_elements, unaligned_buffer);

const auto* raw_values_ptr = arr->raw_values();
uintptr_t address = reinterpret_cast<uintptr_t>(raw_values_ptr);
EXPECT_EQ(address % 4, 1);

// 5.Test read_column_from_arrow
cctz::time_zone tz;
auto st =
serde_datetime_v2_6->read_column_from_arrow(*column_datetime_v2_6, arr.get(), 0, 1, tz);
EXPECT_TRUE(st.ok());
}

} // namespace doris::vectorized
48 changes: 47 additions & 1 deletion be/test/vec/data_types/serde/data_type_serde_decimal_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/api.h>
#include <cctz/time_zone.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -291,4 +293,48 @@ TEST_F(DataTypeDecimalSerDeTest, serdes) {

test_func(*serde_decimal128v2, column_decimal128_v2);
}
} // namespace doris::vectorized

// Run with UBSan enabled to catch misalignment errors.
TEST_F(DataTypeDecimalSerDeTest, ArrowMemNotAligned) {
// 1.Prepare the data.
arrow::Decimal128Builder builder(arrow::decimal(38, 30));
std::vector<std::string> decimal_strings = {"12345.67", "89.10", "1112.13", "1415.16",
"1718.19"};

for (const auto& str : decimal_strings) {
EXPECT_TRUE(builder.Append(arrow::Decimal128(str)).ok());
}

std::shared_ptr<arrow::Array> aligned_array;
EXPECT_TRUE(builder.Finish(&aligned_array).ok());
auto decimal_array = std::static_pointer_cast<arrow::DecimalArray>(aligned_array);

// 2.Create an unaligned memory buffer.
const int64_t num_elements = decimal_array->length();
const int64_t element_size = decimal_array->byte_width();

std::vector<uint8_t> data_storage(num_elements * element_size + 10);
uint8_t* unaligned_data = data_storage.data() + 1;

// 3. Copy data to unaligned memory
const uint8_t* original_data = decimal_array->raw_values();
memcpy(unaligned_data, original_data, num_elements * element_size);

// 4. Create Arrow array with unaligned memory
auto unaligned_buffer = arrow::Buffer::Wrap(unaligned_data, num_elements * element_size);

auto arr = std::make_shared<arrow::DecimalArray>(arrow::decimal(38, 30), num_elements,
unaligned_buffer);

const auto* raw_values_ptr = arr->raw_values();
uintptr_t address = reinterpret_cast<uintptr_t>(raw_values_ptr);
EXPECT_EQ(address % 4, 1);

// 5.Test read_column_from_arrow
cctz::time_zone tz;
auto st = serde_decimal128v3_2->read_column_from_arrow(*column_decimal128v3_2, arr.get(), 0, 1,
tz);
EXPECT_TRUE(st.ok());
}

} // namespace doris::vectorized
53 changes: 52 additions & 1 deletion be/test/vec/data_types/serde/data_type_serde_number_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/api.h>
#include <cctz/time_zone.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -291,4 +293,53 @@ TEST_F(DataTypeNumberSerDeTest, serdes) {
test_func(*serde_int128, column_int128);
test_func(*serde_uint8, column_uint8);
}
} // namespace doris::vectorized

// Run with UBSan enabled to catch misalignment errors.
TEST_F(DataTypeNumberSerDeTest, ArrowMemNotAligned) {
// 1.Prepare the data.
std::vector<std::string> strings = {"9223372036854775807", "9223372036854775806",
"9223372036854775805", "9223372036854775804",
"9223372036854775803"};

int32_t total_length = 0;
std::vector<int32_t> offsets = {0};
for (const auto& str : strings) {
total_length += static_cast<int32_t>(str.length());
offsets.push_back(total_length);
}

// 2.Create an unaligned memory buffer.
std::vector<uint8_t> value_storage(total_length + 10);
std::vector<uint8_t> offset_storage((strings.size() + 1) * sizeof(int32_t) + 10);

uint8_t* unaligned_value_data = value_storage.data() + 1;
uint8_t* unaligned_offset_data = offset_storage.data() + 1;

// 3. Copy data to unaligned memory
int32_t current_pos = 0;
for (size_t i = 0; i < strings.size(); ++i) {
memcpy(unaligned_value_data + current_pos, strings[i].data(), strings[i].length());
current_pos += strings[i].length();
}

for (size_t i = 0; i < offsets.size(); ++i) {
memcpy(unaligned_offset_data + i * sizeof(int32_t), &offsets[i], sizeof(int32_t));
}

// 4. Create Arrow array with unaligned memory
auto value_buffer = arrow::Buffer::Wrap(unaligned_value_data, total_length);
auto offset_buffer =
arrow::Buffer::Wrap(unaligned_offset_data, offsets.size() * sizeof(int32_t));
auto arr = std::make_shared<arrow::StringArray>(strings.size(), offset_buffer, value_buffer);

const auto* offsets_ptr = arr->raw_value_offsets();
uintptr_t address = reinterpret_cast<uintptr_t>(offsets_ptr);
EXPECT_EQ((reinterpret_cast<uintptr_t>(address) % 4), 1);

// 5.Test read_column_from_arrow
cctz::time_zone tz;
auto st = serde_int128->read_column_from_arrow(*column_int128, arr.get(), 0, 1, tz);
EXPECT_TRUE(st.ok());
}

} // namespace doris::vectorized
Loading
Loading