-
Notifications
You must be signed in to change notification settings - Fork 5.5k
misc: Refactor ShuffleTest #26643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
misc: Refactor ShuffleTest #26643
Conversation
Reviewer's GuideRefactors the ShuffleTest suite to remove custom in-memory TestShuffleWriter/Reader and leverage LocalPersistentShuffleFactory with on-disk shuffle; adds JSON-based helpers and temp-directory management in tests; extends LocalShuffle Reader/Writer with sorted-shuffle support via a k-way merge stream; and enhances logging, error-injection hooks, and factory parsing for sortedShuffle. Sequence diagram for sorted shuffle reading with k-way mergesequenceDiagram
participant Factory as LocalPersistentShuffleFactory
participant Reader as LocalShuffleReader
participant Merge as TreeOfLosers
participant Stream as SortedFileInputStream
Factory->>Reader: createReader(serializedStr, ...)
Reader->>Reader: initialize()
Reader->>Stream: create SortedFileInputStream for each file
Stream->>Merge: add to k-way merge
Reader->>Merge: setup TreeOfLosers
Reader->>Reader: nextSorted(maxBytes)
loop for each batch
Merge->>Stream: next()
Stream->>Reader: provide currentKey/currentData
Reader->>Reader: copy data to batch
Stream->>Stream: next()
end
ER diagram for LocalShuffleWriteInfo and LocalShuffleReadInfo JSON deserializationerDiagram
LOCALSHUFFLEWRITEINFO {
string rootPath
string queryId
string shuffleId
int numPartitions
}
LOCALSHUFFLEREADINFO {
string rootPath
string queryId
string[] partitionIds
}
LOCALSHUFFLEWRITEINFO ||--o{ LOCALSHUFFLEREADINFO : "related to"
LOCALSHUFFLEWRITEINFO }|..|{ JSON : "deserialized from"
LOCALSHUFFLEREADINFO }|..|{ JSON : "deserialized from"
Class diagram for LocalShuffleReader and SortedFileInputStream refactorclassDiagram
class LocalShuffleReader {
+LocalShuffleReader(rootPath, queryId, partitionIds, sortedShuffle, pool)
+void initialize()
+folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(uint64_t maxBytes)
+void noMoreData(bool success)
+std::vector<std::string> getReadPartitionFiles() const
+std::vector<std::unique_ptr<ReadBatch>> nextSorted(uint64_t maxBytes)
+std::vector<std::unique_ptr<ReadBatch>> nextUnsorted(uint64_t maxBytes)
-std::unique_ptr<velox::TreeOfLosers<velox::MergeStream, uint16_t>> merge_
-bool initialized_
}
class SortedFileInputStream {
+SortedFileInputStream(filePath, streamIdx, pool, bufferSize)
+bool next()
+std::string_view currentKey() const
+std::string_view currentData() const
+bool hasData() const
+bool operator<(const velox::MergeStream& other) const
-std::string_view nextStringView(TRowSize size, std::string& storage)
-TStreamIdx streamIdx_
-std::string_view currentKey_
-std::string_view currentData_
-std::string keyStorage_
-std::string dataStorage_
}
class velox_MergeStream {
}
class velox_common_FileInputStream {
}
class velox_TreeOfLosers_MergeStream_uint16_t {
}
SortedFileInputStream --|> velox_common_FileInputStream
SortedFileInputStream --|> velox_MergeStream
LocalShuffleReader --> SortedFileInputStream : uses for sorted shuffle
LocalShuffleReader --> velox_TreeOfLosers_MergeStream_uint16_t : uses for k-way merge
Class diagram for LocalPersistentShuffleFactory changesclassDiagram
class LocalPersistentShuffleFactory {
+std::shared_ptr<ShuffleReader> createReader(serializedStr, partition, pool)
+std::shared_ptr<ShuffleWriter> createWriter(serializedStr, pool)
}
class LocalShuffleReader {
}
class LocalShuffleWriter {
}
LocalPersistentShuffleFactory --> LocalShuffleReader : creates
LocalPersistentShuffleFactory --> LocalShuffleWriter : creates
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- The ShuffleTest class contains many near-duplicate loops over test configurations; consider using GTest parameterized tests or extracting helpers to reduce duplication and improve maintainability.
- In LocalShuffleWriter::collect and LocalShuffleReader::next, the widespread use of LOG(INFO) on hot paths may clutter test and production logs—consider using a debug-level log or guarding these statements behind a flag.
- The new SortedFileInputStream and loser-tree merge logic reimplements functionality that may exist elsewhere in Velox; consider refactoring to reuse existing streaming or merge utilities and adding focused unit tests for edge cases (empty partitions, very large keys).
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The ShuffleTest class contains many near-duplicate loops over test configurations; consider using GTest parameterized tests or extracting helpers to reduce duplication and improve maintainability.
- In LocalShuffleWriter::collect and LocalShuffleReader::next, the widespread use of LOG(INFO) on hot paths may clutter test and production logs—consider using a debug-level log or guarding these statements behind a flag.
- The new SortedFileInputStream and loser-tree merge logic reimplements functionality that may exist elsewhere in Velox; consider refactoring to reuse existing streaming or merge utilities and adding focused unit tests for edge cases (empty partitions, very large keys).
## Individual Comments
### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:356-359` </location>
<code_context>
int32_t partition,
std::string_view key,
std::string_view data) {
+ LOG(INFO) << "LocalShuffleWriter::collect START partition=" << partition
+ << " sortedShuffle=" << sortedShuffle_
+ << " keySize=" << key.size()
</code_context>
<issue_to_address>
**suggestion (performance):** Extensive logging in collect() may impact performance in production.
Consider reducing the log level or making logging conditional to avoid overhead and excessive log volume in production.
```suggestion
VLOG(1) << "LocalShuffleWriter::collect START partition=" << partition
<< " sortedShuffle=" << sortedShuffle_
<< " keySize=" << key.size()
<< " dataSize=" << data.size();
```
</issue_to_address>
### Comment 2
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:372-376` </location>
<code_context>
+ "facebook::presto::operators::LocalShuffleWriter::collect", this);
+
+ // Log the actual data content for debugging
+ if (data.size() >= 16) {
+ LOG(INFO) << "LocalShuffleWriter::collect data first 16 bytes (hex): "
+ << folly::hexlify(std::string_view(data.data(), std::min(data.size(), size_t(16))));
+ }
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Logging raw data in hex may expose sensitive information.
In production, restrict or redact this logging to avoid leaking sensitive data.
```suggestion
#if !defined(NDEBUG)
// Log the actual data content for debugging (only in debug builds)
if (data.size() >= 16) {
LOG(INFO) << "LocalShuffleWriter::collect data first 16 bytes (hex): "
<< folly::hexlify(std::string_view(data.data(), std::min(data.size(), size_t(16))));
}
#else
// In production, do not log raw data to avoid leaking sensitive information.
LOG(INFO) << "LocalShuffleWriter::collect data logging is redacted in production.";
#endif
```
</issue_to_address>
### Comment 3
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:428` </location>
<code_context>
- if (readPartitionFiles_.empty()) {
- readPartitionFiles_ = getReadPartitionFiles();
+void LocalShuffleReader::initialize() {
+ LOG(INFO) << "LocalShuffleReader::initialize START sortedShuffle=" << sortedShuffle_;
+ VELOX_CHECK(!initialized_, "LocalShuffleReader already initialized");
+
</code_context>
<issue_to_address>
**suggestion:** Initialization is now required before next(), but not enforced at construction.
Consider documenting or enforcing initialization at the API level to prevent misuse, as the requirement is currently only checked at runtime.
Suggested implementation:
```cpp
LocalShuffleReader::LocalShuffleReader(
fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr);
// Automatically initialize upon construction to enforce API usage.
initialize();
}
```
```cpp
void LocalShuffleReader::initialize() {
LOG(INFO) << "LocalShuffleReader::initialize START sortedShuffle=" << sortedShuffle_;
VELOX_CHECK(!initialized_, "LocalShuffleReader already initialized");
initialized_ = true;
}
#include "velox/common/file/FileInputStream.h"
```
```cpp
namespace facebook::presto::operators {
// Document that initialization is enforced at construction and next() requires it.
/*
* LocalShuffleReader
*
* This class enforces initialization at construction. The next() method requires
* that the reader is initialized, which is now guaranteed by the constructor.
*/
```
If `initialize()` must be called with parameters or in a specific order, consider making it private and only accessible from the constructor. If you want to allow deferred initialization, add a VELOX_CHECK(initialized_, ...) at the start of `next()` to enforce the requirement at the API level.
If the header file (.h) for LocalShuffleReader exists, update its documentation and method visibility accordingly.
</issue_to_address>
### Comment 4
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:475` </location>
<code_context>
+ std::vector<std::string_view> rows;
+ uint64_t bufferUsed = 0;
+
+ while (auto* stream = merge_->next()) {
+ auto* reader = dynamic_cast<SortedFileInputStream*>(stream);
+ const auto data = reader->currentData();
</code_context>
<issue_to_address>
**suggestion (performance):** Potential for large memory usage if many rows are merged in nextSorted.
If a single row exceeds maxBytes, the buffer grows beyond the intended limit, which may cause excessive memory consumption. Adding explicit checks or limits for row size could help prevent this.
</issue_to_address>
### Comment 5
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:497-501` </location>
<code_context>
+
+ // Copy data as-is without size prefix
+ char* writePos = batchBuffer->asMutable<char>() + bufferUsed;
+ if (!data.empty()) {
+ memcpy(writePos, data.data(), data.size());
+ }
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** No bounds check on memcpy for data copy.
Consider adding an explicit bounds check before memcpy to ensure batchBuffer has sufficient space at writePos, preventing potential buffer overruns from logic errors.
```suggestion
// Copy data as-is without size prefix
char* writePos = batchBuffer->asMutable<char>() + bufferUsed;
if (!data.empty()) {
// Bounds check to prevent buffer overrun
if (bufferUsed + data.size() > batchBuffer->capacity()) {
throw std::runtime_error("Buffer overrun: not enough space in batchBuffer for data copy.");
}
memcpy(writePos, data.data(), data.size());
}
```
</issue_to_address>
### Comment 6
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:623-631` </location>
<code_context>
- return std::make_shared<LocalShuffleReader>(
+ // Check if sortedShuffle field is present in the JSON
+ bool sortedShuffle = false;
+ try {
+ const auto jsonReadInfo = json::parse(serializedStr);
+ if (jsonReadInfo.contains("sortedShuffle")) {
+ jsonReadInfo.at("sortedShuffle").get_to(sortedShuffle);
+ }
+ } catch (const std::exception& /*e*/) {
+ // If parsing fails or field doesn't exist, default to false
+ sortedShuffle = false;
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Silent catch of exceptions may hide deserialization errors.
Instead of silently defaulting to false, log the exception or add error handling to aid debugging.
```suggestion
try {
const auto jsonReadInfo = json::parse(serializedStr);
if (jsonReadInfo.contains("sortedShuffle")) {
jsonReadInfo.at("sortedShuffle").get_to(sortedShuffle);
}
} catch (const std::exception& e) {
LOG(ERROR) << "Error parsing sortedShuffle from JSON: " << e.what();
// If parsing fails or field doesn't exist, default to false
sortedShuffle = false;
}
```
</issue_to_address>
### Comment 7
<location> `presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp:90-93` </location>
<code_context>
auto* batch = checked_pointer_cast<ShuffleRowBatch>(page.get());
const auto& rows = batch->rows();
for (const auto& row : rows) {
+ LOG(INFO) << "ShuffleRead::getOutput adding row to rows_: size=" << row.size();
+ if (row.size() >= 16) {
+ LOG(INFO) << "ShuffleRead::getOutput row data (hex): "
+ << folly::hexlify(std::string_view(row.data(), std::min(row.size(), size_t(32))));
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Logging row data in getOutput may leak sensitive information.
Restrict hex logging to debug builds or redact sensitive fields to prevent exposure of confidential data.
```suggestion
#ifdef NDEBUG
// In release builds, do not log row data to avoid leaking sensitive information.
(void)row; // Suppress unused variable warning if logging is disabled.
#else
if (row.size() >= 16) {
LOG(INFO) << "ShuffleRead::getOutput row data (hex): "
<< folly::hexlify(std::string_view(row.data(), std::min(row.size(), size_t(32))));
}
#endif
```
</issue_to_address>
### Comment 8
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:663-672` </location>
<code_context>
+ void fuzzerTest(bool replicateNullsAndAny, size_t numPartitions) {
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding assertions for output batch sizes and row counts in fuzzerTest.
Adding assertions for output batch sizes and row counts will help ensure that no rows are lost or duplicated, particularly in cases with nulls or large partitions.
Suggested implementation:
```cpp
void fuzzerTest(bool replicateNullsAndAny, size_t numPartitions) {
// ... existing setup code ...
// Track total input rows.
size_t totalInputRows = 0;
for (const auto& inputBatch : inputBatches) {
totalInputRows += inputBatch->size();
}
// ... shuffle logic ...
// Track output batch sizes and row counts.
size_t totalOutputRows = 0;
std::vector<size_t> outputBatchSizes;
for (const auto& outputBatch : outputBatches) {
outputBatchSizes.push_back(outputBatch->size());
totalOutputRows += outputBatch->size();
}
// Assert that no rows are lost or duplicated.
ASSERT_EQ(totalOutputRows, totalInputRows) << "Total output rows should match input rows";
// Optionally, assert that batch sizes are non-zero and reasonable.
for (size_t i = 0; i < outputBatchSizes.size(); ++i) {
ASSERT_GT(outputBatchSizes[i], 0) << "Output batch " << i << " should not be empty";
}
// ... rest of the test ...
```
You may need to adjust variable names (`inputBatches`, `outputBatches`) to match those used in your test function. Place the assertions after the shuffle logic and before any cleanup or teardown code.
</issue_to_address>
### Comment 9
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:780` </location>
<code_context>
+ EXPECT_EQ(results->size(), 0);
}
DEBUG_ONLY_TEST_F(ShuffleTest, shuffleWriterExceptions) {
</code_context>
<issue_to_address>
**suggestion (testing):** Exception handling in shuffle writer is tested, but consider testing more error scenarios.
Consider adding tests for invalid partition numbers, oversized rows, and file system errors to improve coverage of error handling.
</issue_to_address>
### Comment 10
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:805` </location>
<code_context>
+DEBUG_ONLY_TEST_F(ShuffleTest, shuffleReaderExceptions) {
</code_context>
<issue_to_address>
**suggestion (testing):** Exception handling in shuffle reader is tested, but missing test for file not found or corrupted file.
Please add tests for scenarios such as missing files, corrupted shuffle files, and permission errors to verify robust exception handling.
</issue_to_address>
### Comment 11
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1259-1268` </location>
<code_context>
+ for (const auto& config : testSettings) {
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for empty input data in shuffleWriterReader.
Adding a test for empty input will ensure the reader correctly handles cases with no data and returns no batches.
Suggested implementation:
```cpp
std::vector<TestConfig> testSettings = {
{.testName = "basic",
.dataType = DataType::BASIC,
.numPartitions = 2,
.numMapDrivers = 2,
.replicateNullsAndAny = false,
.sortedShuffle = false,
.withRowLimit = false,
.useCustomTempDir = false},
{.testName = "localShuffle",
.dataType = DataType::BASIC,
.numPartitions = 1,
.numMapDrivers = 1,
.replicateNullsAndAny = false,
.sortedShuffle = false,
.withRowLimit = false,
.useCustomTempDir = true},
// Added test for empty input data
{.testName = "emptyInput",
.dataType = DataType::BASIC,
.numPartitions = 2,
.numMapDrivers = 2,
.replicateNullsAndAny = false,
.sortedShuffle = false,
.withRowLimit = false,
.useCustomTempDir = false,
.emptyInput = true}, // Add a flag to indicate empty input
};
```
```cpp
for (const auto& config : testSettings) {
SCOPED_TRACE(config.debugString());
std::shared_ptr<exec::test::TempDirectoryPath> customTempDir;
std::string rootPath;
if (config.useCustomTempDir) {
customTempDir = exec::test::TempDirectoryPath::create();
rootPath = customTempDir->getPath();
} else {
rootPath = tempDir_->getPath();
cleanupDirectory(rootPath);
}
// If this is the empty input test, run the shuffleWriterReader with no input and check output
if (config.testName == "emptyInput") {
std::vector<RowVectorPtr> emptyInput;
auto result = shuffleWriterReader(
config.dataType,
emptyInput,
config.numPartitions,
config.numMapDrivers,
config.replicateNullsAndAny,
config.sortedShuffle,
config.withRowLimit,
rootPath);
// Expect no batches returned
ASSERT_TRUE(result.empty()) << "Expected no batches for empty input";
continue;
}
```
- You may need to update the `TestConfig` struct to include the `emptyInput` field (e.g., `bool emptyInput = false;`).
- If `shuffleWriterReader` requires additional parameters or setup for empty input, ensure those are handled.
- If the test framework uses a different way to identify test cases, adjust the conditional check accordingly.
</issue_to_address>
### Comment 12
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:28` </location>
<code_context>
namespace {
+using TStreamIdx = uint16_t;
+
+/// SortedFileInputStream reads sorted (key, data) pairs from a single
</code_context>
<issue_to_address>
**issue (review_instructions):** Type alias 'TStreamIdx' should use PascalCase naming convention.
The type alias 'TStreamIdx' should be named using PascalCase, e.g., 'StreamIdx', to comply with the naming convention for types.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`
**Instructions:**
Use PascalCase for types (classes, structs, enums, type aliases, type template parameters) and file names.
</details>
</issue_to_address>
### Comment 13
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:28` </location>
<code_context>
namespace {
+using TStreamIdx = uint16_t;
+
+/// SortedFileInputStream reads sorted (key, data) pairs from a single
</code_context>
<issue_to_address>
**issue (review_instructions):** Type alias 'TStreamIdx' should use PascalCase naming convention for types.
Please rename 'TStreamIdx' to 'StreamIdx' to follow the PascalCase convention for type aliases.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`
**Instructions:**
Use PascalCase for types (classes, structs, enums, type aliases, type template parameters) and file names.
</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
Summary: Pull Request resolved: prestodb#26643 Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class Differential Revision: D87160791
1a779f7 to
3784486
Compare
Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class Differential Revision: D87160791
3784486 to
cf06a96
Compare
Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class Differential Revision: D87160791
cf06a96 to
286f0cd
Compare
Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class Differential Revision: D87160791
286f0cd to
667db80
Compare
| bool sortedShuffle = false; | ||
| try { | ||
| const auto jsonWriteInfo = json::parse(serializedStr); | ||
| if (jsonWriteInfo.contains("sortedShuffle")) { | ||
| jsonWriteInfo.at("sortedShuffle").get_to(sortedShuffle); | ||
| } | ||
| } catch (const std::exception& /*e*/) { | ||
| // If parsing fails or field doesn't exist, default to false | ||
| sortedShuffle = false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we do this?shouldn't it be an additional field in the struct and in the serde itself?
| bool sortedShuffle = false; | ||
| try { | ||
| const auto jsonReadInfo = json::parse(serializedStr); | ||
| if (jsonReadInfo.contains("sortedShuffle")) { | ||
| jsonReadInfo.at("sortedShuffle").get_to(sortedShuffle); | ||
| } | ||
| } catch (const std::exception& /*e*/) { | ||
| // If parsing fails or field doesn't exist, default to false | ||
| sortedShuffle = false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class Differential Revision: D87160791
667db80 to
347896e
Compare
Summary: Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle. Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order. Reviewed By: tanjialiang Differential Revision: D86888221
Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class Differential Revision: D87160791
347896e to
7946693
Compare
Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class
Differential Revision: D87160791