5 changes: 5 additions & 0 deletions config/clients/js/config.overrides.json
@@ -9,6 +9,7 @@
"fossaComplianceNoticeId": "9c7d9da4-2a75-47c9-bfc9-31b301fb764f",
"useSingleRequestParameter": false,
"supportsES6": true,
"supportsStreamedListObjects": true,
"modelPropertyNaming": "original",
"openTelemetryDocumentation": "docs/opentelemetry.md",
"files": {
@@ -23,6 +24,10 @@
"npmrc.mustache": {
"destinationFilename": ".npmrc",
"templateType": "SupportingFiles"
},
"streaming.mustache": {
"destinationFilename": "streaming.ts",
"templateType": "SupportingFiles"
}
}
}
3 changes: 2 additions & 1 deletion config/clients/js/template/README_api_endpoints.mustache
@@ -14,5 +14,6 @@
| [**BatchCheck**]({{apiDocsUrl}}#/Relationship%20Queries/BatchCheck) | **POST** /stores/{store_id}/batch-check | Similar to check, but accepts a list of relations to check |
| [**Expand**]({{apiDocsUrl}}#/Relationship%20Queries/Expand) | **POST** /stores/{store_id}/expand | Expand all relationships in userset tree format, and following userset rewrite rules. Useful to reason about and debug a certain relationship |
| [**ListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/ListObjects) | **POST** /stores/{store_id}/list-objects | [EXPERIMENTAL] Get all objects of the given type that the user has a relation with |
| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID |
{{#supportsStreamedListObjects}}| [**StreamedListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with (Node.js only) |
{{/supportsStreamedListObjects}}| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID |
| [**WriteAssertions**]({{apiDocsUrl}}#/Assertions/WriteAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID |
22 changes: 22 additions & 0 deletions config/clients/js/template/README_calling_api.mustache
@@ -466,6 +466,28 @@ const response = await fgaClient.listObjects({
// response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
```

{{#supportsStreamedListObjects}}
##### Streamed List Objects

List the objects of a particular type that the user has a certain relation to, using the streaming API.

> **Note**: This is a Node.js-only feature. The streaming API allows you to retrieve more than the standard 1000 object limit.

[API Documentation]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects)

```javascript
const fgaClient = new OpenFgaClient({ apiUrl: "http://localhost:8080", storeId: "01H0H015178Y2V4CX10C2KGHF4" });

for await (const response of fgaClient.streamedListObjects({
user: "user:anne",
relation: "owner",
type: "document"
})) {
console.log(response.object);
}
```

{{/supportsStreamedListObjects}}
##### List Relations

List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once.
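A minimal sketch of the call shape, assuming the same client setup as above (the relation names here are illustrative):

```javascript
const response = await fgaClient.listRelations({
  user: "user:anne",
  object: "document:roadmap",
  relations: ["can_view", "can_edit", "can_delete"],
});
// response.relations = ["can_view", "can_edit"]
```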
1 change: 1 addition & 0 deletions config/clients/js/template/index.mustache
@@ -13,5 +13,6 @@ export * from "./telemetry/counters";
export * from "./telemetry/histograms";
export * from "./telemetry/metrics";
export * from "./errors";
export { parseNDJSONStream } from "./streaming";

{{#withSeparateModelsAndApi}}export * from "./{{tsModelPackage}}";{{/withSeparateModelsAndApi}}
176 changes: 176 additions & 0 deletions config/clients/js/template/streaming.mustache
@@ -0,0 +1,176 @@
{{>partial_header}}

import type { Readable } from "node:stream";

// Helper: create async iterable from classic EventEmitter-style Readable streams
@SoulPancake (Member) commented on Oct 30, 2025:
I think we don't need this file in the generator anymore, since that would require reverse-syncing it back from the SDK; we can just make the changes directly in the SDK, unless you're using something from the generator here.

@SoulPancake (Member):
https://github.com/openfga/js-sdk/blob/0c2a0ab16995585d98582f32efec8b82090a1249/streaming.ts
If this is using some config/constants that should come from the generator, maybe then we can continue having it here.

(Contributor, Author):
Good catch. I included it so that all files would come from the generator. That said, it looks like the need for the generator has been simplified, and this can be removed.

const createAsyncIterableFromReadable = (readable: any): AsyncIterable<any> => {
return {
[Symbol.asyncIterator](): AsyncIterator<any> {
const chunkQueue: any[] = [];
const pendings: Array<{ resolve: (v: IteratorResult<any>) => void; reject: (e?: any) => void }> = [];
let ended = false;
let error: any = null;

const onData = (chunk: any) => {
if (pendings.length > 0) {
const { resolve } = pendings.shift()!;
resolve({ value: chunk, done: false });
} else {
chunkQueue.push(chunk);
}
};

const onEnd = () => {
if (error) return; // Don't process end if error already occurred
ended = true;
while (pendings.length > 0) {
const { resolve } = pendings.shift()!;
resolve({ value: undefined, done: true });
}
};

const onError = (err: any) => {
error = err;
while (pendings.length > 0) {
const { reject } = pendings.shift()!;
reject(err);
}
cleanup();
};

readable.on("data", onData);
readable.once("end", onEnd);
readable.once("error", onError);

const cleanup = () => {
readable.off("data", onData);
readable.off("end", onEnd);
readable.off("error", onError);
};

return {
next(): Promise<IteratorResult<any>> {
if (error) {
return Promise.reject(error);
}
if (chunkQueue.length > 0) {
const value = chunkQueue.shift();
return Promise.resolve({ value, done: false });
}
if (ended) {
cleanup();
return Promise.resolve({ value: undefined, done: true });
}
return new Promise<IteratorResult<any>>((resolve, reject) => {
pendings.push({ resolve, reject });
});
},
return(): Promise<IteratorResult<any>> {
try {
cleanup();
} finally {
if (readable && typeof readable.destroy === "function") {
readable.destroy();
}
}
return Promise.resolve({ value: undefined, done: true });
},
throw(e?: any): Promise<IteratorResult<any>> {
try {
cleanup();
} finally {
if (readable && typeof readable.destroy === "function") {
readable.destroy(e);
}
}
return Promise.reject(e);
}
};
}
};
};

/**
* Parse newline-delimited JSON (NDJSON) from a Node.js readable stream
* @param stream - Node.js readable stream, AsyncIterable, string, or Buffer
* @returns AsyncGenerator that yields parsed JSON objects
*/
export async function* parseNDJSONStream(
stream: Readable | AsyncIterable<Uint8Array | string | Buffer> | string | Uint8Array | Buffer
): AsyncGenerator<any> {
const decoder = new TextDecoder("utf-8");
let buffer = "";

// If stream is actually a string or Buffer-like, handle as whole payload
const isString = typeof stream === "string";
const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream);
const isUint8Array = typeof Uint8Array !== "undefined" && stream instanceof Uint8Array;

if (isString || isBuffer || isUint8Array) {
const text = isString
? (stream as string)
: decoder.decode(isBuffer ? new Uint8Array(stream as Buffer) : (stream as Uint8Array));
const lines = text.split("\n");

for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}

try {
yield JSON.parse(trimmed);
} catch (err) {
console.warn("Failed to parse JSON line:", err);
}
}
return;
}

const isAsyncIterable = stream && typeof (stream as any)[Symbol.asyncIterator] === "function";
const source: AsyncIterable<any> = isAsyncIterable ? (stream as any) : createAsyncIterableFromReadable(stream as any);

for await (const chunk of source) {
// Node.js streams can return Buffer or string chunks
// Convert to Uint8Array if needed for TextDecoder
const uint8Chunk = typeof chunk === "string"
? new TextEncoder().encode(chunk)
: chunk instanceof Buffer
? new Uint8Array(chunk)
: chunk;

// Append decoded chunk to buffer
buffer += decoder.decode(uint8Chunk, { stream: true });

// Split on newlines
const lines = buffer.split("\n");

// Keep the last (potentially incomplete) line in the buffer
buffer = lines.pop() || "";

// Parse and yield complete lines
for (const line of lines) {
const trimmed = line.trim();
if (trimmed) {
try {
yield JSON.parse(trimmed);
} catch (err) {
console.warn("Failed to parse JSON line:", err);
}
}
}
}

// Flush any remaining decoder state
buffer += decoder.decode();

// Handle any remaining data in buffer
if (buffer.trim()) {
try {
yield JSON.parse(buffer);
} catch (err) {
console.warn("Failed to parse final JSON buffer:", err);
}
}
}
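
For illustration, a minimal usage sketch of the parser above. The import path and the `{"result":{"object":...}}` payload shape are assumptions, mirroring the streamed-list-objects response; the chunked case also exercises the `createAsyncIterableFromReadable` helper's buffering path via `Readable.from`:

```javascript
import { Readable } from "node:stream";
import { parseNDJSONStream } from "./streaming";

async function main() {
  // Whole-payload input: a string is split on newlines and parsed directly
  for await (const obj of parseNDJSONStream('{"result":{"object":"document:1"}}\n')) {
    console.log(obj.result.object); // document:1
  }

  // Chunked input: a line split across chunks is buffered until the newline arrives
  const readable = Readable.from(['{"result":{"obj', 'ect":"document:2"}}\n']);
  for await (const obj of parseNDJSONStream(readable)) {
    console.log(obj.result.object); // document:2
  }
}

main().catch(console.error);
```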
