Skip to content

Conversation

@duxiao1212
Copy link
Contributor

Summary: Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class

Differential Revision: D87160791

@duxiao1212 duxiao1212 requested review from a team as code owners November 18, 2025 00:32
@prestodb-ci prestodb-ci added the from:Meta PR from Meta label Nov 18, 2025
@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Nov 18, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Nov 18, 2025

Reviewer's Guide

Refactors the ShuffleTest suite to remove custom in-memory TestShuffleWriter/Reader and leverage LocalPersistentShuffleFactory with on-disk shuffle; adds JSON-based helpers and temp-directory management in tests; extends LocalShuffle Reader/Writer with sorted-shuffle support via a k-way merge stream; and enhances logging, error-injection hooks, and factory parsing for sortedShuffle.

Sequence diagram for sorted shuffle reading with k-way merge

sequenceDiagram
    participant Factory as LocalPersistentShuffleFactory
    participant Reader as LocalShuffleReader
    participant Merge as TreeOfLosers
    participant Stream as SortedFileInputStream
    Factory->>Reader: createReader(serializedStr, ...)
    Reader->>Reader: initialize()
    Reader->>Stream: create SortedFileInputStream for each file
    Stream->>Merge: add to k-way merge
    Reader->>Merge: setup TreeOfLosers
    Reader->>Reader: nextSorted(maxBytes)
    loop for each batch
        Merge->>Stream: next()
        Stream->>Reader: provide currentKey/currentData
        Reader->>Reader: copy data to batch
        Stream->>Stream: next()
    end
Loading

ER diagram for LocalShuffleWriteInfo and LocalShuffleReadInfo JSON deserialization

erDiagram
    LOCALSHUFFLEWRITEINFO {
        string rootPath
        string queryId
        string shuffleId
        int numPartitions
    }
    LOCALSHUFFLEREADINFO {
        string rootPath
        string queryId
        string[] partitionIds
    }
    LOCALSHUFFLEWRITEINFO ||--o{ LOCALSHUFFLEREADINFO : "related to"
    LOCALSHUFFLEWRITEINFO }|..|{ JSON : "deserialized from"
    LOCALSHUFFLEREADINFO }|..|{ JSON : "deserialized from"
Loading

Class diagram for LocalShuffleReader and SortedFileInputStream refactor

classDiagram
    class LocalShuffleReader {
        +LocalShuffleReader(rootPath, queryId, partitionIds, sortedShuffle, pool)
        +void initialize()
        +folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(uint64_t maxBytes)
        +void noMoreData(bool success)
        +std::vector<std::string> getReadPartitionFiles() const
        +std::vector<std::unique_ptr<ReadBatch>> nextSorted(uint64_t maxBytes)
        +std::vector<std::unique_ptr<ReadBatch>> nextUnsorted(uint64_t maxBytes)
        -std::unique_ptr<velox::TreeOfLosers<velox::MergeStream, uint16_t>> merge_
        -bool initialized_
    }
    class SortedFileInputStream {
        +SortedFileInputStream(filePath, streamIdx, pool, bufferSize)
        +bool next()
        +std::string_view currentKey() const
        +std::string_view currentData() const
        +bool hasData() const
        +bool operator<(const velox::MergeStream& other) const
        -std::string_view nextStringView(TRowSize size, std::string& storage)
        -TStreamIdx streamIdx_
        -std::string_view currentKey_
        -std::string_view currentData_
        -std::string keyStorage_
        -std::string dataStorage_
    }
    class velox_MergeStream {
    }
    class velox_common_FileInputStream {
    }
    class velox_TreeOfLosers_MergeStream_uint16_t {
    }
    SortedFileInputStream --|> velox_common_FileInputStream
    SortedFileInputStream --|> velox_MergeStream
    LocalShuffleReader --> SortedFileInputStream : uses for sorted shuffle
    LocalShuffleReader --> velox_TreeOfLosers_MergeStream_uint16_t : uses for k-way merge
Loading

Class diagram for LocalPersistentShuffleFactory changes

classDiagram
    class LocalPersistentShuffleFactory {
        +std::shared_ptr<ShuffleReader> createReader(serializedStr, partition, pool)
        +std::shared_ptr<ShuffleWriter> createWriter(serializedStr, pool)
    }
    class LocalShuffleReader {
    }
    class LocalShuffleWriter {
    }
    LocalPersistentShuffleFactory --> LocalShuffleReader : creates
    LocalPersistentShuffleFactory --> LocalShuffleWriter : creates
Loading

File-Level Changes

Change Details Files
Refactor ShuffleTest to use local shuffle factory
  • Removed TestShuffleWriter/Reader/TestShuffleFactory classes
  • Updated SetUp/TearDown to register LocalPersistentShuffleFactory and use temp directories
  • Replaced shuffleName_ logic and runShuffleTest parameters to use localShuffleWriteInfo/readInfo
  • Cleaned up shuffle files between test scenarios
presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp
Add test helper utilities and JSON info builders
  • Introduced makeTaskId, countNulls, cleanupDirectory, createSerdeLayoutType in test namespace
  • Added localShuffleWriteInfo and localShuffleReadInfo JSON generators
  • Unified duplicate logic for deserialization, copying, and directory cleanup
presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp
Implement sorted-shuffle streaming merge in LocalShuffle
  • Added SortedFileInputStream implementing MergeStream
  • Integrated TreeOfLosers k-way merge in LocalShuffleReader::nextSorted
  • Split reader into initialize(), nextSorted() and nextUnsorted() paths
  • Refactored extractRowData and extractRowMetadata to support lexicographical three-way compare
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Enhance LocalShuffle factories, logging and test hooks
  • Parse sortedShuffle flag from JSON in LocalPersistentShuffleFactory
  • Added LOG(INFO) statements to writer/reader collect/next methods for debugging
  • Inserted TestValue injection points for exception testing
  • Replaced std::sort and iota with boost range algorithms and lexicographical_compare_three_way
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • The ShuffleTest class contains many near-duplicate loops over test configurations; consider using GTest parameterized tests or extracting helpers to reduce duplication and improve maintainability.
  • In LocalShuffleWriter::collect and LocalShuffleReader::next, the widespread use of LOG(INFO) on hot paths may clutter test and production logs—consider using a debug-level log or guarding these statements behind a flag.
  • The new SortedFileInputStream and loser-tree merge logic reimplements functionality that may exist elsewhere in Velox; consider refactoring to reuse existing streaming or merge utilities and adding focused unit tests for edge cases (empty partitions, very large keys).
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The ShuffleTest class contains many near-duplicate loops over test configurations; consider using GTest parameterized tests or extracting helpers to reduce duplication and improve maintainability.
- In LocalShuffleWriter::collect and LocalShuffleReader::next, the widespread use of LOG(INFO) on hot paths may clutter test and production logs—consider using a debug-level log or guarding these statements behind a flag.
- The new SortedFileInputStream and loser-tree merge logic reimplements functionality that may exist elsewhere in Velox; consider refactoring to reuse existing streaming or merge utilities and adding focused unit tests for edge cases (empty partitions, very large keys).

## Individual Comments

### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:356-359` </location>
<code_context>
     int32_t partition,
     std::string_view key,
     std::string_view data) {
+  LOG(INFO) << "LocalShuffleWriter::collect START partition=" << partition
+            << " sortedShuffle=" << sortedShuffle_
+            << " keySize=" << key.size()
</code_context>

<issue_to_address>
**suggestion (performance):** Extensive logging in collect() may impact performance in production.

Consider reducing the log level or making logging conditional to avoid overhead and excessive log volume in production.

```suggestion
  VLOG(1) << "LocalShuffleWriter::collect START partition=" << partition
          << " sortedShuffle=" << sortedShuffle_
          << " keySize=" << key.size()
          << " dataSize=" << data.size();
```
</issue_to_address>

### Comment 2
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:372-376` </location>
<code_context>
+      "facebook::presto::operators::LocalShuffleWriter::collect", this);
+
+  // Log the actual data content for debugging
+  if (data.size() >= 16) {
+    LOG(INFO) << "LocalShuffleWriter::collect data first 16 bytes (hex): "
+              << folly::hexlify(std::string_view(data.data(), std::min(data.size(), size_t(16))));
+  }
</code_context>

<issue_to_address>
**🚨 suggestion (security):** Logging raw data in hex may expose sensitive information.

In production, restrict or redact this logging to avoid leaking sensitive data.

```suggestion
#if !defined(NDEBUG)
  // Log the actual data content for debugging (only in debug builds)
  if (data.size() >= 16) {
    LOG(INFO) << "LocalShuffleWriter::collect data first 16 bytes (hex): "
              << folly::hexlify(std::string_view(data.data(), std::min(data.size(), size_t(16))));
  }
#else
  // In production, do not log raw data to avoid leaking sensitive information.
  LOG(INFO) << "LocalShuffleWriter::collect data logging is redacted in production.";
#endif
```
</issue_to_address>

### Comment 3
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:428` </location>
<code_context>
-  if (readPartitionFiles_.empty()) {
-    readPartitionFiles_ = getReadPartitionFiles();
+void LocalShuffleReader::initialize() {
+  LOG(INFO) << "LocalShuffleReader::initialize START sortedShuffle=" << sortedShuffle_;
+  VELOX_CHECK(!initialized_, "LocalShuffleReader already initialized");
+
</code_context>

<issue_to_address>
**suggestion:** Initialization is now required before next(), but not enforced at construction.

Consider documenting or enforcing initialization at the API level to prevent misuse, as the requirement is currently only checked at runtime.

Suggested implementation:

```cpp
LocalShuffleReader::LocalShuffleReader(
  fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr);
  // Automatically initialize upon construction to enforce API usage.
  initialize();
}

```

```cpp
void LocalShuffleReader::initialize() {
  LOG(INFO) << "LocalShuffleReader::initialize START sortedShuffle=" << sortedShuffle_;
  VELOX_CHECK(!initialized_, "LocalShuffleReader already initialized");
  initialized_ = true;
}

#include "velox/common/file/FileInputStream.h"

```

```cpp
namespace facebook::presto::operators {

// Document that initialization is enforced at construction and next() requires it.
/*
 * LocalShuffleReader
 *
 * This class enforces initialization at construction. The next() method requires
 * that the reader is initialized, which is now guaranteed by the constructor.
 */

```

If `initialize()` must be called with parameters or in a specific order, consider making it private and only accessible from the constructor. If you want to allow deferred initialization, add a VELOX_CHECK(initialized_, ...) at the start of `next()` to enforce the requirement at the API level.

If the header file (.h) for LocalShuffleReader exists, update its documentation and method visibility accordingly.
</issue_to_address>

### Comment 4
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:475` </location>
<code_context>
+  std::vector<std::string_view> rows;
+  uint64_t bufferUsed = 0;
+
+  while (auto* stream = merge_->next()) {
+    auto* reader = dynamic_cast<SortedFileInputStream*>(stream);
+    const auto data = reader->currentData();
</code_context>

<issue_to_address>
**suggestion (performance):** Potential for large memory usage if many rows are merged in nextSorted.

If a single row exceeds maxBytes, the buffer grows beyond the intended limit, which may cause excessive memory consumption. Adding explicit checks or limits for row size could help prevent this.
</issue_to_address>

### Comment 5
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:497-501` </location>
<code_context>
+
+    // Copy data as-is without size prefix
+    char* writePos = batchBuffer->asMutable<char>() + bufferUsed;
+    if (!data.empty()) {
+      memcpy(writePos, data.data(), data.size());
+    }
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** No bounds check on memcpy for data copy.

Consider adding an explicit bounds check before memcpy to ensure batchBuffer has sufficient space at writePos, preventing potential buffer overruns from logic errors.

```suggestion
    // Copy data as-is without size prefix
    char* writePos = batchBuffer->asMutable<char>() + bufferUsed;
    if (!data.empty()) {
      // Bounds check to prevent buffer overrun
      if (bufferUsed + data.size() > batchBuffer->capacity()) {
        throw std::runtime_error("Buffer overrun: not enough space in batchBuffer for data copy.");
      }
      memcpy(writePos, data.data(), data.size());
    }
```
</issue_to_address>

### Comment 6
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:623-631` </location>
<code_context>
-  return std::make_shared<LocalShuffleReader>(
+  // Check if sortedShuffle field is present in the JSON
+  bool sortedShuffle = false;
+  try {
+    const auto jsonReadInfo = json::parse(serializedStr);
+    if (jsonReadInfo.contains("sortedShuffle")) {
+      jsonReadInfo.at("sortedShuffle").get_to(sortedShuffle);
+    }
+  } catch (const std::exception& /*e*/) {
+    // If parsing fails or field doesn't exist, default to false
+    sortedShuffle = false;
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Silent catch of exceptions may hide deserialization errors.

Instead of silently defaulting to false, log the exception or add error handling to aid debugging.

```suggestion
  try {
    const auto jsonReadInfo = json::parse(serializedStr);
    if (jsonReadInfo.contains("sortedShuffle")) {
      jsonReadInfo.at("sortedShuffle").get_to(sortedShuffle);
    }
  } catch (const std::exception& e) {
    LOG(ERROR) << "Error parsing sortedShuffle from JSON: " << e.what();
    // If parsing fails or field doesn't exist, default to false
    sortedShuffle = false;
  }
```
</issue_to_address>

### Comment 7
<location> `presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp:90-93` </location>
<code_context>
       auto* batch = checked_pointer_cast<ShuffleRowBatch>(page.get());
       const auto& rows = batch->rows();
       for (const auto& row : rows) {
+        LOG(INFO) << "ShuffleRead::getOutput adding row to rows_: size=" << row.size();
+        if (row.size() >= 16) {
+          LOG(INFO) << "ShuffleRead::getOutput row data (hex): "
+                    << folly::hexlify(std::string_view(row.data(), std::min(row.size(), size_t(32))));
</code_context>

<issue_to_address>
**🚨 suggestion (security):** Logging row data in getOutput may leak sensitive information.

Restrict hex logging to debug builds or redact sensitive fields to prevent exposure of confidential data.

```suggestion
#ifdef NDEBUG
        // In release builds, do not log row data to avoid leaking sensitive information.
        (void)row; // Suppress unused variable warning if logging is disabled.
#else
        if (row.size() >= 16) {
          LOG(INFO) << "ShuffleRead::getOutput row data (hex): "
                    << folly::hexlify(std::string_view(row.data(), std::min(row.size(), size_t(32))));
        }
#endif
```
</issue_to_address>

### Comment 8
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:663-672` </location>
<code_context>
+  void fuzzerTest(bool replicateNullsAndAny, size_t numPartitions) {
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding assertions for output batch sizes and row counts in fuzzerTest.

Adding assertions for output batch sizes and row counts will help ensure that no rows are lost or duplicated, particularly in cases with nulls or large partitions.

Suggested implementation:

```cpp
  void fuzzerTest(bool replicateNullsAndAny, size_t numPartitions) {
    // ... existing setup code ...

    // Track total input rows.
    size_t totalInputRows = 0;
    for (const auto& inputBatch : inputBatches) {
      totalInputRows += inputBatch->size();
    }

    // ... shuffle logic ...

    // Track output batch sizes and row counts.
    size_t totalOutputRows = 0;
    std::vector<size_t> outputBatchSizes;
    for (const auto& outputBatch : outputBatches) {
      outputBatchSizes.push_back(outputBatch->size());
      totalOutputRows += outputBatch->size();
    }

    // Assert that no rows are lost or duplicated.
    ASSERT_EQ(totalOutputRows, totalInputRows) << "Total output rows should match input rows";

    // Optionally, assert that batch sizes are non-zero and reasonable.
    for (size_t i = 0; i < outputBatchSizes.size(); ++i) {
      ASSERT_GT(outputBatchSizes[i], 0) << "Output batch " << i << " should not be empty";
    }

    // ... rest of the test ...

```

You may need to adjust variable names (`inputBatches`, `outputBatches`) to match those used in your test function. Place the assertions after the shuffle logic and before any cleanup or teardown code.
</issue_to_address>

### Comment 9
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:780` </location>
<code_context>
+  EXPECT_EQ(results->size(), 0);
 }

 DEBUG_ONLY_TEST_F(ShuffleTest, shuffleWriterExceptions) {
</code_context>

<issue_to_address>
**suggestion (testing):** Exception handling in shuffle writer is tested, but consider testing more error scenarios.

Consider adding tests for invalid partition numbers, oversized rows, and file system errors to improve coverage of error handling.
</issue_to_address>

### Comment 10
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:805` </location>
<code_context>
+DEBUG_ONLY_TEST_F(ShuffleTest, shuffleReaderExceptions) {
</code_context>

<issue_to_address>
**suggestion (testing):** Exception handling in shuffle reader is tested, but missing test for file not found or corrupted file.

Please add tests for scenarios such as missing files, corrupted shuffle files, and permission errors to verify robust exception handling.
</issue_to_address>

### Comment 11
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1259-1268` </location>
<code_context>
+  for (const auto& config : testSettings) {
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding a test for empty input data in shuffleWriterReader.

Adding a test for empty input will ensure the reader correctly handles cases with no data and returns no batches.

Suggested implementation:

```cpp
  std::vector<TestConfig> testSettings = {
      {.testName = "basic",
       .dataType = DataType::BASIC,
       .numPartitions = 2,
       .numMapDrivers = 2,
       .replicateNullsAndAny = false,
       .sortedShuffle = false,
       .withRowLimit = false,
       .useCustomTempDir = false},
      {.testName = "localShuffle",
       .dataType = DataType::BASIC,
       .numPartitions = 1,
       .numMapDrivers = 1,
       .replicateNullsAndAny = false,
       .sortedShuffle = false,
       .withRowLimit = false,
       .useCustomTempDir = true},
      // Added test for empty input data
      {.testName = "emptyInput",
       .dataType = DataType::BASIC,
       .numPartitions = 2,
       .numMapDrivers = 2,
       .replicateNullsAndAny = false,
       .sortedShuffle = false,
       .withRowLimit = false,
       .useCustomTempDir = false,
       .emptyInput = true}, // Add a flag to indicate empty input
  };

```

```cpp
  for (const auto& config : testSettings) {
    SCOPED_TRACE(config.debugString());

    std::shared_ptr<exec::test::TempDirectoryPath> customTempDir;
    std::string rootPath;
    if (config.useCustomTempDir) {
      customTempDir = exec::test::TempDirectoryPath::create();
      rootPath = customTempDir->getPath();
    } else {
      rootPath = tempDir_->getPath();
      cleanupDirectory(rootPath);
    }

    // If this is the empty input test, run the shuffleWriterReader with no input and check output
    if (config.testName == "emptyInput") {
      std::vector<RowVectorPtr> emptyInput;
      auto result = shuffleWriterReader(
          config.dataType,
          emptyInput,
          config.numPartitions,
          config.numMapDrivers,
          config.replicateNullsAndAny,
          config.sortedShuffle,
          config.withRowLimit,
          rootPath);

      // Expect no batches returned
      ASSERT_TRUE(result.empty()) << "Expected no batches for empty input";
      continue;
    }

```

- You may need to update the `TestConfig` struct to include the `emptyInput` field (e.g., `bool emptyInput = false;`).
- If `shuffleWriterReader` requires additional parameters or setup for empty input, ensure those are handled.
- If the test framework uses a different way to identify test cases, adjust the conditional check accordingly.
</issue_to_address>

### Comment 12
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:28` </location>
<code_context>

 namespace {

+using TStreamIdx = uint16_t;
+
+/// SortedFileInputStream reads sorted (key, data) pairs from a single
</code_context>

<issue_to_address>
**issue (review_instructions):** Type alias 'TStreamIdx' should use PascalCase naming convention.

The type alias 'TStreamIdx' should be named using PascalCase, e.g., 'StreamIdx', to comply with the naming convention for types.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`

**Instructions:**
Use PascalCase for types (classes, structs, enums, type aliases, type template parameters) and file names.

</details>
</issue_to_address>

### Comment 13
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:28` </location>
<code_context>

 namespace {

+using TStreamIdx = uint16_t;
+
+/// SortedFileInputStream reads sorted (key, data) pairs from a single
</code_context>

<issue_to_address>
**issue (review_instructions):** Type alias 'TStreamIdx' should use PascalCase naming convention for types.

Please rename 'TStreamIdx' to 'StreamIdx' to follow the PascalCase convention for type aliases.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `presto-native-execution/**/*.hpp,presto-native-execution/**/*.cpp`

**Instructions:**
Use PascalCase for types (classes, structs, enums, type aliases, type template parameters) and file names.

</details>
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:
Pull Request resolved: prestodb#26643

Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class

Differential Revision: D87160791
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class

Differential Revision: D87160791
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class

Differential Revision: D87160791
duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class

Differential Revision: D87160791
Comment on lines 608 to 613
bool sortedShuffle = false;
try {
const auto jsonWriteInfo = json::parse(serializedStr);
if (jsonWriteInfo.contains("sortedShuffle")) {
jsonWriteInfo.at("sortedShuffle").get_to(sortedShuffle);
}
} catch (const std::exception& /*e*/) {
// If parsing fails or field doesn't exist, default to false
sortedShuffle = false;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do this?shouldn't it be an additional field in the struct and in the serde itself?

Comment on lines 579 to 584
bool sortedShuffle = false;
try {
const auto jsonReadInfo = json::parse(serializedStr);
if (jsonReadInfo.contains("sortedShuffle")) {
jsonReadInfo.at("sortedShuffle").get_to(sortedShuffle);
}
} catch (const std::exception& /*e*/) {
// If parsing fails or field doesn't exist, default to false
sortedShuffle = false;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

duxiao1212 added a commit to duxiao1212/presto that referenced this pull request Nov 18, 2025
Summary:

Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class

Differential Revision: D87160791
Summary:

Implement sorted shuffle k-way merge for LocalShuffleReader, when it's sortedShuffle.

Added k-way merge support using TreeOfLosers to efficiently merge multiple sorted shuffle files. The reader streams data from sorted files and returns merged results in sorted order.

Reviewed By: tanjialiang

Differential Revision: D86888221
Summary:

Completely replace TestShuffleWriter/Reader with local shuffle, and refactor the ShuffleTest class

Differential Revision: D87160791
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants