Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.SerializedPage;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -67,10 +69,9 @@ public class HttpNativeExecutionTaskResultFetcher
private final Object taskHasResult;
private final AtomicReference<Throwable> lastException = new AtomicReference<>();

@GuardedBy("this")
private ScheduledFuture<?> scheduledFuture;

private volatile boolean completed;

private long token;

public HttpNativeExecutionTaskResultFetcher(
Expand All @@ -86,15 +87,15 @@ public HttpNativeExecutionTaskResultFetcher(
this.taskHasResult = requireNonNull(taskHasResult, "taskHasResult is null");
}

public void start()
public synchronized void start()
{
scheduledFuture = scheduler.scheduleAtFixedRate(this::doGetResults,
0,
(long) FETCH_INTERVAL.getValue(),
FETCH_INTERVAL.getUnit());
}

public void stop(boolean success)
public synchronized void stop(boolean success)
{
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
Expand Down Expand Up @@ -129,7 +130,7 @@ public boolean hasPage()
return !pageBuffer.isEmpty();
}

private void throwIfFailed()
private synchronized void throwIfFailed()
{
if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) {
Throwable failure = lastException.get();
Expand All @@ -140,11 +141,6 @@ private void throwIfFailed()

private void doGetResults()
{
if (completed && scheduledFuture != null) {
scheduledFuture.cancel(false);
return;
}

if (bufferMemoryBytes.longValue() >= MAX_BUFFER_SIZE.toBytes()) {
return;
}
Expand All @@ -159,7 +155,7 @@ private void doGetResults()
}
}

private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xin-zhang2 thanks for this change.

I have a bit of concern about the synchronized(taskHasResult) {...} block in this method. Since taskHasResult is a shared object used beyond this class, we need to ensure this won't cause deadlocks.

For example, if another thread holds the lock on taskHasResult and then attempts to call HttpNativeExecutionTaskResultFetcher.close(), this could lead to a deadlock situation. Any thoughts? PLMK if I have misunderstood anything. @xin-zhang2 @tanjialiang

{
List<SerializedPage> pages = pagesResponse.getPages();
long bytes = 0;
Expand All @@ -185,7 +181,6 @@ private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
}
token = nextToken;
if (pagesResponse.isClientComplete()) {
completed = true;
workerClient.abortResultsAsync(taskId);
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
Expand Down
Loading