
Conversation

Contributor

@MarkWolters MarkWolters commented Oct 15, 2025

This PR adds the option to specify that graph indexes should be written in parallel rather than sequentially. By default the existing sequential write behavior is maintained; parallel writes are only used when the withParallelWrites(true) option is set through the OnDiskGraphIndexWriter.Builder class. The testing results below show the speedup achieved in the write phase across a range of core counts. These gains appear to scale linearly with dataset size (i.e., writing a dataset of 10 million records takes about 10x as long as a dataset of 1 million records, but the parallel vs. sequential speedup is roughly equal).

ETA: Testing also showed that the performance gains on a "prod-like" i3.4xlarge with 16 vCPUs and 8 disks striped in RAID 0 were roughly equivalent to the gains on a 64-vCPU m5.16xlarge with standard SSDs.

[Benchmark table with columns: sequential, parallel, speedup]
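For reference, a minimal usage sketch of the new option. The builder construction and write(...) call below are assumptions based on the existing OnDiskGraphIndexWriter API; only withParallelWrites(true) comes from this PR, and the jvector import is omitted since its package path is not shown in this thread.

import java.io.IOException;
import java.util.Map;

class ParallelWriteUsageSketch {
    // Builder usage and write(...) signature are assumed; withParallelWrites(true)
    // is the new option introduced by this PR.
    static void writeInParallel(OnDiskGraphIndexWriter.Builder builder) throws IOException {
        try (var writer = builder
                .withParallelWrites(true)   // opt in to parallel L0 record writes
                .build()) {
            writer.write(Map.of());          // feature-state suppliers omitted for brevity
        }
    }
}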

Contributor

github-actions bot commented Oct 15, 2025

Before you submit for review:

  • Does your PR follow guidelines from CONTRIBUTIONS.md?
  • Did you summarize what this PR does clearly and concisely?
  • Did you include performance data for changes which may be performance impacting?
  • Did you include useful docs for any user-facing changes or features?
  • Did you include useful javadocs for developer oriented changes, explaining new concepts or key changes?
  • Did you trigger and review regression testing results against the base branch via Run Bench Main?
  • Did you adhere to the code formatting guidelines (TBD)?
  • Did you group your changes for easy review, providing meaningful descriptions for each commit?
  • Did you ensure that all files contain the correct copyright header?

If you did not complete any of these, then please explain below.

Comment on lines 84 to 85
buffer.clear();
var writer = new ByteBufferIndexWriter(buffer);
Member

The ownership model and lifecycle for the buffer here is a bit ambiguous to me, especially because the buffer is passed into this class as a parameter. I wonder if we can make the ByteBufferIndexWriter hide some of the implementation details so that we do not have a buffer.clear() and the buffer management logic at the end of this method. Instead, we could make the ByteBufferIndexWriter manage all of that, by adding a clone and a reset method, perhaps?

Contributor Author

Updated ByteBufferIndexWriter with CloneBuffer (to avoid confusion with Object.clone()) and updated reset to include clearing the buffer. Moved that logic out of NodeRecordTask to clarify ownership in the latest commit.
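For readers following along, a rough sketch of the shape described here. The class name, method names, and details below are approximations of the idea, not the actual ByteBufferIndexWriter implementation.

import java.nio.ByteBuffer;

// Approximate sketch of the ownership model discussed above.
class ByteBufferIndexWriterSketch {
    private final ByteBuffer buffer;
    private final int initialPosition;

    ByteBufferIndexWriterSketch(ByteBuffer buffer) {
        this.buffer = buffer;
        this.initialPosition = buffer.position();
        reset(); // the writer, not the caller, is responsible for clearing
    }

    /** Clears the buffer and restores the starting position so the writer can be reused. */
    final void reset() {
        buffer.clear();
        buffer.position(initialPosition);
    }

    /** Copies out the bytes written so far, so the thread-local buffer can be reused safely. */
    ByteBuffer cloneBuffer() {
        ByteBuffer source = buffer.duplicate();
        source.flip();                      // limit = bytes written, position = 0
        source.position(initialPosition);   // skip anything before the starting position
        ByteBuffer copy = ByteBuffer.allocate(source.remaining());
        copy.put(source);
        copy.flip();
        return copy;
    }
}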

Comment on lines 187 to 211
for (int newOrdinal = 0; newOrdinal <= maxOrdinal; newOrdinal++) {
    final int ordinal = newOrdinal;
    final long fileOffset = baseOffset + (long) ordinal * recordSize;

    Future<NodeRecordTask.Result> future = executor.submit(() -> {
        var view = viewPerThread.get();
        var buffer = bufferPerThread.get();

        var task = new NodeRecordTask(
                ordinal,
                ordinalMapper,
                graph,
                view,
                inlineFeatures,
                featureStateSuppliers,
                recordSize,
                fileOffset,
                buffer
        );

        return task.call();
    });

    futures.add(future);
}
Member

IIUC, this doesn't have any back pressure mechanism, and we need that to prevent excessive memory utilization. I think we might want to consider an implementation that uses semaphores to manage concurrency.

Contributor Author

In the latest commit I've added backpressure mechanisms to both this loop and the file channel write loop. I'm not entirely clear on what solution you had in mind vis-a-vis semaphores, but I am open to re-implementing in another fashion if you feel that the code could be improved.
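For reference, a minimal sketch of the semaphore-based bounding that was floated in this thread. MAX_IN_FLIGHT, executor, and buildRecord are placeholders, not names from this PR.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

// Bound the number of in-flight record-building tasks with a semaphore.
class BoundedSubmitSketch {
    static final int MAX_IN_FLIGHT = 1024; // illustrative cap

    static void submitAll(ExecutorService executor, int maxOrdinal) throws InterruptedException {
        Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT);
        for (int ordinal = 0; ordinal <= maxOrdinal; ordinal++) {
            final int current = ordinal;
            inFlight.acquire();              // blocks once MAX_IN_FLIGHT tasks are pending
            executor.submit(() -> {
                try {
                    buildRecord(current);    // placeholder for the NodeRecordTask work
                } finally {
                    inFlight.release();      // a finished task frees a permit
                }
            });
        }
        inFlight.acquire(MAX_IN_FLIGHT);     // returns only after all outstanding tasks complete
    }

    static void buildRecord(int ordinal) {
        // placeholder: build and buffer the L0 record for this ordinal
    }
}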


// result.data is already a copy made in NodeRecordTask to avoid
// race conditions with thread-local buffer reuse
afc.write(result.data, result.fileOffset).get();
Member

The .get() here negates the use of the async file channel.

Member

We could use a semaphore to limit the number of write tasks in flight.

Contributor Author

This section has been rewritten to remove the get() call at this point in the code and to split the write process into sub-tasks. Again, I'm happy to go with a different implementation if the code can be improved, but I'm not sure exactly what you had in mind.
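As a point of comparison, here is a sketch of one non-blocking alternative: handing completion to a CompletionHandler instead of calling get(), with a semaphore capping the writes in flight. The class name, field names, and the cap of 64 are illustrative assumptions, not code from this PR.

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Semaphore;

class AsyncWriteSketch {
    private final AsynchronousFileChannel afc;
    private final Semaphore inFlightWrites = new Semaphore(64);

    AsyncWriteSketch(AsynchronousFileChannel afc) {
        this.afc = afc;
    }

    void writeRecord(ByteBuffer data, long fileOffset) throws InterruptedException {
        inFlightWrites.acquire(); // back pressure: wait when too many writes are outstanding
        afc.write(data, fileOffset, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer bytesWritten, Void attachment) {
                // real code would also handle short writes by re-issuing the remainder
                inFlightWrites.release();
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                inFlightWrites.release();
                // real code would record the failure and surface it to the caller
            }
        });
    }
}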

@MarkWolters MarkWolters requested a review from Copilot October 21, 2025 11:55
Copilot AI left a comment

Pull Request Overview

This PR introduces parallel write capability for graph indexes to improve write throughput. The implementation maintains backward compatibility by defaulting to sequential writes unless explicitly enabled via withParallelWrites(true). Testing shows linear performance scaling with dataset size, with speedups consistent across different hardware configurations.

Key changes:

  • Added ParallelGraphWriter class that orchestrates parallel L0 record building using thread pools and asynchronous file I/O
  • Introduced ByteBufferIndexWriter for in-memory record construction before bulk disk writes
  • Added withParallelWrites(boolean) builder option to OnDiskGraphIndexWriter.Builder

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Summary per file:

  • ParallelWriteExample.java: Example demonstrating parallel vs sequential write usage patterns and benchmark comparison
  • Grid.java: Enables parallel writes in the production Grid.buildOnDisk method
  • ParallelGraphWriter.java: Core parallel writer implementation with thread pooling, memory-aware backpressure, and async I/O
  • OnDiskGraphIndexWriter.java: Updated to support optional parallel write mode via builder configuration
  • NodeRecordTask.java: Task implementation for building individual node records in parallel worker threads
  • GraphIndexWriterTypes.java: New enum defining available writer types (sequential vs parallel)
  • GraphIndexWriter.java: Added factory methods for creating appropriate writer builders based on type
  • ByteBufferIndexWriter.java: IndexWriter implementation for writing to ByteBuffers in memory


Contributor

sam-herman commented Oct 21, 2025

@MarkWolters this is a very cool change!

re:

sequential writes will only be used when the withParallelWrites(true) option is set through the OnDiskGraphIndexWriter.Builder class

Did you mean parallel writes will only be used when the withParallelWrites(true) option is set?

Contributor

@sam-herman sam-herman left a comment

Added more comments, primarily around two aspects:

  1. Calculation of the buffer - Seems like we need to consider alignment and hardware parallelism to reduce buffer sizes.
  2. Batching, GC, and malloc overhead - It seems as if we are creating a task per ordinal, which could be somewhat expensive in terms of memory allocation and GC overhead.

* This task is designed to be executed in a thread pool, with each worker thread
* owning its own ImmutableGraphIndex.View for thread-safe neighbor iteration.
*/
class NodeRecordTask implements Callable<NodeRecordTask.Result> {
Contributor

What would be the memory impact of creating this object for each write operation?

Contributor Author

That depends on dataset size and vector dimensionality, but the number of NodeRecordTasks in existence at any time is bounded by a buffer that is sized based on available memory as a backpressure mechanism.

@Override
public Result call() throws Exception {
    // Writer automatically clears buffer on construction
    var writer = new ByteBufferIndexWriter(buffer);
Contributor

Why not make the ByteBufferIndexWriter thread local and avoid recreating it on every call?

Contributor Author

I don't think there's a significant difference. The constructor is trivially cheap: it just stores the buffer reference and initial position, and the real cost (the ByteBuffer allocation) is already thread-local. ByteBufferIndexWriter itself is tiny, only two fields, so the allocation cost is negligible.
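For completeness, the thread-local variant being discussed would look roughly like the sketch below; the class name and recordSize value are placeholders. Either way, the large per-thread ByteBuffer is allocated once and reused, which is where the real cost lives.

import java.nio.ByteBuffer;

class ThreadLocalWriterSketch {
    static final int recordSize = 4096; // placeholder record size in bytes

    // One writer (and one backing buffer) per worker thread, reset between records.
    static final ThreadLocal<ByteBufferIndexWriter> WRITER_PER_THREAD =
            ThreadLocal.withInitial(() -> new ByteBufferIndexWriter(ByteBuffer.allocate(recordSize)));
}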

*
* @return the number of records to buffer before writing
*/
private int calculateOptimalBufferSize() {
Contributor

Should we take graph properties into this calculation to consider alignment with device blocks?
Also, if we know the supported parallelism of the hardware, we could leverage that as well to reduce the buffer size in the calculation and achieve a minimal queue depth.

Contributor Author

"Should we take graph properties into this calculation to consider alignment with device blocks?" - I'm afraid I don't really understand what you are asking here. Can you point me to an example of what you are referring to, or go into a little more detail?
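For what it's worth, one possible reading of the block-alignment question is sketched below: size each batched write as a whole number of device blocks. The 4 KiB block size and the helper names are illustrative assumptions, not part of this PR.

class BufferSizingSketch {
    static final int DEVICE_BLOCK_SIZE = 4096; // illustrative; real devices vary

    /** Round a desired batch size (in bytes) down to a whole number of device blocks. */
    static long alignToBlocks(long desiredBatchBytes) {
        return Math.max(DEVICE_BLOCK_SIZE, (desiredBatchBytes / DEVICE_BLOCK_SIZE) * DEVICE_BLOCK_SIZE);
    }

    /** Convert the aligned byte budget into a record count for the batching loop. */
    static int recordsPerBatch(long desiredBatchBytes, int recordSize) {
        return (int) Math.max(1, alignToBlocks(desiredBatchBytes) / recordSize);
    }
}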

final long fileOffset = baseOffset + (long) ordinal * recordSize;

// Submit task to build this record
Future<NodeRecordTask.Result> future = executor.submit(() -> {
Contributor

@sam-herman sam-herman Oct 23, 2025

This will create a new future object for each ordinal? Should we batch those to prevent GC and memory allocation overhead?

Contributor Author

It is batched. The buffer is checked and, if full, written and cleared on lines 190-193:

if (futures.size() >= bufferSize) {
    writeRecordsAsync(futures);
    futures.clear();
}

Contributor

Please correct me if I'm misunderstanding, but it seems to me that for every ordinal we will (eventually) create a task object? If this is the case, can we avoid that?

Contributor

I'm also wondering about this. If we are wrong, it may be useful to add a comment in the code explaining this point. Others might trip here too in the future.

Contributor Author

You are correct in your summary @sam-herman, but it seemed to me like the natural splitting point. I could assign a range of ordinals to each task (which, I suppose, is what you were suggesting by batching, and which I misunderstood) instead of only a single ordinal. What do you think of that? cc: @marianotepper

Contributor

I don't have a specific suggestion here, beyond the ones made above. I'm not sure if that one would be over-tuning the implementation either. We could run a microbenchmark to see whether memory usage increases significantly or not.

Contributor

@sam-herman sam-herman Oct 29, 2025

Hi @MarkWolters,

I've been thinking more about this, and aside from the memory allocation aspect, I have another concern related to disk alignment and throughput (as mentioned in my earlier comment regarding ParallelGraphWriter line 215).

Given a file of length N bytes and a disk capable of k concurrent writes, the ideal scenario for maximizing throughput would be to minimize the disk queue depth Q. In theory, if we have k threads each handling a contiguous buffer of size B = N/k, we could keep Q close to zero.

However, in the current implementation, it looks like the writes of each node and its adjacency list are being distributed randomly among the threads. This seems to reduce the number of contiguous writes, which could increase the queue depth and lower overall throughput.

Is this understanding correct? Also, for the graphs you mentioned in the description, it might be helpful to measure the actual disk queue depth to assess how effectively we're maximizing throughput.

Contributor Author

The eventual solution I settled on is similar to this suggestion, although it uses dataset size in terms of number of vectors rather than absolute file size. Monitoring the disk queue in Java would be tricky as we're talking about writing OS-specific code, and I don't think this enhancement should introduce that level of risk.
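A sketch of the contiguous-partitioning idea discussed in this thread: split the ordinal range [0, maxOrdinal] into contiguous chunks, one per worker, so each worker writes one contiguous region of the file. All names here are illustrative, not code from this PR.

import java.util.ArrayList;
import java.util.List;

class RangePartitionSketch {
    static final class OrdinalRange {
        final int startInclusive;
        final int endInclusive;

        OrdinalRange(int startInclusive, int endInclusive) {
            this.startInclusive = startInclusive;
            this.endInclusive = endInclusive;
        }
    }

    static List<OrdinalRange> partition(int maxOrdinal, int workers) {
        int total = maxOrdinal + 1;
        int chunk = (total + workers - 1) / workers;     // ceil(total / workers)
        List<OrdinalRange> ranges = new ArrayList<>();
        for (int start = 0; start <= maxOrdinal; start += chunk) {
            ranges.add(new OrdinalRange(start, Math.min(maxOrdinal, start + chunk - 1)));
        }
        return ranges;
    }
}

Each range then covers the contiguous byte span from baseOffset + startInclusive * recordSize to baseOffset + (endInclusive + 1) * recordSize, so one worker's writes stay sequential on disk.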

}

@Override
public Result call() throws Exception {
Contributor

I think that it would be useful for SequentialWriter and writeL0RecordsSequential to share this implementation somehow, so that we do not have code duplication.

Contributor Author

I 100% agree. My only question is whether that belongs in this PR or not. Originally the logic was duplicated between OnDiskSequentialGraphIndexWriter and OnDiskGraphIndexWriter, so there's not actually any new duplication being added here. If the consensus is that I should tackle that here I will do so. What do you think @marianotepper and @sam-herman?

Contributor

True, we can do that in a subsequent PR.

Contributor

@marianotepper marianotepper left a comment

LGTM
