fix: Make scheduledFuture thread-safe in HttpNativeExecutionTaskResultFetcher #26649
base: master
Reviewer's Guide
This PR makes scheduledFuture access in HttpNativeExecutionTaskResultFetcher thread-safe by synchronizing core methods, removes the unused completed flag, and cleans up redundant completion logic in result fetching.
Class diagram for updated HttpNativeExecutionTaskResultFetcher thread safety
classDiagram
class HttpNativeExecutionTaskResultFetcher {
-Object taskHasResult
-AtomicReference<Throwable> lastException
-ScheduledFuture<?> scheduledFuture
-long token
+synchronized void start()
+synchronized void stop(boolean success)
+boolean hasPage()
+synchronized void throwIfFailed()
-void doGetResults()
+synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
}
class ScheduledFuture {
}
HttpNativeExecutionTaskResultFetcher --> ScheduledFuture
class AtomicReference {
}
HttpNativeExecutionTaskResultFetcher --> AtomicReference
class PageBufferClient {
}
class PagesResponse {
}
HttpNativeExecutionTaskResultFetcher --> PagesResponse
PageBufferClient --> PagesResponse
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java:133` </location>
<code_context>
}
- private void throwIfFailed()
+ private synchronized void throwIfFailed()
{
if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) {
</code_context>
<issue_to_address>
**suggestion:** Synchronizing 'throwIfFailed' may be unnecessary if only reading atomic references.
As 'lastException' is an AtomicReference and only reads are performed, you may be able to remove synchronization unless specific ordering is required.
```suggestion
private void throwIfFailed()
```
</issue_to_address>
### Comment 2
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java:158` </location>
<code_context>
}
- private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
+ private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
{
List<SerializedPage> pages = pagesResponse.getPages();
</code_context>
<issue_to_address>
**suggestion (performance):** Synchronizing 'onSuccess' may block other operations; review for performance impact.
If 'onSuccess' is invoked often or handles large batches, method-level synchronization may cause delays. Evaluate if finer-grained synchronization is possible.
Suggested implementation:
```java
private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
{
    List<SerializedPage> pages = pagesResponse.getPages();
    long bytes = 0;
    // ... page processing elided in this excerpt; `nextToken` is not defined here
    // and is expected to be computed by the full method before this point.
    // Synchronize only the critical section that modifies shared state
    synchronized (this) {
        token = nextToken;
    }
    if (pagesResponse.isClientComplete()) {
        workerClient.abortResultsAsync(taskId);
        synchronized (this) {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(false);
            }
        }
    }
    // ... remainder of the method elided in this excerpt
}
```
You should review all shared mutable state accessed in `onSuccess` and ensure only those modifications are synchronized. If other variables (e.g., `token`, `scheduledFuture`) are shared across threads, synchronize their access as shown. If more shared state is present in the full method, wrap only those assignments in `synchronized` blocks. This will minimize blocking and improve performance.
</issue_to_address>
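On Comment 1, one point worth weighing before dropping the lock: the condition in throwIfFailed reads scheduledFuture as well as lastException, and only the latter is an AtomicReference. The following is a minimal sketch, assuming scheduledFuture is a plain (non-volatile, non-final) field that another thread may assign. FailureCheckSketch, setScheduledFuture, recordFailure, and the exception type are hypothetical names for illustration, not the Presto implementation.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in; not the actual HttpNativeExecutionTaskResultFetcher.
class FailureCheckSketch
{
    // Safe to read from any thread without a lock.
    private final AtomicReference<Throwable> lastException = new AtomicReference<>();

    // Assumed to be a plain field: reads and writes are guarded by the monitor on `this`.
    private ScheduledFuture<?> scheduledFuture;

    synchronized void setScheduledFuture(ScheduledFuture<?> future)
    {
        this.scheduledFuture = future;
    }

    void recordFailure(Throwable throwable)
    {
        lastException.set(throwable);
    }

    // Synchronized because the condition also reads scheduledFuture. Without the
    // lock (or volatile), a write to that field made by another thread has no
    // happens-before edge to this read.
    synchronized void throwIfFailed()
    {
        if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) {
            throw new IllegalStateException("result fetch failed", lastException.get());
        }
    }
}
```

If the field were declared volatile or final instead, the unsynchronized read suggested in Comment 1 would be easier to justify.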
.../com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java
Thanks @xin-zhang2 for the improvement. Left some comments.
Thanks @tanjialiang, but I don’t seem to see the comments. Would you mind checking again?
}
- private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
+ private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
@xin-zhang2 thanks for this change.
I have a concern about the synchronized(taskHasResult) {...} block in this method. Since taskHasResult is a shared object used beyond this class, we need to ensure this won't cause deadlocks.
For example, if another thread holds the lock on taskHasResult and then attempts to call HttpNativeExecutionTaskResultFetcher.close(), this could lead to a deadlock. Any thoughts? Please let me know if I have misunderstood anything. @xin-zhang2 @tanjialiang
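To make the lock-ordering concern concrete, here is a minimal sketch of one way to avoid holding both monitors at once. It assumes close() is synchronized on the fetcher and that onSuccess only needs taskHasResult to notify waiters. NestedLockSketch and its members are hypothetical and simplified; this is not the code in the PR.

```java
// Hypothetical, simplified stand-in; not the actual HttpNativeExecutionTaskResultFetcher.
class NestedLockSketch
{
    // Shared with code outside this class, so callers may already hold its monitor.
    private final Object taskHasResult;
    private long token;
    private boolean closed;

    NestedLockSketch(Object taskHasResult)
    {
        this.taskHasResult = taskHasResult;
    }

    // Risky shape (not used here): a synchronized onSuccess that then enters
    // synchronized (taskHasResult) takes the locks in the order this -> taskHasResult,
    // while a caller holding taskHasResult and calling close() takes them in the
    // opposite order.
    void onSuccess(long nextToken)
    {
        synchronized (this) {
            token = nextToken;
        }
        // The monitor on `this` is released before the shared lock is taken,
        // so this method never holds both at the same time.
        synchronized (taskHasResult) {
            taskHasResult.notifyAll();
        }
    }

    synchronized void close()
    {
        closed = true;
    }

    synchronized boolean isClosed()
    {
        return closed;
    }

    synchronized long getToken()
    {
        return token;
    }
}
```

The trade-off is that the state update and the notification are no longer one atomic step, so waiters on taskHasResult would need to re-check the state in a loop after waking up.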
Description
As discussed in #26550 (comment), we will use synchronized methods to fix the potential thread safety issues in HttpNativeExecutionTaskResultFetcher.
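A minimal sketch of the synchronized-method approach described above, assuming scheduledFuture is assigned in start() and cancelled in stop(). PollingFetcherSketch, poll(), isStopped(), and the 200 ms interval are hypothetical placeholders, not taken from the PR.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in; not the actual HttpNativeExecutionTaskResultFetcher.
class PollingFetcherSketch
{
    private final ScheduledExecutorService scheduler;

    // Plain field: every read and write below happens while holding the monitor on `this`.
    private ScheduledFuture<?> scheduledFuture;

    PollingFetcherSketch(ScheduledExecutorService scheduler)
    {
        this.scheduler = scheduler;
    }

    public synchronized void start()
    {
        if (scheduledFuture != null) {
            return; // already started; keep the existing schedule
        }
        scheduledFuture = scheduler.scheduleAtFixedRate(this::poll, 0, 200, TimeUnit.MILLISECONDS);
    }

    public synchronized void stop()
    {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
    }

    public synchronized boolean isStopped()
    {
        return scheduledFuture != null && scheduledFuture.isCancelled();
    }

    private void poll()
    {
        // Placeholder for the HTTP request that fetches task results.
    }
}
```

Because start(), stop(), and isStopped() all lock the same object, a thread calling stop() is guaranteed to see the future assigned by an earlier start() on another thread.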
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.