Skip to content

Commit a9e7da9

Browse files
authored
Handle sync redirecting (#180)
This commit introduces full redirect support for sync -> async conversion in API-based runs
1 parent 10769ad commit a9e7da9

File tree

4 files changed

+27
-18
lines changed

4 files changed

+27
-18
lines changed

stephttp/api_client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ type CheckpointRun struct {
2929
AppID uuid.UUID `json:"app_id"`
3030
// RunID is the function run ID created for this execution.
3131
RunID ulid.ULID `json:"run_id"`
32+
// Token is the token to use when redirecting if this is an async checkpoint.
33+
Token string `json:"token,omitempty"`
34+
35+
// NOTE: The below are not included in the API response when checkpointing
36+
// new runs.
3237

3338
// Stack is the current stack, used when resuming requests.
3439
Stack []string

stephttp/configure.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type AsyncResponseRedirect struct {
8888
//
8989
// Note that this accepts a token which can be used to hit the Inngest API to
9090
// block for the API result.
91-
URL func(token string) string
91+
URL func(runID ulid.ULID, token string) string
9292
}
9393

9494
func (a AsyncResponseRedirect) isAsyncResponse() {}

stephttp/request_owner.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ func (o *requestOwner) handle(ctx context.Context) error {
131131

132132
if sdkrequest.HasAsyncOps(o.mgr.Ops(), o.run.Attempt, 0) {
133133
// Always checkpoint first, then handle the async conversion.
134-
o.handleFirstCheckpoint(ctx)
135-
return o.handleAsyncConversion(ctx)
134+
token := o.handleFirstCheckpoint(ctx)
135+
return o.handleAsyncConversion(ctx, token)
136136
}
137137

138138
// Attempt to flush the response directly to the client immediately, reducing TTFB
@@ -169,7 +169,7 @@ func (o *requestOwner) handle(ctx context.Context) error {
169169
//
170170
// We also need to handle the API response to our user, which is either a token,
171171
// a redirect, or a custom response.
172-
func (o *requestOwner) handleAsyncConversion(ctx context.Context) error {
172+
func (o *requestOwner) handleAsyncConversion(ctx context.Context, token string) error {
173173
if !sdkrequest.HasAsyncOps(o.mgr.Ops(), o.run.Attempt, 0) {
174174
return nil
175175
}
@@ -193,19 +193,19 @@ func (o *requestOwner) handleAsyncConversion(ctx context.Context) error {
193193
case AsyncResponseToken:
194194
return json.NewEncoder(o.w).Encode(asyncResponseToken{
195195
RunID: o.run.RunID,
196-
Token: redirectToken(o.run.RunID),
196+
Token: token,
197197
})
198198
case AsyncResponseCustom:
199199
v(o.w, o.r)
200200
return nil
201201
case AsyncResponseRedirect:
202202
if v.URL != nil {
203-
url = v.URL(redirectToken(o.run.RunID))
203+
url = v.URL(o.run.RunID, token)
204204
}
205205
}
206206

207207
if url == "" {
208-
url = defaultRedirectURL(o.provider.opts, o.run.RunID)
208+
url = defaultRedirectURL(o.provider.opts, o.run.RunID, token)
209209
}
210210

211211
http.Redirect(o.w, o.r, url, http.StatusSeeOther)
@@ -312,7 +312,9 @@ func (o *requestOwner) call(ctx context.Context) APIResult {
312312
// It also checkpoints the first N steps (potentially including the entire function).
313313
//
314314
// This is a blocking operation; to run this in the background use a goroutine.
315-
func (o *requestOwner) handleFirstCheckpoint(ctx context.Context) {
315+
//
316+
// This returns an optional token used when redirecting to async outputs.
317+
func (o *requestOwner) handleFirstCheckpoint(ctx context.Context) string {
316318
var (
317319
requestBody []byte
318320
err error
@@ -357,10 +359,11 @@ func (o *requestOwner) handleFirstCheckpoint(ctx context.Context) {
357359
}, o.mgr.Ops()...)
358360
if err != nil {
359361
o.provider.logger.Error("error creating new api-based inngest run", "error", err, "run_id", o.run.RunID)
360-
return
362+
return ""
361363
}
362364

363365
o.run = *resp
366+
return resp.Token
364367
}
365368

366369
// validateResumeRequestSignature validates the signature for resume requests.

stephttp/util.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package stephttp
33
import (
44
"bufio"
55
"bytes"
6+
"fmt"
67
"io"
78
"net"
89
"net/http"
@@ -11,13 +12,13 @@ import (
1112
"github.com/oklog/ulid/v2"
1213
)
1314

14-
func redirectToken(runID ulid.ULID) string {
15-
// TODO: SIGN WITH A KEY OR RANDOM TOKEN
16-
return runID.String()
17-
}
18-
19-
func defaultRedirectURL(o SetupOpts, runID ulid.ULID) string {
20-
return o.baseURL() + "/v2/public/runs/" + redirectToken(runID)
15+
func defaultRedirectURL(o SetupOpts, runID ulid.ULID, token string) string {
16+
return fmt.Sprintf(
17+
"%s/v1/http/runs/%s/output?token=%s",
18+
o.baseURL(),
19+
runID,
20+
token,
21+
)
2122
}
2223

2324
// responseWriter captures the response for storing as the API result
@@ -56,10 +57,10 @@ func (rw *responseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
5657
if !ok {
5758
return nil, nil, http.ErrNotSupported
5859
}
59-
60+
6061
// Mark as hijacked so we stop capturing response data
6162
rw.hijacked = true
62-
63+
6364
return hijacker.Hijack()
6465
}
6566

0 commit comments

Comments
 (0)