Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ docs/WriteAuthorizationModelResponse.md
docs/WriteRequest.md
docs/WriteRequestDeletes.md
docs/WriteRequestWrites.md
src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java
src/main/java/dev/openfga/sdk/api/OpenFgaApi.java
src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java
src/main/java/dev/openfga/sdk/api/model/AbortedMessageResponse.java
src/main/java/dev/openfga/sdk/api/model/AbstractOpenApiSchema.java
src/main/java/dev/openfga/sdk/api/model/Any.java
Expand Down Expand Up @@ -157,6 +159,7 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java
src/main/java/dev/openfga/sdk/api/model/SourceInfo.java
src/main/java/dev/openfga/sdk/api/model/Status.java
src/main/java/dev/openfga/sdk/api/model/Store.java
src/main/java/dev/openfga/sdk/api/model/StreamResult.java
src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java
src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java
src/main/java/dev/openfga/sdk/api/model/Tuple.java
Expand Down
4 changes: 2 additions & 2 deletions examples/streamed-list-objects/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ openfga_version=latest
language=java

build:
./gradlew -P language=$(language) build
../../gradlew -P language=$(language) build

run:
./gradlew -P language=$(language) run
../../gradlew -P language=$(language) run

run-openfga:
docker pull docker.io/openfga/openfga:${openfga_version} && \
Expand Down
182 changes: 182 additions & 0 deletions src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* OpenFGA
* A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar.
*
* The version of the OpenAPI document: 1.x
* Contact: [email protected]
*
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
* https://openapi-generator.tech
* Do not edit the class manually.
*/

package dev.openfga.sdk.api;

import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.openfga.sdk.api.client.ApiClient;
import dev.openfga.sdk.api.configuration.Configuration;
import dev.openfga.sdk.api.model.Status;
import dev.openfga.sdk.api.model.StreamResult;
import dev.openfga.sdk.errors.ApiException;
import dev.openfga.sdk.errors.FgaInvalidParameterException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
* Base class for handling streaming API responses.
* This class provides generic streaming functionality that can be reused across
* different streaming endpoints by handling the common streaming parsing and error handling logic.
*
* @param <T> The type of response objects in the stream
*/
public abstract class BaseStreamingApi<T> {
protected final Configuration configuration;
protected final ApiClient apiClient;
protected final ObjectMapper objectMapper;
protected final TypeReference<StreamResult<T>> streamResultTypeRef;

/**
* Constructor for BaseStreamingApi
*
* @param configuration The API configuration
* @param apiClient The API client for making HTTP requests
* @param streamResultTypeRef TypeReference for deserializing StreamResult<T>
*/
protected BaseStreamingApi(
Configuration configuration, ApiClient apiClient, TypeReference<StreamResult<T>> streamResultTypeRef) {
this.configuration = configuration;
this.apiClient = apiClient;
this.objectMapper = apiClient.getObjectMapper();
this.streamResultTypeRef = streamResultTypeRef;
}

/**
* Process a streaming response asynchronously.
* Each line in the response is parsed and delivered to the consumer callback.
*
* @param request The HTTP request to execute
* @param consumer Callback to handle each response object (invoked asynchronously)
* @param errorConsumer Optional callback to handle errors during streaming
* @return CompletableFuture<Void> that completes when streaming finishes
*/
protected CompletableFuture<Void> processStreamingResponse(
HttpRequest request, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {

// Use async HTTP client with streaming body handler
// ofLines() provides line-by-line streaming
return apiClient
.getHttpClient()
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
.thenCompose(response -> {
// Check response status
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
ApiException apiException =
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
return CompletableFuture.failedFuture(apiException);
}

// Process the stream - this runs on HttpClient's executor thread
try (Stream<String> lines = response.body()) {
lines.forEach(line -> {
if (!isNullOrWhitespace(line)) {
processLine(line, consumer, errorConsumer);
}
});
return CompletableFuture.completedFuture((Void) null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
})
.handle((result, throwable) -> {
if (throwable != null) {
// Unwrap CompletionException to get the original exception
Throwable actualException = throwable;
if (throwable instanceof java.util.concurrent.CompletionException
&& throwable.getCause() != null) {
actualException = throwable.getCause();
}

if (errorConsumer != null) {
errorConsumer.accept(actualException);
}
// Re-throw to keep the CompletableFuture in failed state
if (actualException instanceof RuntimeException) {
throw (RuntimeException) actualException;
}
throw new RuntimeException(actualException);
}
return result;
});
}

/**
* Process a single line from the stream
*
* @param line The JSON line to process
* @param consumer Callback to handle the parsed result
* @param errorConsumer Optional callback to handle errors
*/
private void processLine(String line, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
try {
// Parse the JSON line to extract the object
StreamResult<T> streamResult = objectMapper.readValue(line, streamResultTypeRef);

if (streamResult.getError() != null) {
// Handle error in stream
if (errorConsumer != null) {
Status error = streamResult.getError();
String errorMessage = error.getMessage() != null
? "Stream error: " + error.getMessage()
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
errorConsumer.accept(new ApiException(errorMessage));
}
} else if (streamResult.getResult() != null) {
// Deliver the response object to the consumer
T result = streamResult.getResult();
if (result != null) {
consumer.accept(result);
}
}
} catch (Exception e) {
if (errorConsumer != null) {
errorConsumer.accept(e);
}
}
}

/**
* Build an HTTP request for the streaming endpoint
*
* @param method HTTP method (e.g., "POST")
* @param path The API path
* @param body The request body
* @param configuration The configuration to use
* @return HttpRequest ready to execute
* @throws ApiException if request building fails
* @throws FgaInvalidParameterException if parameters are invalid
*/
protected HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
throws ApiException, FgaInvalidParameterException {
try {
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);

// Apply request interceptors if any
var interceptor = apiClient.getRequestInterceptor();
if (interceptor != null) {
interceptor.accept(requestBuilder);
}

return requestBuilder.build();
} catch (Exception e) {
throw new ApiException(e);
}
}
}
133 changes: 19 additions & 114 deletions src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,42 @@
/*
* OpenFGA
* A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar.
*
* The version of the OpenAPI document: 1.x
* Contact: [email protected]
*
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
* https://openapi-generator.tech
* Do not edit the class manually.
*/

package dev.openfga.sdk.api;

import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace;
import static dev.openfga.sdk.util.Validation.assertParamExists;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import dev.openfga.sdk.api.client.ApiClient;
import dev.openfga.sdk.api.configuration.Configuration;
import dev.openfga.sdk.api.configuration.ConfigurationOverride;
import dev.openfga.sdk.api.model.ListObjectsRequest;
import dev.openfga.sdk.api.model.Status;
import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse;
import dev.openfga.sdk.api.model.StreamResult;
import dev.openfga.sdk.api.model.StreamedListObjectsResponse;
import dev.openfga.sdk.errors.ApiException;
import dev.openfga.sdk.errors.FgaInvalidParameterException;
import dev.openfga.sdk.util.StringUtil;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
* API layer for handling streaming responses from the streamedListObjects endpoint.
* This class provides true asynchronous streaming with consumer callbacks using CompletableFuture
* and Java 11's HttpClient async streaming capabilities.
* This class extends BaseStreamingApi to provide true asynchronous streaming with consumer callbacks
* using CompletableFuture and Java 11's HttpClient async streaming capabilities.
*/
public class StreamedListObjectsApi {
private final Configuration configuration;
private final ApiClient apiClient;
private final ObjectMapper objectMapper;
public class StreamedListObjectsApi extends BaseStreamingApi<StreamedListObjectsResponse> {

public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) {
this.configuration = configuration;
this.apiClient = apiClient;
this.objectMapper = apiClient.getObjectMapper();
super(configuration, apiClient, new TypeReference<StreamResult<StreamedListObjectsResponse>>() {});
}

/**
Expand Down Expand Up @@ -142,110 +145,12 @@ private CompletableFuture<Void> streamedListObjects(

try {
HttpRequest request = buildHttpRequest("POST", path, body, configuration);

// Use async HTTP client with streaming body handler
// ofLines() provides line-by-line streaming which is perfect for NDJSON
return apiClient
.getHttpClient()
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
.thenCompose(response -> {
// Check response status
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
ApiException apiException =
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
return CompletableFuture.failedFuture(apiException);
}

// Process the stream - this runs on HttpClient's executor thread
try (Stream<String> lines = response.body()) {
lines.forEach(line -> {
if (!isNullOrWhitespace(line)) {
processLine(line, consumer, errorConsumer);
}
});
return CompletableFuture.completedFuture((Void) null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
})
.handle((result, throwable) -> {
if (throwable != null) {
// Unwrap CompletionException to get the original exception
Throwable actualException = throwable;
if (throwable instanceof java.util.concurrent.CompletionException
&& throwable.getCause() != null) {
actualException = throwable.getCause();
}

if (errorConsumer != null) {
errorConsumer.accept(actualException);
}
// Re-throw to keep the CompletableFuture in failed state
if (actualException instanceof RuntimeException) {
throw (RuntimeException) actualException;
}
throw new RuntimeException(actualException);
}
return result;
});

return processStreamingResponse(request, consumer, errorConsumer);
} catch (Exception e) {
if (errorConsumer != null) {
errorConsumer.accept(e);
}
return CompletableFuture.failedFuture(e);
}
}

/**
* Process a single line from the NDJSON stream
*/
private void processLine(
String line, Consumer<StreamedListObjectsResponse> consumer, Consumer<Throwable> errorConsumer) {
try {
// Parse the JSON line to extract the object
StreamResultOfStreamedListObjectsResponse streamResult =
objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class);

if (streamResult.getError() != null) {
// Handle error in stream
if (errorConsumer != null) {
Status error = streamResult.getError();
String errorMessage = error.getMessage() != null
? "Stream error: " + error.getMessage()
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
errorConsumer.accept(new ApiException(errorMessage));
}
} else if (streamResult.getResult() != null) {
// Deliver the response object to the consumer
StreamedListObjectsResponse result = streamResult.getResult();
if (result != null) {
consumer.accept(result);
}
}
} catch (Exception e) {
if (errorConsumer != null) {
errorConsumer.accept(e);
}
}
}

private HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
throws ApiException, FgaInvalidParameterException {
try {
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);

// Apply request interceptors if any
var interceptor = apiClient.getRequestInterceptor();
if (interceptor != null) {
interceptor.accept(requestBuilder);
}

return requestBuilder.build();
} catch (Exception e) {
throw new ApiException(e);
}
}
}
Loading
Loading