Parallelize graph writes #542
base: main
Conversation
buffer.clear();
var writer = new ByteBufferIndexWriter(buffer);
The ownership model and lifecycle for the buffer here is a bit ambiguous to me, especially because the buffer is passed into this class as a parameter. I wonder if we can make the ByteBufferIndexWriter hide some of the implementation details so that we do not have a buffer.clear() and the buffer management logic at the end of this method. Instead, we could make the ByteBufferIndexWriter manage all of that, by adding a clone and a reset method, perhaps?
Updated ByteBufferIndexWriter with CloneBuffer (named to avoid confusion with Object.clone()) and updated reset to also clear the buffer. Moved that logic out of NodeRecordTask in the latest commit to clarify ownership.
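For context, a rough sketch of the shape that change describes; the actual ByteBufferIndexWriter is not shown in this thread, so the cloneBuffer/reset signatures below are assumptions for illustration only:

```java
import java.nio.ByteBuffer;

// Illustrative only: the writer owns the buffer lifecycle, so callers never touch
// clear()/flip() directly. The real class implements the project's writer interface.
class ByteBufferIndexWriterSketch {
    private final ByteBuffer buffer;

    ByteBufferIndexWriterSketch(ByteBuffer buffer) {
        this.buffer = buffer;
        this.buffer.clear(); // start from a clean position on construction
    }

    void writeInt(int v) { buffer.putInt(v); }

    // Copy out the bytes written so far; named cloneBuffer to avoid confusion with Object.clone().
    ByteBuffer cloneBuffer() {
        ByteBuffer copy = ByteBuffer.allocate(buffer.position());
        ByteBuffer view = buffer.duplicate();
        view.flip();
        copy.put(view);
        copy.flip();
        return copy;
    }

    // Reset for reuse; clearing the underlying buffer is part of reset.
    void reset() { buffer.clear(); }
}
```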
for (int newOrdinal = 0; newOrdinal <= maxOrdinal; newOrdinal++) {
    final int ordinal = newOrdinal;
    final long fileOffset = baseOffset + (long) ordinal * recordSize;

    Future<NodeRecordTask.Result> future = executor.submit(() -> {
        var view = viewPerThread.get();
        var buffer = bufferPerThread.get();

        var task = new NodeRecordTask(
                ordinal,
                ordinalMapper,
                graph,
                view,
                inlineFeatures,
                featureStateSuppliers,
                recordSize,
                fileOffset,
                buffer
        );

        return task.call();
    });

    futures.add(future);
}
IIUC, this doesn't have any back pressure mechanism, and we need that to prevent excessive memory utilization. I think we might want to consider an implementation that uses semaphores to manage concurrency.
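For reference, a minimal sketch of the semaphore idea; the executor setup, maxInFlight, buildRecord, and the class name are illustrative, not the PR's actual code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Illustrative sketch: a Semaphore bounds how many record-building tasks can be
// queued or running at once, so the submit loop cannot outrun the workers.
class BoundedSubmitSketch {
    static void submitAll(int maxOrdinal, int maxInFlight) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        Semaphore inFlight = new Semaphore(maxInFlight);
        for (int ordinal = 0; ordinal <= maxOrdinal; ordinal++) {
            inFlight.acquire();             // producer blocks when too many tasks are pending
            final int o = ordinal;
            executor.submit(() -> {
                try {
                    buildRecord(o);         // placeholder for the NodeRecordTask-style work
                } finally {
                    inFlight.release();     // permit returned once the task finishes
                }
            });
        }
        executor.shutdown();
    }

    static void buildRecord(int ordinal) { /* build and buffer the serialized node record */ }
}
```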
In the latest commit I've added backpressure mechanisms to both this loop and the file channel write loop. I'm not entirely clear on what solution you had in mind vis-a-vis semaphores, but I am open to re-implementing in another fashion if you feel that the code could be improved.
// result.data is already a copy made in NodeRecordTask to avoid
// race conditions with thread-local buffer reuse
afc.write(result.data, result.fileOffset).get();
The .get() here negates the benefit of using the async file channel.
We could use a semaphore to limit the number of write tasks in flight.
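A sketch of that idea for the file-channel side, assuming an AsynchronousFileChannel; the class name and the permit count are illustrative:

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Semaphore;

// Illustrative sketch: cap in-flight async writes with a Semaphore instead of calling
// get() on every write, so the channel stays genuinely asynchronous.
class BoundedAsyncWriteSketch {
    private final Semaphore pendingWrites = new Semaphore(64); // permit count is arbitrary here

    void write(AsynchronousFileChannel afc, ByteBuffer data, long fileOffset) throws InterruptedException {
        pendingWrites.acquire(); // wait only when too many writes are already outstanding
        afc.write(data, fileOffset, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer bytesWritten, Void attachment) {
                pendingWrites.release();
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                pendingWrites.release();
                // real code would record the failure and surface it to the caller
            }
        });
    }
}
```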
This section has been rewritten to remove the get() call at this point in the code and to split the write process into sub-tasks. Again, I'm happy to go with a different implementation if the code can be improved, but I'm not sure exactly what you had in mind.
Pull Request Overview
This PR introduces parallel write capability for graph indexes to improve write throughput. The implementation maintains backward compatibility by defaulting to sequential writes unless explicitly enabled via withParallelWrites(true). Testing shows linear performance scaling with dataset size, with speedups consistent across different hardware configurations.
Key changes:
- Added ParallelGraphWriter class that orchestrates parallel L0 record building using thread pools and asynchronous file I/O
- Introduced ByteBufferIndexWriter for in-memory record construction before bulk disk writes
- Added withParallelWrites(boolean) builder option to OnDiskGraphIndexWriter.Builder
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| ParallelWriteExample.java | Example demonstrating parallel vs sequential write usage patterns and benchmark comparison |
| Grid.java | Enables parallel writes in the production Grid.buildOnDisk method |
| ParallelGraphWriter.java | Core parallel writer implementation with thread pooling, memory-aware backpressure, and async I/O |
| OnDiskGraphIndexWriter.java | Updated to support optional parallel write mode via builder configuration |
| NodeRecordTask.java | Task implementation for building individual node records in parallel worker threads |
| GraphIndexWriterTypes.java | New enum defining available writer types (sequential vs parallel) |
| GraphIndexWriter.java | Added factory methods for creating appropriate writer builders based on type |
| ByteBufferIndexWriter.java | IndexWriter implementation for writing to ByteBuffers in memory |
@MarkWolters this is a very cool change! Did you mean parallel writes will only be used when the withParallelWrites(true) option is set?
Added more comments, primarily around two aspects:
- Buffer size calculation - it seems like we need to consider alignment and hardware parallelism to reduce buffer sizes.
- Batching, GC, and malloc overhead - it seems as if we are creating a task per ordinal, which could be fairly expensive in terms of memory allocation and GC overhead.
 * This task is designed to be executed in a thread pool, with each worker thread
 * owning its own ImmutableGraphIndex.View for thread-safe neighbor iteration.
 */
class NodeRecordTask implements Callable<NodeRecordTask.Result> {
What would be the memory impact of creating this object for each write operation?
That depends on dataset size and vector dimensionality. But the number of NodeRecordTasks in existence at any time is bounded by a buffer that gets sized based on available memory as a back pressure mechanism.
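As an illustration of that kind of memory-based sizing (the actual calculation lives in calculateOptimalBufferSize and is not reproduced here; the heap fraction and cap below are assumptions):

```java
// Illustrative only: bound the number of buffered record-building tasks by available
// heap so that backpressure engages before memory is exhausted.
final class BatchSizeSketch {
    static int memoryBoundedBatchSize(long recordSize) {
        long headroom = Runtime.getRuntime().maxMemory() / 4;   // allow ~25% of heap for in-flight records
        long byMemory = headroom / Math.max(1L, recordSize);
        return (int) Math.max(1L, Math.min(byMemory, 64_000L)); // hard cap keeps queue depth reasonable
    }
}
```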
@Override
public Result call() throws Exception {
    // Writer automatically clears buffer on construction
    var writer = new ByteBufferIndexWriter(buffer);
Why not make the ByteBufferIndexWriter thread local and avoid recreating it on every call?
I don't think there's a significant difference. The constructor is trivially cheap, it just stores the buffer reference and initial position, and the real cost (the ByteBuffer allocation) is already thread local. ByteBufferIndexWriter itself is tiny, only 2 fields, so allocation cost is negligible.
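For illustration, the per-thread reuse pattern being discussed, with hypothetical names: only the ByteBuffer is cached per thread, while the cheap writer wrapper can be recreated on every call.

```java
import java.nio.ByteBuffer;

// Illustrative sketch: the costly ByteBuffer is allocated once per worker thread and
// reused; the record is copied out so the buffer can be cleared for the next ordinal.
class ThreadLocalBufferSketch {
    private final ThreadLocal<ByteBuffer> bufferPerThread;

    ThreadLocalBufferSketch(int recordSize) {
        this.bufferPerThread = ThreadLocal.withInitial(() -> ByteBuffer.allocate(recordSize));
    }

    byte[] buildRecord(int ordinal) {
        ByteBuffer buffer = bufferPerThread.get(); // reused across calls on the same thread
        buffer.clear();
        buffer.putInt(ordinal);                    // placeholder for the real record serialization
        byte[] copy = new byte[buffer.position()]; // copy out to avoid racing with buffer reuse
        buffer.flip();
        buffer.get(copy);
        return copy;
    }
}
```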
 *
 * @return the number of records to buffer before writing
 */
private int calculateOptimalBufferSize() {
Should we take graph properties into this calculation to consider alignment with device blocks?
Also, if we know the supported parallelism of the hardware, can we leverage that as well to reduce the buffer size in the calculation and achieve the minimal queue depth?
"Should we take graph properties into this calculation to consider alignment with device blocks?" - I'm afraid I don't really understand what you are asking here; can you point me to an example of what you are referring to, or go into a little more detail?
final long fileOffset = baseOffset + (long) ordinal * recordSize;

// Submit task to build this record
Future<NodeRecordTask.Result> future = executor.submit(() -> {
Will this create a new Future object for each ordinal? Should we batch those to prevent GC and memory allocation overhead?
It is batched. The buffer is checked and, if full, written and cleared on lines 190 - 193:
if (futures.size() >= bufferSize) {
    writeRecordsAsync(futures);
    futures.clear();
}
Please correct me if I'm misunderstanding, but it seems to me that for every ordinal we will (eventually) create a task object? If this is the case, can we avoid that?
I'm also wondering about this. If we are wrong, it may be useful to add a comment in the code explaining this point. Others might trip here too in the future.
You are correct in your summary @sam-herman, but it seemed to me like the natural splitting point. I could assign a range of ordinals to each task (which, I suppose, is what you were suggesting by batching, which I misunderstood) instead of only a single ordinal. What do you think of that? cc: @marianotepper
I don't have a specific suggestion here, besides the ones suggested above. Not sure if that one would be over-tuning the implementation either. We could run a micro-benchmark to see whether memory usage increases significantly or not.
Hi @MarkWolters,
I've been thinking more about this, and aside from the memory allocation aspect, I have another concern related to disk alignment and throughput (as mentioned in my earlier comment regarding ParallelGraphWriter line 215).
Given a file of length N bytes and a disk capable of k concurrent writes, the ideal scenario for maximizing throughput would be to minimize the disk queue depth Q. In theory, if we have k threads each handling a contiguous buffer of size B = N/k, we could keep Q close to zero.
However, in the current implementation, it looks like the write of each node and its adjacency list is being distributed randomly among the threads. This seems to reduce the number of contiguous writes, which could increase the queue depth and lower overall throughput.
Is this understanding correct? Also, for the graphs you mentioned in the description, it might be helpful to measure the actual disk queue depth to assess how effectively we're maximizing throughput.
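For illustration, a sketch of the contiguous-partitioning idea described above, assuming ordinals map to sequential file offsets; k, the range type, and the names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: split [0, maxOrdinal] into k contiguous ranges so each worker
// writes one sequential region of the file, rather than interleaving single records.
class ContiguousPartitionSketch {
    record OrdinalRange(int firstOrdinal, int lastOrdinal, long startOffset) {}

    static List<OrdinalRange> partition(int maxOrdinal, int k, long baseOffset, long recordSize) {
        int count = maxOrdinal + 1;
        int perWorker = (count + k - 1) / k;            // ceiling division
        List<OrdinalRange> ranges = new ArrayList<>();
        for (int start = 0; start <= maxOrdinal; start += perWorker) {
            int end = Math.min(maxOrdinal, start + perWorker - 1);
            ranges.add(new OrdinalRange(start, end, baseOffset + (long) start * recordSize));
        }
        return ranges;
    }
}
```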
The eventual solution I settled on is similar to this suggestion, although it uses dataset size in terms of number of vectors rather than absolute file size. Monitoring the disk queue in Java would be tricky as we're talking about writing OS-specific code, and I don't think this enhancement should introduce that level of risk.
}

@Override
public Result call() throws Exception {
I think that it would be useful for SequentialWriter and writeL0RecordsSequential to share this implementation somehow, so that we do not have code duplication.
I 100% agree. My only question is whether that belongs in this PR or not. Originally the logic was duplicated between OnDiskSequentialGraphIndexWriter and OnDiskGraphIndexWriter, so there's not actually any new duplication being added here. If the consensus is that I should tackle that here I will do so. What do you think @marianotepper and @sam-herman?
True, we can do that in a subsequent PR.
marianotepper left a comment:
LGTM
This PR adds the option to specify that graph indexes should be written in parallel rather than sequentially. By default the existing sequential write behavior is maintained; parallel writes will only be used when the withParallelWrites(true) option is set through the OnDiskGraphIndexWriter.Builder class. Testing results below show the speedup achieved in the write phase across a number of cores. These gains appear to scale linearly with respect to dataset size (i.e., writing a dataset of 10 million records will take about 10x as long as a dataset of 1 million records, but the speedup of parallel vs. sequential is roughly equal).
ETA: Testing also showed that the performance gains using a "prod-like" i3.4xlarge with 16 vCPUs and 8 disks striped in RAID 0 were roughly equivalent to the performance gains using a 64 vCPU m5.16xlarge with a standard SSD.
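For readers landing here, a hedged usage sketch of the new option: only withParallelWrites(true) comes from this PR, while the builder constructor arguments and the write call below are placeholders (ParallelWriteExample.java in this PR shows the real end-to-end usage).

```java
// Assumed constructor shape and write call; consult ParallelWriteExample.java for the actual API.
try (var writer = new OnDiskGraphIndexWriter.Builder(graph, outputPath)
        .withParallelWrites(true)   // opt in; omitting this keeps the sequential default
        .build()) {
    writer.write(featureSuppliers); // placeholder for the existing write path
}
```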