Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .release-please-manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"hooks/open-telemetry": "0.3.6",
"hooks/validator": "0.1.7",
"providers/configcat": "0.2.3",
"providers/flagd": "0.3.0",
"providers/flagd": "0.3.1",
"providers/flipt": "0.1.5",
"providers/from-env": "0.1.6",
"providers/go-feature-flag": "0.2.6",
Expand All @@ -14,7 +14,7 @@
"providers/statsig": "0.0.4",
"providers/ofrep": "0.1.7",
"providers/prefab": "0.0.4",
"tests/flagd": "1.5.1",
"tests/flagd": "1.6.0",
"providers/go-feature-flag-in-process": "0.1.2",
"providers/multi-provider": "0.0.5",
"tools/flagd-http-connector": "0.0.2",
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ test:

# call with TESTCONTAINERS_RYUK_DISABLED="true" to avoid problems with podman on Macs
e2e:
go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=2m -tags=e2e {}
go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=3m -tags=e2e {}

lint:
go install -v github.com/golangci/golangci-lint/v2/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION)
Expand Down
28 changes: 28 additions & 0 deletions providers/flagd/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
# Changelog

## [0.3.1](https://github.com/open-feature/go-sdk-contrib/compare/providers/flagd/v0.3.0...providers/flagd/v0.3.1) (2025-11-25)


### 🐛 Bug Fixes

* **deps:** bump open-feature/go-sdk from v1.11 to v1.15 ([#686](https://github.com/open-feature/go-sdk-contrib/issues/686)) ([ce87102](https://github.com/open-feature/go-sdk-contrib/commit/ce871021d0c45d3c992bb00b33c8b7a8e337e9a3))
* **deps:** update golang.org/x/exp digest to b7579e2 ([#679](https://github.com/open-feature/go-sdk-contrib/issues/679)) ([a6372f9](https://github.com/open-feature/go-sdk-contrib/commit/a6372f91b262d2f81b90bfa9e76d722ad480378b))
* **deps:** update jsonlogic module to fix race detection ([#691](https://github.com/open-feature/go-sdk-contrib/issues/691)) ([21f3de0](https://github.com/open-feature/go-sdk-contrib/commit/21f3de0d39a6d23000957bd6f278df466af385e4))
* **deps:** update module buf.build/gen/go/open-feature/flagd/connectrpc/go to v1.18.1-20250529171031-ebdc14163473.1 ([#699](https://github.com/open-feature/go-sdk-contrib/issues/699)) ([6c7044d](https://github.com/open-feature/go-sdk-contrib/commit/6c7044de8bf10d12ed07f4c66335e297c444a6fe))
* **deps:** update module buf.build/gen/go/open-feature/flagd/grpc/go to v1.5.1-20250529171031-ebdc14163473.2 ([#700](https://github.com/open-feature/go-sdk-contrib/issues/700)) ([4747395](https://github.com/open-feature/go-sdk-contrib/commit/474739580f3c7f72e031929b99ab0b86ba4812bb))
* **deps:** update module buf.build/gen/go/open-feature/flagd/protocolbuffers/go to v1.36.6-20250529171031-ebdc14163473.1 ([#706](https://github.com/open-feature/go-sdk-contrib/issues/706)) ([902021b](https://github.com/open-feature/go-sdk-contrib/commit/902021be1083336d9a53c1fd8388cbaaa8dc7959))
* **deps:** update module github.com/open-feature/flagd/core to v0.11.5 ([#666](https://github.com/open-feature/go-sdk-contrib/issues/666)) ([94b44c4](https://github.com/open-feature/go-sdk-contrib/commit/94b44c4aed982ac54b91bd82a2cf8400c1b622c0))
* **deps:** update module github.com/open-feature/go-sdk to v1.15.1 ([#681](https://github.com/open-feature/go-sdk-contrib/issues/681)) ([8fd544f](https://github.com/open-feature/go-sdk-contrib/commit/8fd544ff81fd25eed655a214aa1ae1906a436f0d))
* fix goroutine leaks around shutdown ([#716](https://github.com/open-feature/go-sdk-contrib/issues/716)) ([c3ea532](https://github.com/open-feature/go-sdk-contrib/commit/c3ea53271ed91d20c9a9afd762ea5e2c4c3c488a))
* **flagd:** missed error events, add e2e tests ([#760](https://github.com/open-feature/go-sdk-contrib/issues/760)) ([3750972](https://github.com/open-feature/go-sdk-contrib/commit/3750972d25d847ea56f6b9b5a7640407db67ab11))
* **security:** update module github.com/containerd/containerd/v2 to v2.1.5 [security] ([#797](https://github.com/open-feature/go-sdk-contrib/issues/797)) ([f74c0c3](https://github.com/open-feature/go-sdk-contrib/commit/f74c0c306759914c48364320f2f3a2db252f3d35))
* **security:** update module github.com/docker/compose/v2 to v2.40.2 [security] ([#785](https://github.com/open-feature/go-sdk-contrib/issues/785)) ([805823f](https://github.com/open-feature/go-sdk-contrib/commit/805823f5ded2d81359fd7663804beb50f30d52f7))
* **security:** update module golang.org/x/crypto to v0.45.0 [security] ([#803](https://github.com/open-feature/go-sdk-contrib/issues/803)) ([20b0ccd](https://github.com/open-feature/go-sdk-contrib/commit/20b0ccdf1261cacde5273f61882194b92dbd6650))
* **security:** update vulnerability-updates [security] ([#724](https://github.com/open-feature/go-sdk-contrib/issues/724)) ([629a535](https://github.com/open-feature/go-sdk-contrib/commit/629a5351c2c4b8fed00522f7453d5545920ceaaf))
* **security:** update vulnerability-updates [security] ([#773](https://github.com/open-feature/go-sdk-contrib/issues/773)) ([21628dc](https://github.com/open-feature/go-sdk-contrib/commit/21628dc0bc058c042f14c1afa45df2dfc3d93c72))


### ✨ New Features

* comprehensive flagd e2e testing framework with testcontainers integration ([#732](https://github.com/open-feature/go-sdk-contrib/issues/732)) ([e3ec17b](https://github.com/open-feature/go-sdk-contrib/commit/e3ec17bdc7140582582a5df1154b6044cbf5b640))
* **flagd:** add eventing with graceperiod for inprocess resolver ([#744](https://github.com/open-feature/go-sdk-contrib/issues/744)) ([a9fabb6](https://github.com/open-feature/go-sdk-contrib/commit/a9fabb623d22b6a1ef888722ffe68686031309b8))
* upgrade flagd dependencies to 0.12.1 ([#731](https://github.com/open-feature/go-sdk-contrib/issues/731)) ([8e8d888](https://github.com/open-feature/go-sdk-contrib/commit/8e8d888dea080a03ea2a709b79598c7de6a9eed8))

## [0.3.0](https://github.com/open-feature/go-sdk-contrib/compare/providers/flagd/v0.2.6...providers/flagd/v0.3.0) (2025-06-07)


Expand Down
8 changes: 4 additions & 4 deletions providers/flagd/e2e/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ func TestConfiguration(t *testing.T) {
testCases := []configTestCase{
{
name: "All",
tags: "",
tags: "~@sync-port",
},
{
name: "RPC",
tags: "@rpc",
tags: "@rpc && ~@sync-port",
},
{
name: "InProcess",
tags: "@in-process",
tags: "@in-process && ~@sync-port",
},
{
name: "File",
tags: "@file",
tags: "@file && ~@sync-port",
},
}

Expand Down
7 changes: 6 additions & 1 deletion providers/flagd/e2e/inprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package e2e
import (
"testing"

flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"

"github.com/open-feature/go-sdk-contrib/tests/flagd/testframework"
)

Expand All @@ -17,6 +19,9 @@ func TestInProcessProviderE2E(t *testing.T) {
runner := testframework.NewTestbedRunner(testframework.TestbedConfig{
ResolverType: testframework.InProcess,
TestbedConfig: "default",
ExtraOptions: []flagd.ProviderOption{
flagd.WithRetryBackoffMaxMs(3000),
},
})
defer runner.Cleanup()

Expand All @@ -26,7 +31,7 @@ func TestInProcessProviderE2E(t *testing.T) {
}

// Run tests with in-process specific tags
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload"
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload && ~@sync-port"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/e2e/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRPCProviderE2E(t *testing.T) {
}

// Run tests with RPC-specific tags - exclude unimplemented scenarios
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching"
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching && ~@forbidden && ~@sync-port"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
107 changes: 75 additions & 32 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package flagd
import (
"errors"
"fmt"
"os"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger"
"google.golang.org/grpc"
"os"
"strconv"
"strings"
)

type ResolverType string
Expand All @@ -26,6 +27,9 @@ const (
defaultHost = "localhost"
defaultResolver = rpc
defaultGracePeriod = 5
defaultRetryBackoffMs = 1000
defaultRetryBackoffMaxMs = 120000
defaultFatalStatusCodes = ""

rpc ResolverType = "rpc"
inProcess ResolverType = "in-process"
Expand All @@ -45,6 +49,9 @@ const (
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI"
flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD"
flagdRetryBackoffMsVariableName = "FLAGD_RETRY_BACKOFF_MS"
flagdRetryBackoffMaxMsVariableName = "FLAGD_RETRY_BACKOFF_MAX_MS"
flagdFatalStatusCodesVariableName = "FLAGD_FATAL_STATUS_CODES"
)

type ProviderConfiguration struct {
Expand All @@ -66,6 +73,9 @@ type ProviderConfiguration struct {
CustomSyncProviderUri string
GrpcDialOptionsOverride []grpc.DialOption
RetryGracePeriod int
RetryBackoffMs int
RetryBackoffMaxMs int
FatalStatusCodes []string

log logr.Logger
}
Expand All @@ -80,6 +90,8 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration {
Resolver: defaultResolver,
Tls: defaultTLS,
RetryGracePeriod: defaultGracePeriod,
RetryBackoffMs: defaultRetryBackoffMs,
RetryBackoffMaxMs: defaultRetryBackoffMaxMs,
}

p.updateFromEnvVar()
Expand Down Expand Up @@ -130,6 +142,7 @@ func validateProviderConfiguration(p *ProviderConfiguration) error {

// updateFromEnvVar is a utility to update configurations based on current environment variables
func (cfg *ProviderConfiguration) updateFromEnvVar() {

portS := os.Getenv(flagdPortEnvironmentVariableName)
if portS != "" {
port, err := strconv.Atoi(portS)
Expand Down Expand Up @@ -159,17 +172,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
cfg.CertPath = certificatePath
}

if maxCacheSizeS := os.Getenv(flagdMaxCacheSizeEnvironmentVariableName); maxCacheSizeS != "" {
maxCacheSizeFromEnv, err := strconv.Atoi(maxCacheSizeS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize,
))
} else {
cfg.MaxCacheSize = maxCacheSizeFromEnv
}
}
cfg.MaxCacheSize = getIntFromEnvVarOrDefault(flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, cfg.log)

if cacheValue := os.Getenv(flagdCacheEnvironmentVariableName); cacheValue != "" {
switch cache.Type(cacheValue) {
Expand All @@ -185,18 +188,8 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
}
}

if maxEventStreamRetriesS := os.Getenv(
flagdMaxEventStreamRetriesEnvironmentVariableName); maxEventStreamRetriesS != "" {

maxEventStreamRetries, err := strconv.Atoi(maxEventStreamRetriesS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries))
} else {
cfg.EventStreamConnectionMaxAttempts = maxEventStreamRetries
}
}
cfg.EventStreamConnectionMaxAttempts = getIntFromEnvVarOrDefault(
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries, cfg.log)

if resolver := os.Getenv(flagdResolverEnvironmentVariableName); resolver != "" {
switch strings.ToLower(resolver) {
Expand Down Expand Up @@ -227,15 +220,43 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" {
cfg.TargetUri = targetUri
}
if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" {
if seconds, err := strconv.Atoi(gracePeriod); err == nil {
cfg.RetryGracePeriod = seconds
} else {
// Handle parsing error
cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod))

cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log)
cfg.RetryBackoffMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMsVariableName, defaultRetryBackoffMs, cfg.log)
cfg.RetryBackoffMaxMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMaxMsVariableName, defaultRetryBackoffMaxMs, cfg.log)

var fatalStatusCodes string
if envVal := os.Getenv(flagdFatalStatusCodesVariableName); envVal != "" {
fatalStatusCodes = envVal
} else {
fatalStatusCodes = defaultFatalStatusCodes
}
if fatalStatusCodes == "" {
cfg.FatalStatusCodes = []string{}
} else {
fatalStatusCodesArr := strings.Split(fatalStatusCodes, ",")
for i, fatalStatusCode := range fatalStatusCodesArr {
fatalStatusCodesArr[i] = strings.TrimSpace(fatalStatusCode)
}
cfg.FatalStatusCodes = fatalStatusCodesArr
}
}

// Helper

func getIntFromEnvVarOrDefault(envVarName string, defaultValue int, log logr.Logger) int {
if valueFromEnv := os.Getenv(envVarName); valueFromEnv != "" {
intValue, err := strconv.Atoi(valueFromEnv)
if err != nil {
log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
envVarName, defaultValue,
))
} else {
return intValue
}
}
return defaultValue
}

// ProviderOptions
Expand Down Expand Up @@ -415,3 +436,25 @@ func WithRetryGracePeriod(gracePeriod int) ProviderOption {
p.RetryGracePeriod = gracePeriod
}
}

// WithRetryBackoffMs sets the initial backoff duration (in milliseconds) for retrying failed connections
func WithRetryBackoffMs(retryBackoffMs int) ProviderOption {
return func(p *ProviderConfiguration) {
p.RetryBackoffMs = retryBackoffMs
}
}

// WithRetryBackoffMaxMs sets the maximum backoff duration (in milliseconds) for retrying failed connections
func WithRetryBackoffMaxMs(retryBackoffMaxMs int) ProviderOption {
return func(p *ProviderConfiguration) {
p.RetryBackoffMaxMs = retryBackoffMaxMs
}
}

// WithFatalStatusCodes allows to set a list of gRPC status codes, which will cause streams to give up
// and put the provider in a PROVIDER_FATAL state
func WithFatalStatusCodes(fatalStatusCodes []string) ProviderOption {
return func(p *ProviderConfiguration) {
p.FatalStatusCodes = fatalStatusCodes
}
}
3 changes: 3 additions & 0 deletions providers/flagd/pkg/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride,
RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod,
RetryBackOffMs: provider.providerConfiguration.RetryBackoffMs,
RetryBackOffMaxMs: provider.providerConfiguration.RetryBackoffMaxMs,
FatalStatusCodes: provider.providerConfiguration.FatalStatusCodes,
})
default:
service = process.NewInProcessService(process.Configuration{
Expand Down
66 changes: 66 additions & 0 deletions providers/flagd/pkg/service/in_process/grpc_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package process

import (
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"time"
)

const (
// Default timeouts and retry intervals
defaultKeepaliveTime = 30 * time.Second
defaultKeepaliveTimeout = 5 * time.Second
)

type RetryPolicy struct {
MaxAttempts int `json:"MaxAttempts"`
InitialBackoff string `json:"InitialBackoff"`
MaxBackoff string `json:"MaxBackoff"`
BackoffMultiplier float64 `json:"BackoffMultiplier"`
RetryableStatusCodes []string `json:"RetryableStatusCodes"`
}

func (g *Sync) buildRetryPolicy() string {
var policy = map[string]interface{}{
"methodConfig": []map[string]interface{}{
{
"name": []map[string]string{
{"service": "flagd.sync.v1.FlagSyncService"},
},
"retryPolicy": RetryPolicy{
MaxAttempts: 3,
InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(),
MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(),
BackoffMultiplier: 2.0,
RetryableStatusCodes: []string{"UNKNOWN", "UNAVAILABLE"},
},
},
},
}
retryPolicyBytes, _ := json.Marshal(policy)
retryPolicy := string(retryPolicyBytes)

return retryPolicy
}

// Set of non-retryable gRPC status codes for faster lookup
var nonRetryableCodes map[codes.Code]struct{}

// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup
func (g *Sync) initNonRetryableStatusCodesSet() {
nonRetryableCodes = make(map[codes.Code]struct{})

for _, codeStr := range g.FatalStatusCodes {
// Wrap the string in quotes to match the expected JSON format
jsonStr := fmt.Sprintf(`"%s"`, codeStr)

var code codes.Code
if err := code.UnmarshalJSON([]byte(jsonStr)); err != nil {
g.Logger.Warn(fmt.Sprintf("unknown status code: %s, error: %v", codeStr, err))
continue
}

nonRetryableCodes[code] = struct{}{}
}
}
Loading
Loading