
Conversation

@scovich (Collaborator) commented Oct 29, 2025

What changes are proposed in this pull request?

The various default engine handler trait implementations mixed natively async I/O with blocking queue logic, and did so inconsistently (each took a slightly different approach, with different buffering and prefetching).

Factor out the stream -> iterator transformation into a helper method that all the handlers can use, allowing a separation of concerns. Also, eliminate the need for channels by using an `impl Iterator` whose `next` method blocks directly on the stream's next value. A buffered stream can prefetch multiple values concurrently while `next` is blocked, improving overall throughput.
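
A rough sketch of the idea (illustrative only: `stream_to_iter` is a made-up name, and `futures::executor::block_on` stands in here for the default engine's `TaskExecutor::block_on`):

```rust
use futures::stream::{Stream, StreamExt};

/// Illustrative helper: turn an async stream into a blocking iterator.
/// Each call to `next()` blocks on exactly one stream item; if the stream
/// was built with `.buffered(N)`, up to N reads keep running in the
/// background between calls.
fn stream_to_iter<T, S>(stream: S) -> impl Iterator<Item = T>
where
    S: Stream<Item = T> + Unpin,
{
    // Holding the stream in an Option lets us move it into the future and
    // take it back together with the item -- no channel required.
    let mut stream = Some(stream);
    std::iter::from_fn(move || {
        let mut s = stream.take()?;
        let (item, s) = futures::executor::block_on(async move { (s.next().await, s) });
        if item.is_some() {
            stream = Some(s);
        }
        item
    })
}
```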

Breaking changes

`FileStream::new_async_read_iterator` was removed (not sure why that internal helper was ever public in the first place).

How was this change tested?

Existing unit tests.

codecov bot commented Oct 29, 2025

Codecov Report

❌ Patch coverage is 82.72727% with 38 lines in your changes missing coverage. Please review.
✅ Project coverage is 85.06%. Comparing base (70143ca) to head (d6d28f0).
⚠️ Report is 25 commits behind head on main.

Files with missing lines                   Patch %   Lines
kernel/src/engine/default/filesystem.rs    81.48%    5 Missing and 10 partials ⚠️
kernel/src/engine/default/json.rs          85.39%    5 Missing and 8 partials ⚠️
kernel/src/engine/default/parquet.rs       75.00%    6 Missing and 2 partials ⚠️
kernel/src/engine/default/mod.rs           88.88%    0 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1435      +/-   ##
==========================================
+ Coverage   84.85%   85.06%   +0.21%     
==========================================
  Files         119      120       +1     
  Lines       30961    31928     +967     
  Branches    30961    31928     +967     
==========================================
+ Hits        26271    27160     +889     
- Misses       3408     3445      +37     
- Partials     1282     1323      +41     


@github-actions github-actions bot added the breaking-change Change that require a major version bump label Oct 29, 2025
@scovich scovich removed the breaking-change Change that require a major version bump label Oct 29, 2025
@github-actions github-actions bot added the breaking-change Change that require a major version bump label Oct 29, 2025
@scovich scovich force-pushed the async-handler-bridge branch from 3a80417 to 90a7e5e on October 30, 2025 06:14
// Unfortunately, `Path` provides no easy way to check whether a name is directory-like,
// because it strips trailing /, so we're reduced to manually checking the original URL.
let offset = Path::from_url_path(path.path())?;
let prefix = if path.path().ends_with('/') {
@scovich (Collaborator, Author):

Indentation change -- suggest ignoring whitespace while reviewing.

Comment on lines +64 to +65
// Move the stream into the future so we can block on it.
let mut stream = self.stream.take()?;
@scovich (Collaborator, Author):

This is the magic. By passing the owned stream through the future, we satisfy the `'static` lifetime constraint of `block_on` and we no longer need a channel.

Comment on lines +156 to +162
let future = read_json_files_impl(
self.store.clone(),
files.to_vec(),
physical_schema,
predicate,
self.batch_size,
self.buffer_size,
@scovich (Collaborator, Author):

The future returned by this async impl imposes a `'static` lifetime constraint on everything passed to it, so we cannot pass `self` and must instead pass owned copies of everything the future will consume.
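
Roughly, the shape of the constraint looks like this (the types and names below are illustrative stand-ins, not the crate's actual API):

```rust
use std::sync::Arc;

// Illustrative stand-ins for the handler's state.
struct StoreHandle;
struct Handler {
    store: Arc<StoreHandle>,
    batch_size: usize,
    buffer_size: usize,
}

// The async fn takes only owned arguments, so its future captures no borrows.
async fn read_files_impl(
    _store: Arc<StoreHandle>,
    _files: Vec<String>,
    _batch_size: usize,
    _buffer_size: usize,
) {
    // ... build and drive the buffered stream from owned inputs only ...
}

impl Handler {
    fn make_read_future(
        &self,
        files: &[String],
    ) -> impl std::future::Future<Output = ()> + Send + 'static {
        // Cloning the Arc'd store and copying the small config values keeps
        // `&self` (and its lifetime) out of the future, so it can satisfy the
        // executor's `'static` bound.
        read_files_impl(
            self.store.clone(),
            files.to_vec(),
            self.batch_size,
            self.buffer_size,
        )
    }
}
```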

@nicklan (Collaborator) left a comment:

This seems great, and simplifies things significantly.

It would be nice to be able to benchmark it, but I think we first need to get rid of `FileStream` to make any benchmarks worthwhile. I think this should make that even easier too, cc @zachschuermann.

let mut stream = self.stream.take()?;
let (item, stream) = self
.task_executor
.block_on(async move { (stream.next().await, stream) });
@nicklan (Collaborator):

It's probably worth noting that this is the point where background async processing can occur: `next()` on a `Buffered` stream will spawn off futures up to the size of the buffer, which can execute in parallel, and the `Buffered` stream will queue up the responses for us.

It's neat, but took me some time to unravel :)
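
A small self-contained illustration of that prefetching behavior (the file names and fake fetch are made up; `buffered(4)` stands in for the engine's buffer-size setting):

```rust
use futures::stream::{self, StreamExt};

fn main() {
    futures::executor::block_on(async {
        let files = vec!["a.json", "b.json", "c.json", "d.json", "e.json"];
        // Map each file to a future and allow up to 4 of them in flight at once.
        // Every `next()` pulls one completed (in-order) result and frees a buffer
        // slot, which the Buffered stream immediately refills with the next
        // pending future -- that refill is the "background" work kicked off here.
        let mut results = stream::iter(files)
            .map(|f| async move { format!("contents of {f}") }) // stand-in for an async read
            .buffered(4);
        while let Some(contents) = results.next().await {
            println!("{contents}");
        }
    });
}
```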

@scovich (Collaborator, Author):

Added code commentary to explain that.

@nicklan nicklan removed the request for review from zachschuermann November 12, 2025 21:37
@nicklan (Collaborator) left a comment:

lgtm, would just request a comment explaining where the async tasks get kicked off

@scovich (Collaborator, Author) commented Nov 13, 2025

> lgtm, would just request a comment explaining where the async tasks get kicked off

I added that (per your request) in my latest commit 51cdc45?

/// Each call to `next` on the iterator translates to a blocking `stream.next()` call, using the
/// provided `task_executor`. Buffered streams allow concurrency in the form of prefetching, because
/// every call to `stream.next()` leaves an empty slot (out of N buffer slots) that the stream
/// immediately attempts to fill by launching another future that can make progress in the
/// background while we block on and consume each of the N-1 entries that precede it.

Clarified documentation for stream_future_to_iter function.
@scovich (Collaborator, Author) commented Nov 13, 2025

Actually, in retrospect I think you meant something different. How's this?

/// This method performs the initial blocking call to extract the stream from the future, and each
/// subsequent call to `next` on the iterator translates to a blocking `stream.next()` call, using
/// the provided `task_executor`. Buffered streams allow concurrency in the form of prefetching,
/// because that initial call will attempt to populate the N buffer slots; every call to
/// `stream.next()` leaves an empty slot (out of N buffer slots) that the stream immediately
/// attempts to fill by launching another future that can make progress in the background while we
/// block on and consume each of the N-1 entries that precede it.

