Skip to content

Commit 0c2a0ab

Browse files
feat: add streamedListObjects for unlimited object retrieval
Adds streamedListObjects method for retrieving unlimited objects via the streaming API endpoint. Node.js-only implementation with resilient NDJSON parsing, proper error handling, and automatic resource cleanup. Requires OpenFGA server v1.2.0+ Features: - Streams beyond 1000-object limit (tested with 2000 objects) - Memory-efficient incremental results via async generators - Automatic stream cleanup prevents connection leaks - Proper error propagation through async iterators - Flexible input types (Readable|AsyncIterable|string|Buffer|Uint8Array) - Telemetry maintained through streaming request helper Implementation: - streaming.ts: NDJSON parser with robust error handling - common.ts: createStreamingRequestFunction for axios streaming - client.ts: streamedListObjects() async generator wrapper - Error handling: Pending promises reject on error, onEnd guarded - Resource management: Stream destruction in return()/throw()/finally - Type safety: Wide signature eliminates unnecessary casts Testing (153/153 tests passing): - 17 streaming tests (parsing, errors, cleanup, edge cases) - 95% coverage on streaming.ts - Live tested: 3-object and 2000-object streaming verified Examples: - example/streamed-list-objects: Full model with 2000 tuples - example/streamed-list-objects-local: Minimal local setup Related: - Fixes #236 - Parent issue: openfga/sdk-generator#76 - Related PR: openfga/sdk-generator#654 (templates)
1 parent 9b81c81 commit 0c2a0ab

File tree

14 files changed

+1159
-1
lines changed

14 files changed

+1159
-1
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44
## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD)
55

66
- feat: add support for handling Retry-After header (#267)
7+
- feat: streamedListObjects (streaming ListObjects) - Node.js only
8+
- Enables retrieving >1000 objects beyond standard listObjects limit
9+
- Requires OpenFGA server [v1.2.0+](https://github.com/openfga/openfga/releases/tag/v1.2.0)
10+
- Uses axios streaming via API layer with preserved telemetry
11+
- Resilient NDJSON parsing (supports async-iterable and event-based streams)
12+
- Parses chunked data across multiple reads; handles Buffer/string inputs
13+
- Adds example for usage: `example/streamed-list-objects`
14+
- Adds example for local usage: `example/streamed-list-objects-local`
715

816
## v0.9.0
917

api.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
serializeDataIfNeeded,
2121
toPathString,
2222
createRequestFunction,
23+
createStreamingRequestFunction,
2324
RequestArgs,
2425
CallResult,
2526
PromiseResult
@@ -383,6 +384,45 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio
383384
options: localVarRequestOptions,
384385
};
385386
},
387+
/**
388+
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
389+
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
390+
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
391+
* @summary Stream all objects of the given type that the user has a relation with
392+
* @param {string} storeId
393+
* @param {ListObjectsRequest} body
394+
* @param {*} [options] Override http request option.
395+
* @throws { FgaError }
396+
*/
397+
streamedListObjects: (storeId: string, body: ListObjectsRequest, options: any = {}): RequestArgs => {
398+
// verify required parameter 'storeId' is not null or undefined
399+
assertParamExists("streamedListObjects", "storeId", storeId);
400+
// verify required parameter 'body' is not null or undefined
401+
assertParamExists("streamedListObjects", "body", body);
402+
const localVarPath = "/stores/{store_id}/streamed-list-objects"
403+
.replace(`{${"store_id"}}`, encodeURIComponent(String(storeId)));
404+
// use dummy base URL string because the URL constructor only accepts absolute URLs.
405+
const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL);
406+
let baseOptions;
407+
if (configuration) {
408+
baseOptions = configuration.baseOptions;
409+
}
410+
411+
const localVarRequestOptions = { method: "POST", ...baseOptions, ...options };
412+
const localVarHeaderParameter = {} as any;
413+
const localVarQueryParameter = {} as any;
414+
415+
localVarHeaderParameter["Content-Type"] = "application/json";
416+
417+
setSearchParams(localVarUrlObj, localVarQueryParameter, options.query);
418+
localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers };
419+
localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions);
420+
421+
return {
422+
url: toPathString(localVarUrlObj),
423+
options: localVarRequestOptions,
424+
};
425+
},
386426
/**
387427
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
388428
* @summary List all stores
@@ -912,6 +952,22 @@ export const OpenFgaApiFp = function(configuration: Configuration, credentials:
912952
...TelemetryAttributes.fromRequestBody(body)
913953
});
914954
},
955+
/**
956+
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
957+
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
958+
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
959+
* @summary Stream all objects of the given type that the user has a relation with
960+
* @param {string} storeId
961+
* @param {ListObjectsRequest} body
962+
* @param {*} [options] Override http request option.
963+
* @throws { FgaError }
964+
*/
965+
async streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<(axios?: AxiosInstance) => Promise<any>> {
966+
const localVarAxiosArgs = localVarAxiosParamCreator.streamedListObjects(storeId, body, options);
967+
return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, {
968+
[TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects"
969+
});
970+
},
915971
/**
916972
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
917973
* @summary List all stores
@@ -1156,6 +1212,19 @@ export const OpenFgaApiFactory = function (configuration: Configuration, credent
11561212
listObjects(storeId: string, body: ListObjectsRequest, options?: any): PromiseResult<ListObjectsResponse> {
11571213
return localVarFp.listObjects(storeId, body, options).then((request) => request(axios));
11581214
},
1215+
/**
1216+
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
1217+
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
1218+
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
1219+
* @summary Stream all objects of the given type that the user has a relation with
1220+
* @param {string} storeId
1221+
* @param {ListObjectsRequest} body
1222+
* @param {*} [options] Override http request option.
1223+
* @throws { FgaError }
1224+
*/
1225+
streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
1226+
return localVarFp.streamedListObjects(storeId, body, options).then((request) => request(axios));
1227+
},
11591228
/**
11601229
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
11611230
* @summary List all stores
@@ -1370,6 +1439,20 @@ export class OpenFgaApi extends BaseAPI {
13701439
return OpenFgaApiFp(this.configuration, this.credentials).listObjects(storeId, body, options).then((request) => request(this.axios));
13711440
}
13721441

1442+
/**
1443+
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
1444+
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
1445+
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
1446+
* @summary Stream all objects of the given type that the user has a relation with
1447+
* @param {string} storeId
1448+
* @param {ListObjectsRequest} body
1449+
* @param {*} [options] Override http request option.
1450+
* @throws { FgaError }
1451+
*/
1452+
public streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
1453+
return OpenFgaApiFp(this.configuration, this.credentials).streamedListObjects(storeId, body, options).then((request) => request(this.axios));
1454+
}
1455+
13731456
/**
13741457
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
13751458
* @summary List all stores

apiModel.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,21 @@ export interface ListObjectsResponse {
860860
*/
861861
objects: Array<string>;
862862
}
863+
864+
/**
865+
* The response for a StreamedListObjects RPC.
866+
* @export
867+
* @interface StreamedListObjectsResponse
868+
*/
869+
export interface StreamedListObjectsResponse {
870+
/**
871+
*
872+
* @type {string}
873+
* @memberof StreamedListObjectsResponse
874+
*/
875+
object: string;
876+
}
877+
863878
/**
864879
*
865880
* @export

client.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
GetStoreResponse,
2222
ListObjectsRequest,
2323
ListObjectsResponse,
24+
StreamedListObjectsResponse,
2425
ListStoresResponse,
2526
ListUsersRequest,
2627
ListUsersResponse,
@@ -48,6 +49,7 @@ import {
4849
} from "./utils";
4950
import { isWellFormedUlidString } from "./validation";
5051
import SdkConstants from "./constants";
52+
import { parseNDJSONStream } from "./streaming";
5153

5254
export type UserClientConfigurationParams = UserConfigurationParams & {
5355
storeId?: string;
@@ -804,6 +806,51 @@ export class OpenFgaClient extends BaseAPI {
804806
}, options);
805807
}
806808

809+
/**
810+
* StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates)
811+
*
812+
* Note: This method is Node.js only. Streams are supported via the axios API layer.
813+
* The response will be streamed as newline-delimited JSON objects.
814+
*
815+
* @param {ClientListObjectsRequest} body
816+
* @param {ClientRequestOptsWithConsistency} [options]
817+
* @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration
818+
* @param {object} [options.headers] - Custom headers to send alongside the request
819+
* @param {ConsistencyPreference} [options.consistency] - The consistency preference to use
820+
* @param {object} [options.retryParams] - Override the retry parameters for this request
821+
* @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request
822+
* @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated
823+
* @returns {AsyncGenerator<StreamedListObjectsResponse>} An async generator that yields objects as they are received
824+
*/
825+
async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator<StreamedListObjectsResponse> {
826+
const stream = await this.api.streamedListObjects(this.getStoreId(options)!, {
827+
authorization_model_id: this.getAuthorizationModelId(options),
828+
user: body.user,
829+
relation: body.relation,
830+
type: body.type,
831+
context: body.context,
832+
contextual_tuples: { tuple_keys: body.contextualTuples || [] },
833+
consistency: options.consistency
834+
}, options);
835+
836+
// Unwrap axios CallResult to get the raw Node.js stream when needed
837+
const source = stream?.$response?.data ?? stream;
838+
839+
// Parse the Node.js stream
840+
try {
841+
for await (const item of parseNDJSONStream(source as any)) {
842+
if (item && item.result && item.result.object) {
843+
yield { object: item.result.object } as StreamedListObjectsResponse;
844+
}
845+
}
846+
} finally {
847+
// Ensure underlying HTTP connection closes if consumer stops early
848+
if (source && typeof source.destroy === "function") {
849+
try { source.destroy(); } catch { }
850+
}
851+
}
852+
}
853+
807854
/**
808855
* ListRelations - List all the relations a user has with an object (evaluates)
809856
* @param {object} listRelationsRequest

common.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,77 @@ export const createRequestFunction = function (axiosArgs: RequestArgs, axiosInst
342342
);
343343
}
344344

345+
return result;
346+
};
347+
};
348+
349+
/**
350+
* creates an axios streaming request function that returns the raw response stream
351+
* for incremental parsing (used by streamedListObjects)
352+
*/
353+
export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record<string, string | number> = {}) {
354+
configuration.isValid();
355+
356+
const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams;
357+
const maxRetry: number = retryParams ? retryParams.maxRetry : 0;
358+
const minWaitInMs: number = retryParams ? retryParams.minWaitInMs : 0;
359+
360+
const start = performance.now();
361+
362+
return async (axios: AxiosInstance = axiosInstance): Promise<any> => {
363+
await setBearerAuthToObject(axiosArgs.options.headers, credentials!);
364+
365+
const url = configuration.getBasePath() + axiosArgs.url;
366+
367+
const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url };
368+
const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, {
369+
maxRetry,
370+
minWaitInMs,
371+
}, axios);
372+
const response = wrappedResponse?.response;
373+
374+
const result: any = response?.data; // raw stream
375+
376+
let attributes: StringIndexable = {};
377+
378+
attributes = TelemetryAttributes.fromRequest({
379+
userAgent: configuration.baseOptions?.headers["User-Agent"],
380+
httpMethod: axiosArgs.options?.method,
381+
url,
382+
resendCount: wrappedResponse?.retries,
383+
start: start,
384+
credentials: credentials,
385+
attributes: methodAttributes,
386+
});
387+
388+
attributes = TelemetryAttributes.fromResponse({
389+
response,
390+
attributes,
391+
});
392+
393+
const serverRequestDuration = attributes[TelemetryAttribute.HttpServerRequestDuration];
394+
if (configuration.telemetry?.metrics?.histogramQueryDuration && typeof serverRequestDuration !== "undefined") {
395+
configuration.telemetry.recorder.histogram(
396+
TelemetryHistograms.queryDuration,
397+
parseInt(attributes[TelemetryAttribute.HttpServerRequestDuration] as string, 10),
398+
TelemetryAttributes.prepare(
399+
attributes,
400+
configuration.telemetry.metrics.histogramQueryDuration.attributes
401+
)
402+
);
403+
}
404+
405+
if (configuration.telemetry?.metrics?.histogramRequestDuration) {
406+
configuration.telemetry.recorder.histogram(
407+
TelemetryHistograms.requestDuration,
408+
attributes[TelemetryAttribute.HttpClientRequestDuration],
409+
TelemetryAttributes.prepare(
410+
attributes,
411+
configuration.telemetry.metrics.histogramRequestDuration.attributes
412+
)
413+
);
414+
}
415+
345416
return result;
346417
};
347418
};
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Streamed List Objects (Local)
2+
3+
This example demonstrates using the js-sdk `streamedListObjects` API against a locally running OpenFGA server that you manage yourself.
4+
5+
Prerequisites:
6+
- Node.js 18+
7+
- An OpenFGA server reachable at `FGA_API_URL` (defaults to `http://localhost:8080`)
8+
9+
Run:
10+
1. From repo root, build the SDK once:
11+
- `npm run build`
12+
2. Set the API URL (optional) and run the example:
13+
- `cd example/streamed-list-objects-local`
14+
- `FGA_API_URL=http://localhost:8080 node streamedListObjectsLocal.mjs`
15+
16+
What it does:
17+
- Creates a temporary store
18+
- Writes a schema 1.1 model with an assignable relation
19+
- Inserts 3 tuples
20+
- Streams them via `streamedListObjects`
21+
- Cleans up the store
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { ClientConfiguration, OpenFgaClient, ConsistencyPreference } from "../../dist/index.js";
2+
3+
const apiUrl = process.env.FGA_API_URL || "http://localhost:8080";
4+
5+
async function main() {
6+
const client = new OpenFgaClient(new ClientConfiguration({ apiUrl }));
7+
8+
console.log("Creating temporary store");
9+
const { id: storeId } = await client.createStore({ name: "streamed-list-objects-local" });
10+
11+
const clientWithStore = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId }));
12+
13+
const model = {
14+
schema_version: "1.1",
15+
type_definitions: [
16+
{ type: "user" },
17+
{
18+
type: "document",
19+
relations: { can_read: { this: {} } },
20+
metadata: {
21+
relations: {
22+
can_read: {
23+
directly_related_user_types: [{ type: "user" }]
24+
}
25+
}
26+
}
27+
}
28+
]
29+
};
30+
31+
console.log("Writing authorization model");
32+
const { authorization_model_id: authorizationModelId } = await clientWithStore.writeAuthorizationModel(model);
33+
34+
const fga = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId, authorizationModelId }));
35+
36+
console.log("Writing tuples");
37+
await fga.write({
38+
writes: [
39+
{ user: "user:anne", relation: "can_read", object: "document:1" },
40+
{ user: "user:anne", relation: "can_read", object: "document:2" },
41+
{ user: "user:anne", relation: "can_read", object: "document:3" }
42+
]
43+
});
44+
45+
console.log("Streaming objects...");
46+
let count = 0;
47+
for await (const _ of fga.streamedListObjects(
48+
{ user: "user:anne", relation: "can_read", type: "document" },
49+
{ consistency: ConsistencyPreference.HigherConsistency }
50+
)) {
51+
count++;
52+
}
53+
console.log(`\u2713 Streamed count: ${count}`);
54+
55+
console.log("Cleaning up...");
56+
await fga.deleteStore();
57+
console.log("Done");
58+
}
59+
60+
main().catch(_err => {
61+
process.exit(1);
62+
});

0 commit comments

Comments
 (0)