refactor: separate async handler logic from sync bridge logic #1435
Codecov Report
@@            Coverage Diff             @@
##             main    #1435      +/-   ##
==========================================
+ Coverage   84.85%   85.06%   +0.21%
==========================================
  Files         119      120       +1
  Lines       30961    31928     +967
  Branches    30961    31928     +967
==========================================
+ Hits        26271    27160     +889
- Misses       3408     3445      +37
- Partials     1282     1323      +41
3a80417 to 90a7e5e
// Unfortunately, `Path` provides no easy way to check whether a name is directory-like,
// because it strips trailing /, so we're reduced to manually checking the original URL.
let offset = Path::from_url_path(path.path())?;
let prefix = if path.path().ends_with('/') {
Indentation change only; I suggest ignoring whitespace while reviewing.
// Move the stream into the future so we can block on it.
let mut stream = self.stream.take()?;
This is the magic. By passing the owned stream through the future, we satisfy the 'static lifetime constraint of block_on and we no longer need a channel.
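The ownership round trip described in this comment can be sketched with the standard library alone. Everything below is a hypothetical stand-in, not the PR's code: `Countdown` plays the role of the stream, `BlockingIter` the role of the iterator, and `block_on` is a toy executor that runs the future on another thread (which is what makes the `Send + 'static` bound necessary in this sketch).

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Waker that unparks the thread polling the future.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor (assumption): drives the future to completion on a separate
// thread, so the future must be `Send + 'static`.
fn block_on<F>(future: F) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || {
        let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
        let mut cx = Context::from_waker(&waker);
        let mut future = pin!(future);
        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => thread::park(),
            }
        }
    })
    .join()
    .expect("executor thread panicked")
}

// Hypothetical async source standing in for the stream.
struct Countdown(u32);

impl Countdown {
    async fn next(&mut self) -> Option<u32> {
        if self.0 == 0 {
            None
        } else {
            self.0 -= 1;
            Some(self.0 + 1)
        }
    }
}

struct BlockingIter {
    source: Option<Countdown>,
}

impl Iterator for BlockingIter {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        // Take ownership so the future borrows nothing from `self`.
        let mut source = self.source.take()?;
        // The future owns the source and hands it back along with the item.
        let (item, source) = block_on(async move { (source.next().await, source) });
        // Store the source again for the next call.
        self.source = Some(source);
        item
    }
}

fn main() {
    let iter = BlockingIter { source: Some(Countdown(3)) };
    let items: Vec<u32> = iter.collect();
    assert_eq!(items, vec![3, 2, 1]);
}
```

Because the future returns the source together with the item, no channel is needed to get results back to the caller.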
let future = read_json_files_impl(
    self.store.clone(),
    files.to_vec(),
    physical_schema,
    predicate,
    self.batch_size,
    self.buffer_size,
The future returned by this async impl imposes 'static lifetime constraint on everything passed to it. So we cannot pass self and must instead pass owned copies of everything the future will consume.
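A minimal sketch of that constraint, using only the standard library. The names `Store`, `Handler`, `read_files_impl`, and the toy `block_on` are assumptions made for illustration; they are not the kernel's types or signatures. The point is that the async function takes only owned arguments, so the sync method clones what it needs instead of passing `self`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Waker that unparks the thread polling the future.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor (assumption): runs the future on another thread, hence the
// `Send + 'static` bounds.
fn block_on<F>(future: F) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || {
        let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
        let mut cx = Context::from_waker(&waker);
        let mut future = pin!(future);
        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => thread::park(),
            }
        }
    })
    .join()
    .expect("executor thread panicked")
}

// Hypothetical object store.
struct Store {
    prefix: String,
}

// Async impl: every argument is owned, so the returned future is `'static`.
// (`batch_size` must be non-zero because `chunks` panics on 0.)
async fn read_files_impl(store: Arc<Store>, files: Vec<String>, batch_size: usize) -> Vec<String> {
    files
        .chunks(batch_size)
        .map(|chunk| format!("{}:{}", store.prefix, chunk.join("+")))
        .collect()
}

struct Handler {
    store: Arc<Store>,
    batch_size: usize,
}

impl Handler {
    // Sync bridge: clone the `Arc`, copy the slice, and pass plain values.
    fn read_files(&self, files: &[String]) -> Vec<String> {
        let future = read_files_impl(self.store.clone(), files.to_vec(), self.batch_size);
        block_on(future)
    }
}

fn main() {
    let handler = Handler {
        store: Arc::new(Store { prefix: "s3".to_string() }),
        batch_size: 2,
    };
    let files = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let batches = handler.read_files(&files);
    assert_eq!(batches, vec!["s3:a+b", "s3:c"]);
}
```

Passing `&self` or `files` by reference into `read_files_impl` would tie the future to a borrowed lifetime, and this `block_on` would reject it at compile time.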
nicklan left a comment
this seems great, and simplifies things significantly.
it would be nice to be able to benchmark it, but I think we first need to get rid of FileStream to make any benchmarks worthwhile. I think this should make that even easier too, cc @zachschuermann
let mut stream = self.stream.take()?;
let (item, stream) = self
    .task_executor
    .block_on(async move { (stream.next().await, stream) });
It's probably worth noting that this is the point where background async processing can occur, because next() on a Buffered stream will spawn off futures up to the size of the buffer which can execute in parallel, and the Buffered stream will queue up responses for us.
It's neat, but took me some time to unravel :)
Added code commentary to explain that.
nicklan left a comment
lgtm, would just request a comment explaining where the async tasks get kicked off
I added that (per your request) in my latest commit 51cdc45?
/// Each call to `next` on the iterator translates to a blocking `stream.next()` call, using the
/// provided `task_executor`. Buffered streams allow concurrency in the form of prefetching, because
/// every call to `stream.next()` leaves an empty slot (out of N buffer slots) that the stream
/// immediately attempts to fill by launching another future that can make progress in the
/// background while we block on and consume each of the N-1 entries that precede it.
Clarified documentation for stream_future_to_iter function.
Actually, in retrospect I think you meant something different. How's this?
/// This method performs the initial blocking call to extract the stream from the future, and each
/// subsequent call to `next` on the iterator translates to a blocking `stream.next()` call, using
/// the provided `task_executor`. Buffered streams allow concurrency in the form of prefetching,
/// because that initial call will attempt to populate the N buffer slots; every call to
/// `stream.next()` leaves an empty slot (out of N buffer slots) that the stream immediately
/// attempts to fill by launching another future that can make progress in the background while we
/// block on and consume each of the N-1 entries that precede it.
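The slot-refill behavior described in this doc comment can be illustrated with a thread-based analogy. This is only an analogy under stated assumptions: it uses OS threads and a `VecDeque` of join handles rather than futures, it is not how a buffered stream is implemented, and `load` and `Prefetcher` are hypothetical names. It shows the same shape: an initial fill of N slots, then one refill per consumed item, with results returned in order.

```rust
use std::collections::VecDeque;
use std::thread::{self, JoinHandle};

// Hypothetical unit of work standing in for one file read.
fn load(id: u32) -> u32 {
    id * 10
}

struct Prefetcher<I: Iterator<Item = u32>> {
    pending: I,
    in_flight: VecDeque<JoinHandle<u32>>,
}

impl<I: Iterator<Item = u32>> Prefetcher<I> {
    fn new(pending: I, slots: usize) -> Self {
        let mut this = Self {
            pending,
            in_flight: VecDeque::new(),
        };
        // Initial fill: start up to `slots` jobs before anything is consumed.
        for _ in 0..slots {
            this.fill_slot();
        }
        this
    }

    // Launch the next pending job, if any, in the background.
    fn fill_slot(&mut self) {
        if let Some(id) = self.pending.next() {
            self.in_flight.push_back(thread::spawn(move || load(id)));
        }
    }
}

impl<I: Iterator<Item = u32>> Iterator for Prefetcher<I> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        // Removing the oldest job frees one slot...
        let oldest = self.in_flight.pop_front()?;
        // ...which is refilled before we block on the oldest result.
        self.fill_slot();
        Some(oldest.join().expect("worker panicked"))
    }
}

fn main() {
    let results: Vec<u32> = Prefetcher::new(1..=5, 2).collect();
    assert_eq!(results, vec![10, 20, 30, 40, 50]);
}
```

While the caller blocks on the oldest job, the remaining in-flight jobs keep running, which is the prefetching effect the comment describes.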
What changes are proposed in this pull request?
The various default engine handler trait implementations mixed natively async I/O with blocking queue logic, and did so inconsistently (each takes a slightly different approach, with different buffering and prefetching).
Factor out the stream -> iterator transformation into a helper method that all the handlers can use, allowing a separation of concerns. Also, eliminate the need for channels by using an impl Iterator whose `next` method blocks directly on the stream's next value. A buffered stream can prefetch multiple values concurrently while `next` is blocked, improving overall throughput.
Breaking changes
`FileStream::new_async_read_iterator` was removed (not sure why that internal helper was ever public??)
How was this change tested?
Existing unit tests.