Skip to content

Commit 943bcd8

Browse files
authored
[SYS-370] Adding Worker Concurrency - Go SDK (#188)
* [SYS-370] Adding Worker Concurrency - Go SDK * Use OSS Inngest 1.13.5
1 parent 6a7c777 commit 943bcd8

File tree

100 files changed

+889
-5722
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+889
-5722
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
# Go workspace file
2525
go.work
26+
go.work.sum
2627

2728
# End of https://www.toptal.com/developers/gitignore/api/go
2829

connect.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ import (
1212
"github.com/inngest/inngestgo/pkg/env"
1313
)
1414

15-
const (
16-
defaultMaxWorkerConcurrency = 1_000
17-
)
18-
1915
type ConnectOpts struct {
2016
Apps []Client
2117

@@ -27,17 +23,14 @@ type ConnectOpts struct {
2723

2824
RewriteGatewayEndpoint func(endpoint url.URL) (url.URL, error)
2925

30-
// MaxConcurrency defines the maximum number of requests the worker can process at once.
26+
// MaxWorkerConcurrency defines the maximum number of requests the worker can process at once.
3127
// This affects goroutines available to handle connnect workloads, as well as flow control.
32-
// Defaults to 1000.
33-
MaxConcurrency int
28+
// If this value is not set we use the environment variable "INNGEST_CONNECT_MAX_WORKER_CONCURRENCY".
29+
// Defaults to 0. There is no limit if this is 0.
30+
MaxWorkerConcurrency *int64
3431
}
3532

3633
func Connect(ctx context.Context, opts ConnectOpts) (connect.WorkerConnection, error) {
37-
concurrency := opts.MaxConcurrency
38-
if concurrency < 1 {
39-
concurrency = defaultMaxWorkerConcurrency
40-
}
4134

4235
connectPlaceholder := url.URL{
4336
Scheme: "ws",
@@ -109,7 +102,7 @@ func Connect(ctx context.Context, opts ConnectOpts) (connect.WorkerConnection, e
109102
Capabilities: capabilities,
110103
HashedSigningKey: hashedKey,
111104
HashedSigningKeyFallback: hashedFallbackKey,
112-
MaxConcurrency: concurrency,
105+
MaxWorkerConcurrency: opts.MaxWorkerConcurrency,
113106
APIBaseUrl: defaultClient.h.GetAPIBaseURL(),
114107
IsDev: defaultClient.h.isDev(),
115108
DevServerUrl: env.DevServerURL(),

connect/handler.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/url"
1111
"os"
1212
"runtime"
13+
"strconv"
1314
"sync/atomic"
1415
"time"
1516

@@ -23,6 +24,13 @@ import (
2324
"golang.org/x/sync/errgroup"
2425
)
2526

27+
const (
28+
defaultMaxWorkerConcurrency = int64(0)
29+
defaultWorkerPoolSize = 1_000
30+
maxWorkerPoolSize = 100_000_000
31+
maxWorkerConcurrencyEnvKey = "INNGEST_CONNECT_MAX_WORKER_CONCURRENCY"
32+
)
33+
2634
type ConnectionState string
2735

2836
const (
@@ -63,7 +71,13 @@ func Connect(ctx context.Context, opts Opts, invokers map[string]FunctionInvoker
6371
cancelWorkerCtx: cancelDone,
6472
}
6573

66-
wp := NewWorkerPool(ctx, opts.MaxConcurrency, ch.processExecutorRequest)
74+
// define a worker pool size based on the max worker concurrency
75+
workerPoolSize := defaultWorkerPoolSize
76+
if opts.MaxWorkerConcurrency != nil && *opts.MaxWorkerConcurrency > 0 {
77+
workerPoolSize = min(int(*opts.MaxWorkerConcurrency), maxWorkerPoolSize)
78+
}
79+
80+
wp := NewWorkerPool(ctx, workerPoolSize, ch.processExecutorRequest)
6781
ch.workerPool = wp
6882

6983
defer func() {
@@ -97,7 +111,7 @@ type Opts struct {
97111
HashedSigningKey []byte
98112
HashedSigningKeyFallback []byte
99113

100-
MaxConcurrency int
114+
MaxWorkerConcurrency *int64
101115

102116
APIBaseUrl string
103117
IsDev bool
@@ -486,6 +500,28 @@ func (h *connectHandler) instanceId() string {
486500
return "<missing-instance-id>"
487501
}
488502

503+
// maxWorkerConcurrency returns the maximum number of worker concurrency to use.
504+
func (h *connectHandler) maxWorkerConcurrency() *int64 {
505+
506+
// user provided max worker concurrency
507+
if h.opts.MaxWorkerConcurrency != nil {
508+
return h.opts.MaxWorkerConcurrency
509+
}
510+
511+
// environment variable
512+
envValue := os.Getenv(maxWorkerConcurrencyEnvKey)
513+
if envValue != "" {
514+
if concurrency, err := strconv.ParseInt(envValue, 10, 64); err == nil {
515+
return &concurrency
516+
}
517+
// ignore error
518+
}
519+
520+
// default max worker concurrency
521+
concurrency := defaultMaxWorkerConcurrency
522+
return &concurrency
523+
}
524+
489525
func expBackoff(attempt int) time.Duration {
490526
backoffTimes := []time.Duration{
491527
time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second,

connect/handler_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package connect
2+
3+
import (
4+
"log/slog"
5+
"os"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestMaxWorkerConcurrency(t *testing.T) {
12+
t.Run("returns user provided value", func(t *testing.T) {
13+
r := require.New(t)
14+
15+
maxConcurrency := int64(100)
16+
h := &connectHandler{
17+
opts: Opts{
18+
MaxWorkerConcurrency: &maxConcurrency,
19+
},
20+
logger: slog.Default(),
21+
}
22+
23+
result := h.maxWorkerConcurrency()
24+
r.NotNil(result)
25+
r.Equal(int64(100), *result)
26+
})
27+
28+
t.Run("returns environment variable value when user value not provided", func(t *testing.T) {
29+
r := require.New(t)
30+
31+
// Set environment variable
32+
t.Setenv(maxWorkerConcurrencyEnvKey, "50")
33+
34+
h := &connectHandler{
35+
opts: Opts{
36+
MaxWorkerConcurrency: nil,
37+
},
38+
logger: slog.Default(),
39+
}
40+
41+
result := h.maxWorkerConcurrency()
42+
r.NotNil(result)
43+
r.Equal(int64(50), *result)
44+
})
45+
46+
t.Run("returns default value when neither user value nor env var provided", func(t *testing.T) {
47+
r := require.New(t)
48+
49+
// Ensure environment variable is not set
50+
_ = os.Unsetenv(maxWorkerConcurrencyEnvKey)
51+
52+
h := &connectHandler{
53+
opts: Opts{
54+
MaxWorkerConcurrency: nil,
55+
},
56+
logger: slog.Default(),
57+
}
58+
59+
result := h.maxWorkerConcurrency()
60+
r.NotNil(result)
61+
r.Equal(defaultMaxWorkerConcurrency, *result)
62+
})
63+
64+
t.Run("user provided value takes precedence over environment variable", func(t *testing.T) {
65+
r := require.New(t)
66+
67+
// Set environment variable
68+
t.Setenv(maxWorkerConcurrencyEnvKey, "50")
69+
70+
maxConcurrency := int64(200)
71+
h := &connectHandler{
72+
opts: Opts{
73+
MaxWorkerConcurrency: &maxConcurrency,
74+
},
75+
logger: slog.Default(),
76+
}
77+
78+
result := h.maxWorkerConcurrency()
79+
r.NotNil(result)
80+
r.Equal(int64(200), *result)
81+
})
82+
83+
t.Run("handles invalid environment variable gracefully", func(t *testing.T) {
84+
r := require.New(t)
85+
86+
// Set invalid environment variable
87+
t.Setenv(maxWorkerConcurrencyEnvKey, "invalid")
88+
89+
h := &connectHandler{
90+
opts: Opts{
91+
MaxWorkerConcurrency: nil,
92+
},
93+
logger: slog.Default(),
94+
}
95+
96+
result := h.maxWorkerConcurrency()
97+
r.NotNil(result)
98+
// Should return default value when env var is invalid
99+
r.Equal(defaultMaxWorkerConcurrency, *result)
100+
})
101+
}

connect/handshake.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"runtime"
8+
"time"
9+
710
"github.com/coder/websocket"
811
"github.com/inngest/inngest/pkg/connect/wsproto"
912
connectproto "github.com/inngest/inngest/proto/gen/connect/v1"
1013
"google.golang.org/protobuf/proto"
1114
"google.golang.org/protobuf/types/known/timestamppb"
12-
"runtime"
13-
"time"
1415
)
1516

1617
type reconnectError struct {
@@ -59,8 +60,9 @@ func (h *connectHandler) performConnectHandshake(ctx context.Context, connection
5960
// Send connect message
6061
{
6162
data, err := proto.Marshal(&connectproto.WorkerConnectRequestData{
62-
ConnectionId: startResponse.ConnectionId,
63-
InstanceId: h.instanceId(),
63+
ConnectionId: startResponse.ConnectionId,
64+
InstanceId: h.instanceId(),
65+
MaxWorkerConcurrency: h.maxWorkerConcurrency(),
6466
AuthData: &connectproto.AuthData{
6567
SessionToken: startResponse.GetSessionToken(),
6668
SyncToken: startResponse.GetSyncToken(),

examples/connect/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ func main() {
6565
}
6666

6767
conn, err := inngestgo.Connect(ctx, inngestgo.ConnectOpts{
68-
InstanceID: inngestgo.Ptr("example-worker"),
68+
InstanceID: inngestgo.Ptr("example-worker"),
69+
MaxWorkerConcurrency: inngestgo.Ptr(int64(2)),
6970
Apps: []inngestgo.Client{
7071
c1,
7172
c2,

go.mod

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
module github.com/inngest/inngestgo
22

3-
go 1.24
3+
go 1.24.0
4+
5+
toolchain go1.24.3
46

57
require (
68
github.com/coder/websocket v1.8.12
79
github.com/fatih/structs v1.1.0
810
github.com/google/uuid v1.6.0
911
github.com/gosimple/slug v1.12.0
1012
github.com/gowebpki/jcs v1.0.0
11-
github.com/inngest/inngest v1.12.1
13+
github.com/inngest/inngest v1.13.5
1214
github.com/oklog/ulid/v2 v2.1.1
1315
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
1416
github.com/sashabaranov/go-openai v1.35.6
@@ -19,32 +21,20 @@ require (
1921
)
2022

2123
require (
22-
github.com/99designs/gqlgen v0.17.27 // indirect
23-
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
24-
github.com/agnivade/levenshtein v1.1.1 // indirect
2524
github.com/cespare/xxhash/v2 v2.3.0 // indirect
26-
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
2725
github.com/davecgh/go-spew v1.1.1 // indirect
28-
github.com/dmarkham/enumer v1.5.8 // indirect
2926
github.com/getsentry/sentry-go v0.27.0 // indirect
3027
github.com/gosimple/unidecode v1.0.1 // indirect
3128
github.com/hashicorp/errwrap v1.1.0 // indirect
3229
github.com/hashicorp/go-multierror v1.1.1 // indirect
3330
github.com/lmittmann/tint v1.1.0 // indirect
34-
github.com/pascaldekloe/name v1.0.1 // indirect
3531
github.com/pmezard/go-difflib v1.0.0 // indirect
36-
github.com/russross/blackfriday/v2 v2.1.0 // indirect
37-
github.com/urfave/cli/v2 v2.25.1 // indirect
38-
github.com/vektah/gqlparser/v2 v2.5.15 // indirect
39-
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
40-
golang.org/x/mod v0.25.0 // indirect
4132
golang.org/x/net v0.41.0 // indirect
4233
golang.org/x/sys v0.36.0 // indirect
4334
golang.org/x/text v0.26.0 // indirect
44-
golang.org/x/tools v0.33.0 // indirect
4535
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
4636
google.golang.org/grpc v1.73.0 // indirect
4737
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
4838
gopkg.in/yaml.v3 v3.0.1 // indirect
49-
lukechampine.com/frand v1.4.2 // indirect
39+
lukechampine.com/frand v1.5.1 // indirect
5040
)

0 commit comments

Comments
 (0)