Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
"types": "./dist/core/payments/index.d.ts",
"default": "./dist/core/payments/index.js"
},
"./core/piece": {
"types": "./dist/core/piece/index.d.ts",
"default": "./dist/core/piece/index.js"
},
"./core/synapse": {
"types": "./dist/core/synapse/index.d.ts",
"default": "./dist/core/synapse/index.js"
Expand Down Expand Up @@ -109,8 +113,8 @@
"homepage": "https://github.com/filecoin-project/filecoin-pin#readme",
"dependencies": {
"@clack/prompts": "^0.11.0",
"@filoz/synapse-core": "^0.1.3",
"@filoz/synapse-sdk": "^0.36.0",
"@filoz/synapse-core": "^0.1.4",
"@filoz/synapse-sdk": "^0.36.1",
"@helia/unixfs": "^6.0.1",
"@ipld/car": "^5.4.2",
"commander": "^14.0.1",
Expand Down
2 changes: 2 additions & 0 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { addCommand } from './commands/add.js'
import { dataSetCommand } from './commands/data-set.js'
import { importCommand } from './commands/import.js'
import { paymentsCommand } from './commands/payments.js'
import { rmCommand } from './commands/rm.js'
import { serverCommand } from './commands/server.js'
import { checkForUpdate, type UpdateCheckStatus } from './common/version-check.js'
import { version as packageVersion } from './core/utils/version.js'
Expand All @@ -24,6 +25,7 @@ program.addCommand(paymentsCommand)
program.addCommand(dataSetCommand)
program.addCommand(importCommand)
program.addCommand(addCommand)
program.addCommand(rmCommand)

// Default action - show help if no command specified
program.action(() => {
Expand Down
19 changes: 19 additions & 0 deletions src/commands/rm.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Command } from 'commander'
import { runRmPiece } from '../rm/index.js'
import { addAuthOptions } from '../utils/cli-options.js'

export const rmCommand = new Command('rm')
.description('Remove a Piece from a DataSet')
.requiredOption('--piece <cid>', 'Piece CID to remove')
.requiredOption('--data-set <id>', 'DataSet ID to remove the piece from')
.option('--wait-for-confirmation', 'Wait for transaction confirmation before exiting')
.action(async (options) => {
try {
await runRmPiece(options)
} catch {
// Error already displayed by clack UI in runRmPiece
process.exit(1)
}
})

addAuthOptions(rmCommand)
88 changes: 86 additions & 2 deletions src/core/data-set/get-data-set-pieces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
*/

import { getSizeFromPieceCID } from '@filoz/synapse-core/piece'
import { METADATA_KEYS, type StorageContext, type Synapse, WarmStorageService } from '@filoz/synapse-sdk'
import {
type DataSetPieceData,
METADATA_KEYS,
PDPServer,
PDPVerifier,
type StorageContext,
type Synapse,
WarmStorageService,
} from '@filoz/synapse-sdk'
import { isStorageContextWithDataSetId } from './type-guards.js'
import type {
DataSetPiecesResult,
Expand All @@ -16,6 +24,7 @@ import type {
PieceInfo,
StorageContextWithDataSetId,
} from './types.js'
import { PieceStatus } from './types.js'

/**
* Get all pieces for a dataset from a StorageContext
Expand Down Expand Up @@ -59,13 +68,57 @@ export async function getDataSetPieces(
const pieces: PieceInfo[] = []
const warnings: DataSetWarning[] = []

// call PDPVerifier.getScheduledRemovals to get the list of pieces that are scheduled for removal
let scheduledRemovals: number[] = []
let pdpServerPieces: DataSetPieceData[] | null = null
try {
const warmStorage = await WarmStorageService.create(synapse.getProvider(), synapse.getWarmStorageAddress())
const pdpVerifier = new PDPVerifier(synapse.getProvider(), warmStorage.getPDPVerifierAddress())
scheduledRemovals = await pdpVerifier.getScheduledRemovals(storageContext.dataSetId)
try {
const providerInfo = await synapse.getProviderInfo(storageContext.provider.serviceProvider)
const pdpServer = new PDPServer(null, providerInfo.products?.PDP?.data?.serviceURL ?? '')
const dataSet = await pdpServer.getDataSet(storageContext.dataSetId)
pdpServerPieces = dataSet.pieces
} catch (error) {
logger?.warn({ error }, 'Failed to fetch provider data for scheduled removals and orphan detection')
warnings.push({
code: 'PROVIDER_DATA_UNAVAILABLE',
message: 'Failed to fetch provider data; orphan detection disabled',
context: { dataSetId: storageContext.dataSetId, error: String(error) },
})
}
} catch (error) {
logger?.warn({ error }, 'Failed to get scheduled removals')
warnings.push({
code: 'SCHEDULED_REMOVALS_UNAVAILABLE',
message: 'Failed to get scheduled removals',
context: { dataSetId: storageContext.dataSetId, error: String(error) },
})
}

// Use the async generator to fetch all pieces
try {
const getPiecesOptions = { ...(signal && { signal }) }
const providerPiecesById = pdpServerPieces ? new Map(pdpServerPieces.map((piece) => [piece.pieceId, piece])) : null
for await (const piece of storageContext.getPieces(getPiecesOptions)) {
const pieceId = piece.pieceId
const pieceCid = piece.pieceCid
const pieceInfo: PieceInfo = { pieceId, pieceCid: pieceCid.toString() }
const status = getPieceStatus(pieceId, scheduledRemovals, providerPiecesById)
const pieceInfo: PieceInfo = {
pieceId,
pieceCid: pieceCid.toString(),
status,
}
if (status === PieceStatus.ONCHAIN_ORPHANED) {
warnings.push({
code: 'ONCHAIN_ORPHANED',
message: 'Piece is on-chain but the provider does not report it',
context: { pieceId, pieceCid },
})
} else if (status === PieceStatus.ACTIVE && providerPiecesById) {
providerPiecesById.delete(pieceId)
}

// Calculate piece size from CID
try {
Expand All @@ -79,6 +132,22 @@ export async function getDataSetPieces(

pieces.push(pieceInfo)
}
if (providerPiecesById !== null) {
for (const piece of providerPiecesById.values()) {
// add the rest of the pieces to the pieces list
pieces.push({
pieceId: piece.pieceId,
pieceCid: piece.pieceCid.toString(),
status: PieceStatus.OFFCHAIN_ORPHANED,
})
warnings.push({
code: 'OFFCHAIN_ORPHANED',
message: 'Piece is reported by provider but not on-chain',
context: { pieceId: piece.pieceId, pieceCid: piece.pieceCid.toString() },
})
}
}
pieces.sort((a, b) => a.pieceId - b.pieceId)
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
throw error
Expand Down Expand Up @@ -109,6 +178,21 @@ export async function getDataSetPieces(
return result
}

function getPieceStatus(
pieceId: number,
scheduledRemovals: number[],
providerPiecesById: Map<DataSetPieceData['pieceId'], DataSetPieceData> | null
): PieceStatus {
if (scheduledRemovals.includes(pieceId)) {
return PieceStatus.PENDING_REMOVAL
}
if (providerPiecesById === null || providerPiecesById.has(pieceId)) {
// if we were unable to get the provider pieces, or the provider knows about the piece, we will consider the piece active
return PieceStatus.ACTIVE
}
return PieceStatus.ONCHAIN_ORPHANED
}

/**
* Internal helper: Enrich pieces with metadata from WarmStorage
*
Expand Down
18 changes: 18 additions & 0 deletions src/core/data-set/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@
import type { EnhancedDataSetInfo, ProviderInfo, StorageContext } from '@filoz/synapse-sdk'
import type { Logger } from 'pino'

/**
* Status of the piece, e.g. "pending removal", "active", "orphaned"
*
* - "pending-removal": the piece is scheduled for deletion, but still showing on chain
* - "active": the piece is active, onchain and known by the provider
* - "onchain-orphaned": the piece is not known by the provider, but still on chain
* - "offchain-orphaned": the piece is known by the provider, but not on chain
*
* The orphaned states should not happen, but have been observed and should be logged and displayed to the user.
*/
export enum PieceStatus {
ACTIVE = 'ACTIVE',
PENDING_REMOVAL = 'PENDING_REMOVAL',
ONCHAIN_ORPHANED = 'ONCHAIN_ORPHANED',
OFFCHAIN_ORPHANED = 'OFFCHAIN_ORPHANED',
}

/**
* Information about a single piece in a dataset
*/
Expand All @@ -19,6 +36,7 @@ export interface PieceInfo {
pieceId: number
/** Piece Commitment (CommP) as string */
pieceCid: string
status: PieceStatus
/** Root IPFS CID (from metadata, if available) */
rootIpfsCid?: string
/** Piece size in bytes (if available) */
Expand Down
1 change: 1 addition & 0 deletions src/core/piece/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './remove-piece.js'
Loading