Skip to content

Commit d1ac3d3

Browse files
authored
fix: payment status storage is accurate (#276)
* fix: payment status displays correct values * fix: remove magic numbers for floor price explainer * chore: update package-lock.json in upload-action * chore: pass bigint to formatFileSize now * fix: jsdoc comment for calculateActualStorage function * fix: cleanup unnecessary duplicate error logging * chore: remove duplicate check in getApprovedProviderInfo * test: cleanup calculate actual storage tests
1 parent 0e1ce5b commit d1ac3d3

File tree

10 files changed

+900
-51
lines changed

10 files changed

+900
-51
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
"helia": "^6.0.1",
120120
"it-to-buffer": "^4.0.10",
121121
"multiformats": "^13.4.1",
122+
"p-queue": "^9.0.1",
122123
"picocolors": "^1.1.1",
123124
"pino": "^10.0.0",
124125
"semver": "^7.6.3"
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
import type { Synapse } from '@filoz/synapse-sdk'
2+
import PQueue from 'p-queue'
3+
import type { Logger } from 'pino'
4+
import { createStorageContextFromDataSetId } from '../synapse/storage-context-helper.js'
5+
import type { ProgressEvent, ProgressEventHandler } from '../utils/types.js'
6+
import { getDataSetPieces } from './get-data-set-pieces.js'
7+
import type { DataSetSummary, DataSetWarning } from './types.js'
8+
9+
export interface ActualStorageResult {
10+
/** Total storage in bytes across all active data sets */
11+
totalBytes: bigint
12+
/** Number of active data sets found */
13+
dataSetCount: number
14+
/** Number of data sets successfully processed */
15+
dataSetsProcessed: number
16+
/** Total number of pieces across all data sets */
17+
pieceCount: number
18+
/** Whether the calculation timed out */
19+
timedOut?: boolean
20+
/** Non-fatal warnings encountered during calculation */
21+
warnings: DataSetWarning[]
22+
}
23+
24+
export type ActualStorageProgressEvents = ProgressEvent<
25+
'actual-storage:progress',
26+
{ dataSetsProcessed: number; dataSetCount: number; pieceCount: number; totalBytes: bigint }
27+
>
28+
29+
/**
30+
* Get a unique Provider-scoped key for a data set
31+
* @param dataSet - The data set to get the key for
32+
* @returns The unique Provider-scoped key for the data set
33+
*/
34+
const getProviderKey = ({ providerId, serviceProvider, dataSetId }: DataSetSummary): string | number => {
35+
if (providerId !== undefined) {
36+
return providerId
37+
}
38+
if (serviceProvider) {
39+
return serviceProvider
40+
}
41+
return `unknown-${dataSetId}`
42+
}
43+
44+
/**
45+
* Calculate actual storage from all active data sets for an address
46+
*
47+
* This function queries all active/live data sets and sums up the actual piece sizes.
48+
* It's more accurate than deriving storage from billing rates, but can be slow for
49+
* users with many pieces.
50+
*
51+
* The calculation respects abort signals - if aborted, it will return partial results
52+
* with a timedOut flag set to true.
53+
*
54+
* Example usage:
55+
* ```typescript
56+
* const result = await calculateActualStorage(synapse, {
57+
* address: '0x1234...',
58+
* signal: AbortSignal.timeout(30000), // 30 second timeout
59+
* logger: myLogger
60+
* })
61+
*
62+
* console.log(`Total storage: ${result.totalBytes} bytes`)
63+
* console.log(`Across ${result.dataSetsProcessed}/${result.dataSetCount} data sets`)
64+
* console.log(`Total pieces: ${result.pieceCount}`)
65+
*
66+
* if (result.timedOut) {
67+
* console.warn('Calculation was aborted, results may be incomplete')
68+
* }
69+
*
70+
* if (result.warnings.length > 0) {
71+
* console.warn('Encountered warnings:', result.warnings)
72+
* }
73+
* ```
74+
*
75+
* @param synapse - Initialized Synapse instance
76+
* @param options - Configuration options
77+
* @returns Actual storage calculation result
78+
*/
79+
export async function calculateActualStorage(
80+
synapse: Synapse,
81+
dataSets: DataSetSummary[],
82+
options?: {
83+
/** Address to calculate storage for (defaults to synapse client address) */
84+
address?: string
85+
/** Abort signal for cancellation/timeout (optional) */
86+
signal?: AbortSignal
87+
/** Logger for debugging (optional) */
88+
logger?: Logger
89+
/** Max number of providers to query in parallel (defaults to 10) */
90+
maxParallelProviders?: number
91+
/** Max concurrent datasets per provider (defaults to 10) */
92+
maxParallelPerProvider?: number
93+
/** Progress callback for UI updates */
94+
onProgress?: ProgressEventHandler<ActualStorageProgressEvents>
95+
}
96+
): Promise<ActualStorageResult> {
97+
const logger = options?.logger
98+
const address = options?.address ?? (await synapse.getClient().getAddress())
99+
const signal = options?.signal
100+
const maxParallelProviders = Math.max(1, options?.maxParallelProviders ?? 10)
101+
const maxParallelPerProvider = Math.max(1, options?.maxParallelPerProvider ?? 10)
102+
const onProgress = options?.onProgress
103+
104+
const warnings: DataSetWarning[] = []
105+
let totalBytes = 0n
106+
let pieceCount = 0
107+
let dataSetsProcessed = 0
108+
let dataSetCount = 0
109+
// Process data sets with provider-scoped concurrency (one at a time per provider)
110+
const globalQueue = new PQueue({ concurrency: maxParallelProviders })
111+
const providerQueues = new Map<string | number, PQueue>()
112+
113+
if (signal) {
114+
signal.addEventListener(
115+
'abort',
116+
() => {
117+
logger?.warn({ reason: signal.reason }, 'Abort signal received during storage calculation')
118+
globalQueue.clear()
119+
providerQueues.forEach((queue) => {
120+
queue.clear()
121+
})
122+
},
123+
{ once: true }
124+
)
125+
}
126+
127+
try {
128+
dataSetCount = dataSets.length
129+
130+
if (dataSetCount === 0) {
131+
return {
132+
totalBytes: 0n,
133+
dataSetCount,
134+
dataSetsProcessed: 0,
135+
pieceCount: 0,
136+
warnings,
137+
}
138+
}
139+
140+
logger?.info({ dataSetCount: dataSets.length, address }, 'Calculating actual storage across data sets')
141+
142+
const processDataSet = async (dataSet: (typeof dataSets)[number]): Promise<void> => {
143+
signal?.throwIfAborted()
144+
145+
try {
146+
const { storage: storageContext } = await createStorageContextFromDataSetId(synapse, dataSet.dataSetId)
147+
148+
signal?.throwIfAborted()
149+
150+
const getPiecesOptions: { logger?: Logger; signal?: AbortSignal } = {}
151+
if (logger) {
152+
getPiecesOptions.logger = logger
153+
}
154+
if (signal) {
155+
getPiecesOptions.signal = signal
156+
}
157+
const result = await getDataSetPieces(synapse, storageContext, getPiecesOptions)
158+
159+
if (result.totalSizeBytes) {
160+
totalBytes += result.totalSizeBytes
161+
}
162+
163+
pieceCount += result.pieces.length
164+
dataSetsProcessed++
165+
166+
if (result.warnings && result.warnings.length > 0) {
167+
warnings.push(...result.warnings)
168+
}
169+
170+
onProgress?.({
171+
type: 'actual-storage:progress',
172+
data: {
173+
dataSetsProcessed,
174+
dataSetCount,
175+
pieceCount,
176+
totalBytes,
177+
},
178+
})
179+
} catch (error) {
180+
if (error instanceof Error && error.name === 'AbortError') {
181+
logger?.warn('Piece retrieval aborted')
182+
throw error // Re-throw AbortError to propagate cancellation
183+
}
184+
185+
const errorMessage = error instanceof Error ? error.message : String(error)
186+
logger?.warn({ dataSetId: dataSet.dataSetId, error: errorMessage }, 'Failed to get pieces for data set')
187+
188+
warnings.push({
189+
code: 'DATA_SET_QUERY_FAILED',
190+
message: `Failed to query pieces for data set ${dataSet.dataSetId}`,
191+
context: {
192+
dataSetId: dataSet.dataSetId,
193+
error: errorMessage,
194+
},
195+
})
196+
}
197+
}
198+
199+
const scheduledPromises = dataSets.map((dataSet) => {
200+
const providerKey = getProviderKey(dataSet)
201+
let providerQueue = providerQueues.get(providerKey)
202+
if (!providerQueue) {
203+
providerQueue = new PQueue({ concurrency: maxParallelPerProvider })
204+
providerQueues.set(providerKey, providerQueue)
205+
}
206+
207+
const jobOptions: { signal?: AbortSignal } = signal ? { signal } : {}
208+
209+
return globalQueue.add(() => providerQueue.add(() => processDataSet(dataSet), jobOptions), jobOptions)
210+
})
211+
212+
await (signal
213+
? Promise.race([Promise.allSettled(scheduledPromises), waitForAbort(signal)])
214+
: Promise.allSettled(scheduledPromises))
215+
216+
// Derive timedOut from signal state
217+
const timedOut = signal?.aborted ?? false
218+
219+
if (timedOut) {
220+
logger?.warn({ dataSetsProcessed, totalDataSets: dataSets.length }, 'Calculation aborted')
221+
warnings.push({
222+
code: 'CALCULATION_ABORTED',
223+
message: `Calculation aborted after processing ${dataSetsProcessed}/${dataSetCount} data sets`,
224+
context: {
225+
dataSetsProcessed,
226+
totalDataSets: dataSetCount,
227+
},
228+
})
229+
}
230+
231+
logger?.info(
232+
{
233+
totalBytes: totalBytes.toString(),
234+
dataSetCount,
235+
dataSetsProcessed,
236+
pieceCount,
237+
timedOut,
238+
},
239+
'Actual storage calculation complete'
240+
)
241+
242+
return {
243+
totalBytes,
244+
dataSetCount,
245+
dataSetsProcessed,
246+
pieceCount,
247+
timedOut,
248+
warnings,
249+
}
250+
} catch (error) {
251+
// Check if this was an abort
252+
const isAborted = signal?.aborted || (error instanceof Error && error.name === 'AbortError')
253+
254+
if (isAborted) {
255+
logger?.warn({ error }, 'Storage calculation aborted; returning partial results')
256+
if (!warnings.some((w) => w.code === 'CALCULATION_ABORTED')) {
257+
warnings.push({
258+
code: 'CALCULATION_ABORTED',
259+
message: `Calculation aborted after processing ${dataSetsProcessed}/${dataSetCount} data sets`,
260+
context: {
261+
dataSetsProcessed,
262+
totalDataSets: dataSetCount,
263+
},
264+
})
265+
}
266+
267+
return {
268+
totalBytes,
269+
dataSetCount,
270+
dataSetsProcessed,
271+
pieceCount,
272+
timedOut: true,
273+
warnings,
274+
}
275+
}
276+
277+
const errorMessage = error instanceof Error ? error.message : String(error)
278+
logger?.error({ error: errorMessage }, 'Failed to calculate actual storage')
279+
280+
throw new Error(`Failed to calculate actual storage: ${errorMessage}`)
281+
}
282+
}
283+
284+
function waitForAbort(signal: AbortSignal): Promise<'aborted'> {
285+
return new Promise((resolve) => {
286+
if (signal.aborted) {
287+
resolve('aborted')
288+
return
289+
}
290+
signal.addEventListener(
291+
'abort',
292+
() => {
293+
resolve('aborted')
294+
},
295+
{ once: true }
296+
)
297+
})
298+
}

src/core/data-set/get-data-set-pieces.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ export async function getDataSetPieces(
8080
pieces.push(pieceInfo)
8181
}
8282
} catch (error) {
83+
if (error instanceof Error && error.name === 'AbortError') {
84+
throw error
85+
}
8386
// If getPieces fails completely, throw - this is a critical error
8487
logger?.error({ dataSetId: storageContext.dataSetId, error }, 'Failed to retrieve pieces from dataset')
8588
throw new Error(`Failed to retrieve pieces for dataset ${storageContext.dataSetId}: ${String(error)}`)

src/core/data-set/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
* Key features:
99
* - List datasets with optional provider enrichment
1010
* - Get pieces from a StorageContext with optional metadata
11+
* - Calculate actual storage across all data sets
1112
* - Graceful error handling with structured warnings
1213
* - Clean separation of concerns (follows SOLID principles)
1314
*
1415
* @module core/data-set
1516
*/
1617

18+
export * from './calculate-actual-storage.js'
1719
export * from './get-data-set-pieces.js'
1820
export * from './get-detailed-data-set.js'
1921
export * from './list-data-sets.js'

0 commit comments

Comments
 (0)