Skip to content

Conversation

@xin-zhang2
Copy link
Contributor

Description

As disscussed in #26550 (comment), we will use synchronized methods to fix the potential thread safety issues in HttpNativeExecutionTaskResultFetcher.

Motivation and Context

Impact

Test Plan

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== NO RELEASE NOTE ==

@xin-zhang2 xin-zhang2 requested review from a team and shrinidhijoshi as code owners November 18, 2025 18:01
@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Nov 18, 2025
@prestodb-ci prestodb-ci requested a review from a team November 18, 2025 18:01
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Nov 18, 2025

Reviewer's guide (collapsed on small PRs)

Reviewer's Guide

This PR makes scheduledFuture access in HttpNativeExecutionTaskResultFetcher thread-safe by synchronizing core methods, removes the unused completed flag, and cleans up redundant completion logic in result fetching.

Class diagram for updated HttpNativeExecutionTaskResultFetcher thread safety

classDiagram
    class HttpNativeExecutionTaskResultFetcher {
        -Object taskHasResult
        -AtomicReference<Throwable> lastException
        -ScheduledFuture<?> scheduledFuture
        -long token
        +synchronized void start()
        +synchronized void stop(boolean success)
        +boolean hasPage()
        +synchronized void throwIfFailed()
        -void doGetResults()
        +synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
    }
    class ScheduledFuture {
    }
    HttpNativeExecutionTaskResultFetcher --> ScheduledFuture
    class AtomicReference {
    }
    HttpNativeExecutionTaskResultFetcher --> AtomicReference
    class PageBufferClient {
    }
    class PagesResponse {
    }
    HttpNativeExecutionTaskResultFetcher --> PagesResponse
    PageBufferClient --> PagesResponse
Loading

File-Level Changes

Change Details Files
Synchronized scheduledFuture management and removed completed flag for thread safety
  • Annotated scheduledFuture with @GuardedBy("this")
  • Made start(), stop(), throwIfFailed(), and onSuccess() synchronized
  • Removed volatile completed field and its usage/checks
  • Eliminated redundant completion check in doGetResults
presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@prestodb-ci prestodb-ci requested review from Dilli-Babu-Godari and Joe-Abraham and removed request for a team November 18, 2025 18:01
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java:133` </location>
<code_context>
     }

-    private void throwIfFailed()
+    private synchronized void throwIfFailed()
     {
         if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) {
</code_context>

<issue_to_address>
**suggestion:** Synchronizing 'throwIfFailed' may be unnecessary if only reading atomic references.

As 'lastException' is an AtomicReference and only reads are performed, you may be able to remove synchronization unless specific ordering is required.

```suggestion
    private void throwIfFailed()
```
</issue_to_address>

### Comment 2
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java:158` </location>
<code_context>
     }

-    private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
+    private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
     {
         List<SerializedPage> pages = pagesResponse.getPages();
</code_context>

<issue_to_address>
**suggestion (performance):** Synchronizing 'onSuccess' may block other operations; review for performance impact.

If 'onSuccess' is invoked often or handles large batches, method-level synchronization may cause delays. Evaluate if finer-grained synchronization is possible.

Suggested implementation:

```java
    private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
    {
        List<SerializedPage> pages = pagesResponse.getPages();
        long bytes = 0;
        // Synchronize only the critical section that modifies shared state
        synchronized (this) {
            token = nextToken;
        }
        if (pagesResponse.isClientComplete()) {
            workerClient.abortResultsAsync(taskId);
            synchronized (this) {
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(false);
                }
            }
        }

```

You should review all shared mutable state accessed in `onSuccess` and ensure only those modifications are synchronized. If other variables (e.g., `token`, `scheduledFuture`) are shared across threads, synchronize their access as shown. If more shared state is present in the full method, wrap only those assignments in `synchronized` blocks. This will minimize blocking and improve performance.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@tanjialiang
Copy link
Contributor

Thanks @xin-zhang2 for the improvement. Left some comments.

@xin-zhang2
Copy link
Contributor Author

Left some comments.

Thanks @tanjialiang, but I don’t seem to see the comments. Would you mind checking again?

}

private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xin-zhang2 thanks for this change.

I have a bit of concern about the synchronized(taskHasResult) {...} block in this method. Since taskHasResult is a shared object used beyond this class, we need to ensure this won't cause deadlocks.

For example, if another thread holds the lock on taskHasResult and then attempts to call HttpNativeExecutionTaskResultFetcher.close(), this could lead to a deadlock situation. Any thoughts? PLMK if I have misunderstood anything. @xin-zhang2 @tanjialiang

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

from:IBM PR from IBM

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants