Skip to content

Commit 217e178

Browse files
authored
fix(error): Fix error passthrough in queued tasks (#145)
1 parent 88d794f commit 217e178

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

src/core/index.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -339,28 +339,30 @@ export class ZarrArray<StoreGetOptions = any> {
339339
// create promise queue with concurrency control
340340
const queue = new PQueue({ concurrency: concurrencyLimit });
341341

342+
const allTasks = [];
343+
342344
if (progressCallback) {
343345

344346
let progress = 0;
345347
let queueSize = 0;
346348
for (const _ of indexer.iter()) queueSize += 1;
347349
progressCallback({ progress: 0, queueSize: queueSize });
348350
for (const proj of indexer.iter()) {
349-
(async () => {
350-
await queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions));
351+
allTasks.push(queue.add(async () => {
352+
await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions);
351353
progress += 1;
352354
progressCallback({ progress: progress, queueSize: queueSize });
353-
})();
355+
}));
354356
}
355357

356358
} else {
357359
for (const proj of indexer.iter()) {
358-
queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions));
360+
allTasks.push(queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions)));
359361
}
360362
}
361363

362-
// guarantees that all work on queue has finished
363-
await queue.onIdle();
364+
// guarantees that all work on queue has finished and throws if any of the tasks errored.
365+
await Promise.all(allTasks);
364366

365367
// Return scalar instead of zero-dimensional array.
366368
if (out.shape.length === 0) {
@@ -594,6 +596,8 @@ export class ZarrArray<StoreGetOptions = any> {
594596

595597
const queue = new PQueue({ concurrency: concurrencyLimit });
596598

599+
const allTasks = [];
600+
597601
if (progressCallback) {
598602

599603
let queueSize = 0;
@@ -603,24 +607,24 @@ export class ZarrArray<StoreGetOptions = any> {
603607
progressCallback({ progress: 0, queueSize: queueSize });
604608
for (const proj of indexer.iter()) {
605609
const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape);
606-
(async () => {
607-
await queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue));
610+
allTasks.push(queue.add(async () => {
611+
await this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue);
608612
progress += 1;
609613
progressCallback({ progress: progress, queueSize: queueSize });
610-
})();
614+
}));
611615
}
612616

613617
} else {
614618

615619
for (const proj of indexer.iter()) {
616620
const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape);
617-
queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue));
621+
allTasks.push(queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue)));
618622
}
619623

620624
}
621625

622-
// guarantees that all work on queue has finished
623-
await queue.onIdle();
626+
// guarantees that all work on queue has finished and throws if any of the tasks errored.
627+
await Promise.all(allTasks);
624628
}
625629

626630
private async chunkSetItem(chunkCoords: number[], chunkSelection: DimensionSelection[], value: number | NestedArray<TypedArray>) {

0 commit comments

Comments
 (0)