Skip to content

Commit 5c73789

Browse files
committed
feat: add common streaming utils that can be reused
1 parent 080eee1 commit 5c73789

File tree

7 files changed

+831
-806
lines changed

7 files changed

+831
-806
lines changed

.openapi-generator/FILES

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,5 @@ model_write_authorization_model_response.go
189189
model_write_request.go
190190
model_write_request_deletes.go
191191
model_write_request_writes.go
192+
streaming.go
193+
streaming_test.go

api_open_fga.go

Lines changed: 669 additions & 669 deletions
Large diffs are not rendered by default.

example/example1/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,5 @@ require (
1919
go.opentelemetry.io/otel/trace v1.38.0 // indirect
2020
go.uber.org/atomic v1.7.0 // indirect
2121
go.uber.org/multierr v1.9.0 // indirect
22-
golang.org/x/sync v0.17.0 // indirect
22+
golang.org/x/sync v0.18.0 // indirect
2323
)

example/opentelemetry/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ require (
3131
go.uber.org/atomic v1.7.0 // indirect
3232
go.uber.org/multierr v1.9.0 // indirect
3333
golang.org/x/net v0.43.0 // indirect
34-
golang.org/x/sync v0.17.0 // indirect
34+
golang.org/x/sync v0.18.0 // indirect
3535
golang.org/x/sys v0.35.0 // indirect
3636
golang.org/x/text v0.28.0 // indirect
3737
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect

example/streamed_list_objects/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ require (
2525
go.uber.org/multierr v1.9.0 // indirect
2626
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
2727
golang.org/x/net v0.38.0 // indirect
28-
golang.org/x/sync v0.17.0 // indirect
28+
golang.org/x/sync v0.18.0 // indirect
2929
golang.org/x/sys v0.31.0 // indirect
3030
golang.org/x/text v0.23.0 // indirect
3131
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect

streaming.go

Lines changed: 100 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,47 @@ import (
2323
"strings"
2424
)
2525

26-
type StreamedListObjectsChannel struct {
27-
Objects chan StreamedListObjectsResponse
26+
// StreamResult represents a generic streaming result wrapper with either a result or an error
27+
type StreamResult[T any] struct {
28+
Result *T `json:"result,omitempty" yaml:"result,omitempty"`
29+
Error *Status `json:"error,omitempty" yaml:"error,omitempty"`
30+
}
31+
32+
// StreamingChannel represents a generic channel for streaming responses
33+
type StreamingChannel[T any] struct {
34+
Results chan T
2835
Errors chan error
2936
cancel context.CancelFunc
3037
}
3138

32-
func (s *StreamedListObjectsChannel) Close() {
39+
// Close cancels the streaming context and cleans up resources
40+
func (s *StreamingChannel[T]) Close() {
3341
if s.cancel != nil {
3442
s.cancel()
3543
}
3644
}
3745

38-
func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamedListObjectsChannel, error) {
46+
// ProcessStreamingResponse processes an HTTP response as a streaming NDJSON response
47+
// and returns a StreamingChannel with results and errors
48+
//
49+
// Parameters:
50+
// - ctx: The context for cancellation
51+
// - httpResponse: The HTTP response to process
52+
// - bufferSize: The buffer size for the channels (default 10 if <= 0)
53+
//
54+
// Returns:
55+
// - *StreamingChannel[T]: A channel containing streaming results and errors
56+
// - error: An error if the response is invalid
57+
func ProcessStreamingResponse[T any](ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamingChannel[T], error) {
3958
streamCtx, cancel := context.WithCancel(ctx)
4059

4160
// Use default buffer size of 10 if not specified or invalid
4261
if bufferSize <= 0 {
4362
bufferSize = 10
4463
}
4564

46-
channel := &StreamedListObjectsChannel{
47-
Objects: make(chan StreamedListObjectsResponse, bufferSize),
65+
channel := &StreamingChannel[T]{
66+
Results: make(chan T, bufferSize),
4867
Errors: make(chan error, 1),
4968
cancel: cancel,
5069
}
@@ -55,20 +74,16 @@ func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.
5574
}
5675

5776
go func() {
58-
defer close(channel.Objects)
77+
defer close(channel.Results)
5978
defer close(channel.Errors)
6079
defer cancel()
61-
defer func(Body io.ReadCloser) {
62-
err := Body.Close()
63-
if err != nil {
64-
channel.Errors <- err
65-
}
66-
}(httpResponse.Body)
80+
defer func() { _ = httpResponse.Body.Close() }()
6781

6882
scanner := bufio.NewScanner(httpResponse.Body)
6983
// Allow large NDJSON entries (up to 10MB). Tune as needed.
7084
buf := make([]byte, 0, 64*1024)
7185
scanner.Buffer(buf, 10*1024*1024)
86+
7287
for scanner.Scan() {
7388
select {
7489
case <-streamCtx.Done():
@@ -80,7 +95,7 @@ func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.
8095
continue
8196
}
8297

83-
var streamResult StreamResultOfStreamedListObjectsResponse
98+
var streamResult StreamResult[T]
8499
if err := json.Unmarshal(line, &streamResult); err != nil {
85100
channel.Errors <- err
86101
return
@@ -100,7 +115,7 @@ func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.
100115
case <-streamCtx.Done():
101116
channel.Errors <- streamCtx.Err()
102117
return
103-
case channel.Objects <- *streamResult.Result:
118+
case channel.Results <- *streamResult.Result:
104119
}
105120
}
106121
}
@@ -119,16 +134,83 @@ func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.
119134
return channel, nil
120135
}
121136

137+
// StreamedListObjectsChannel maintains backward compatibility with the old channel structure
138+
type StreamedListObjectsChannel struct {
139+
Objects chan StreamedListObjectsResponse
140+
Errors chan error
141+
cancel context.CancelFunc
142+
}
143+
144+
// Close cancels the streaming context and cleans up resources
145+
func (s *StreamedListObjectsChannel) Close() {
146+
if s.cancel != nil {
147+
s.cancel()
148+
}
149+
}
150+
151+
// ProcessStreamedListObjectsResponse processes a StreamedListObjects response
152+
// This is a backward compatibility wrapper around ProcessStreamingResponse
153+
func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamedListObjectsChannel, error) {
154+
channel, err := ProcessStreamingResponse[StreamedListObjectsResponse](ctx, httpResponse, bufferSize)
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
// Create a new channel with the old field name for backward compatibility
160+
compatChannel := &StreamedListObjectsChannel{
161+
Objects: channel.Results,
162+
Errors: channel.Errors,
163+
cancel: channel.cancel,
164+
}
165+
166+
return compatChannel, nil
167+
}
168+
169+
// ExecuteStreamedListObjects executes a StreamedListObjects request
122170
func ExecuteStreamedListObjects(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions) (*StreamedListObjectsChannel, error) {
123171
return ExecuteStreamedListObjectsWithBufferSize(client, ctx, storeId, body, options, 0)
124172
}
125173

174+
// ExecuteStreamedListObjectsWithBufferSize executes a StreamedListObjects request with a custom buffer size
126175
func ExecuteStreamedListObjectsWithBufferSize(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions, bufferSize int) (*StreamedListObjectsChannel, error) {
127-
path := "/stores/{store_id}/streamed-list-objects"
176+
channel, err := executeStreamingRequest[ListObjectsRequest, StreamedListObjectsResponse](
177+
client,
178+
ctx,
179+
"/stores/{store_id}/streamed-list-objects",
180+
storeId,
181+
body,
182+
options,
183+
bufferSize,
184+
"StreamedListObjects",
185+
)
186+
if err != nil {
187+
return nil, err
188+
}
189+
190+
// Convert to backward-compatible channel structure
191+
return &StreamedListObjectsChannel{
192+
Objects: channel.Results,
193+
Errors: channel.Errors,
194+
cancel: channel.cancel,
195+
}, nil
196+
}
197+
198+
// executeStreamingRequest is a generic function to execute streaming requests
199+
func executeStreamingRequest[TReq any, TRes any](
200+
client *APIClient,
201+
ctx context.Context,
202+
pathTemplate string,
203+
storeId string,
204+
body TReq,
205+
options RequestOptions,
206+
bufferSize int,
207+
operationName string,
208+
) (*StreamingChannel[TRes], error) {
128209
if storeId == "" {
129210
return nil, reportError("storeId is required and must be specified")
130211
}
131212

213+
path := pathTemplate
132214
path = strings.ReplaceAll(path, "{"+"store_id"+"}", url.PathEscape(parameterToString(storeId, "")))
133215

134216
localVarHeaderParams := make(map[string]string)
@@ -158,9 +240,9 @@ func ExecuteStreamedListObjectsWithBufferSize(client *APIClient, ctx context.Con
158240
if readErr != nil {
159241
return nil, readErr
160242
}
161-
err = client.handleAPIError(httpResponse, responseBody, body, "StreamedListObjects", storeId)
243+
err = client.handleAPIError(httpResponse, responseBody, body, operationName, storeId)
162244
return nil, err
163245
}
164246

165-
return ProcessStreamedListObjectsResponse(ctx, httpResponse, bufferSize)
247+
return ProcessStreamingResponse[TRes](ctx, httpResponse, bufferSize)
166248
}

0 commit comments

Comments
 (0)