diff --git a/providers/flagd/README.md b/providers/flagd/README.md index f73412c7c..cb17e5862 100644 --- a/providers/flagd/README.md +++ b/providers/flagd/README.md @@ -38,7 +38,7 @@ provider, err := flagd.NewProvider(flagd.WithInProcessResolver()) openfeature.SetProvider(provider) ``` -In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json). +In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8015` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json). #### Custom sync provider @@ -95,7 +95,7 @@ Configuration can be provided as constructor options or as environment variables | Option name | Environment variable name | Type & supported value | Default | Compatible resolver | |----------------------------------------------------------|--------------------------------|-----------------------------|-----------|---------------------| | WithHost | FLAGD_HOST | string | localhost | rpc & in-process | -| WithPort | FLAGD_PORT | number | 8013 | rpc & in-process | +| WithPort | FLAGD_PORT (rpc), FLAGD_SYNC_PORT or FLAGD_PORT (in-process) | number | 8013 (rpc), 8015 (in-process) | rpc & in-process | | WithTargetUri | FLAGD_TARGET_URI | string | "" | in-process | | WithTLS | FLAGD_TLS | boolean | false | rpc & in-process | | WithSocketPath | FLAGD_SOCKET_PATH | string | "" | rpc & in-process | @@ -106,6 +106,8 @@ Configuration can be provided as constructor options or as environment variables | WithProviderID | FLAGD_SOURCE_PROVIDER_ID | string | "" | in-process | | WithSelector | FLAGD_SOURCE_SELECTOR | string | "" | in-process | +> **Note:** For the in-process resolver, `FLAGD_SYNC_PORT` takes priority over `FLAGD_PORT`. The `FLAGD_PORT` environment variable is still supported for backwards compatibility. + ### Overriding behavior By default, the flagd provider will read non-empty environment variables to set its own configuration with the lowest priority. diff --git a/providers/flagd/e2e/config_test.go b/providers/flagd/e2e/config_test.go index c68262eb3..2847506ed 100644 --- a/providers/flagd/e2e/config_test.go +++ b/providers/flagd/e2e/config_test.go @@ -2,6 +2,7 @@ package e2e import ( "context" + "strings" "testing" "github.com/cucumber/godog" @@ -18,21 +19,17 @@ type configTestCase struct { // TestConfiguration runs all configuration test scenarios using table-driven tests func TestConfiguration(t *testing.T) { testCases := []configTestCase{ - { - name: "All", - tags: "", - }, { name: "RPC", - tags: "@rpc", + tags: "@rpc && ~@forbidden", }, { name: "InProcess", - tags: "@in-process", + tags: "@in-process && ~@forbidden", }, { name: "File", - tags: "@file", + tags: "@file && ~@forbidden", }, } @@ -44,6 +41,12 @@ func TestConfiguration(t *testing.T) { testframework.InitializeConfigScenario(sc) sc.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { + options := make([]testframework.ProviderOption, 1) + options[0] = testframework.ProviderOption{ + Option: "resolver", + ValueType: "string", + Value: strings.ToLower(tc.name), + } state := &testframework.TestState{ EnvVars: make(map[string]string), EvalContext: make(map[string]interface{}), diff --git a/providers/flagd/e2e/file_test.go b/providers/flagd/e2e/file_test.go index 0c40c6d93..390ae2d2b 100644 --- a/providers/flagd/e2e/file_test.go +++ b/providers/flagd/e2e/file_test.go @@ -41,7 +41,7 @@ func TestFileProviderE2E(t *testing.T) { // Run tests with file-specific tags, focusing on core evaluation scenarios // Skip complex connection-related and synchronization scenarios for file provider - tags := "@file && ~@reconnect && ~@sync && ~@grace && ~@events && ~@unixsocket && ~@metadata" + tags := "@file && ~@reconnect && ~@sync && ~@grace && ~@events && ~@unixsocket && ~@metadata && ~@forbidden" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/e2e/inprocess_test.go b/providers/flagd/e2e/inprocess_test.go index d4767e4e3..36c2bb186 100644 --- a/providers/flagd/e2e/inprocess_test.go +++ b/providers/flagd/e2e/inprocess_test.go @@ -26,7 +26,7 @@ func TestInProcessProviderE2E(t *testing.T) { } // Run tests with in-process specific tags - tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload" + tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload && ~@forbidden" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/e2e/rpc_test.go b/providers/flagd/e2e/rpc_test.go index f3c58b9ab..994c3627c 100644 --- a/providers/flagd/e2e/rpc_test.go +++ b/providers/flagd/e2e/rpc_test.go @@ -26,7 +26,7 @@ func TestRPCProviderE2E(t *testing.T) { } // Run tests with RPC-specific tags - exclude unimplemented scenarios - tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching" + tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching && ~@forbidden" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/flagd-testbed b/providers/flagd/flagd-testbed index b62f5dbe8..9b73b3a95 160000 --- a/providers/flagd/flagd-testbed +++ b/providers/flagd/flagd-testbed @@ -1 +1 @@ -Subproject commit b62f5dbe860ecf4f36ec757dfdc0b38f7b3dec6e +Subproject commit 9b73b3a95cd9e0885937d244b118713b26374b1d diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index ca758d38d..edd3c00fc 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -33,6 +33,7 @@ const ( flagdHostEnvironmentVariableName = "FLAGD_HOST" flagdPortEnvironmentVariableName = "FLAGD_PORT" + flagdSyncPortEnvironmentVariableName = "FLAGD_SYNC_PORT" flagdTLSEnvironmentVariableName = "FLAGD_TLS" flagdSocketPathEnvironmentVariableName = "FLAGD_SOCKET_PATH" flagdServerCertPathEnvironmentVariableName = "FLAGD_SERVER_CERT_PATH" @@ -98,7 +99,10 @@ func NewProviderConfiguration(opts []ProviderOption) (*ProviderConfiguration, er opt(providerConfiguration) } + providerConfiguration.updatePortFromEnvVar() + configureProviderConfiguration(providerConfiguration) + err := validateProviderConfiguration(providerConfiguration) return providerConfiguration, err @@ -130,20 +134,6 @@ func validateProviderConfiguration(p *ProviderConfiguration) error { // updateFromEnvVar is a utility to update configurations based on current environment variables func (cfg *ProviderConfiguration) updateFromEnvVar() { - portS := os.Getenv(flagdPortEnvironmentVariableName) - if portS != "" { - port, err := strconv.Atoi(portS) - if err != nil { - cfg.log.Error(err, - fmt.Sprintf( - "invalid env config for %s provided, using default value: %d or %d depending on resolver", - flagdPortEnvironmentVariableName, defaultRpcPort, defaultInProcessPort, - )) - } else { - cfg.Port = uint16(port) - } - } - if host := os.Getenv(flagdHostEnvironmentVariableName); host != "" { cfg.Host = host } @@ -238,6 +228,41 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { } +// updatePortFromEnvVar updates the port configuration from environment variables. +// For in-process resolver, FLAGD_SYNC_PORT takes priority over FLAGD_PORT (backwards compatibility). +// For rpc resolver, only FLAGD_PORT is used. +func (cfg *ProviderConfiguration) updatePortFromEnvVar() { + if cfg.Port != 0 { + // Port is already set, no need to update from env var + return + } + var portS string + var envVarName string + + if cfg.Resolver == inProcess { + portS = os.Getenv(flagdSyncPortEnvironmentVariableName) + envVarName = flagdSyncPortEnvironmentVariableName + } + + if portS == "" { + portS = os.Getenv(flagdPortEnvironmentVariableName) + envVarName = flagdPortEnvironmentVariableName + } + + if portS != "" { + port, err := strconv.Atoi(portS) + if err != nil { + cfg.log.Error(err, + fmt.Sprintf( + "invalid env config for %s provided, using default value: %d or %d depending on resolver", + envVarName, defaultRpcPort, defaultInProcessPort, + )) + } else { + cfg.Port = uint16(port) + } + } +} + // ProviderOptions type ProviderOption func(*ProviderConfiguration) diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index 6cb6d0e1b..634c6c91c 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -322,7 +322,6 @@ func (i *InProcess) processSyncData(data isync.DataSync) { return } - i.logger.Info("staletimer stop") // Stop stale timer - we've successfully received and processed data i.staleTimer.stop() diff --git a/providers/flagd/pkg/service/in_process/service_grpc_test.go b/providers/flagd/pkg/service/in_process/service_grpc_test.go index a1e7c1c1b..03e84e595 100644 --- a/providers/flagd/pkg/service/in_process/service_grpc_test.go +++ b/providers/flagd/pkg/service/in_process/service_grpc_test.go @@ -138,7 +138,7 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) { } inProcessService := NewInProcessService(Configuration{ - TargetUri: "envoy://localhost:9211/foo.service", + TargetUri: "envoy://localhost:9211/foo.service", Selector: scope, TLSEnabled: false, }) @@ -201,7 +201,6 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) { } } - // bufferedServer - a mock grpc service backed by buffered connection type bufferedServer struct { listener net.Listener diff --git a/providers/flagd/pkg/service/rpc/service.go b/providers/flagd/pkg/service/rpc/service.go index eabe452b4..7f9ef07e2 100644 --- a/providers/flagd/pkg/service/rpc/service.go +++ b/providers/flagd/pkg/service/rpc/service.go @@ -452,7 +452,7 @@ func (s *Service) EventChannel() <-chan of.Event { // If retrying is exhausted, an event with openfeature.ProviderError will be emitted. func (s *Service) startEventStream(ctx context.Context) { streamReadySignaled := false - + // wraps connection with retry attempts for s.retryCounter.retry() { s.logger.V(logger.Debug).Info("connecting to event stream") @@ -488,12 +488,12 @@ func (s *Service) startEventStream(ctx context.Context) { // retry attempts exhausted. Disable cache and emit error event s.cache.Disable() connErr := fmt.Errorf("grpc connection establishment failed") - + // Signal error if we haven't signaled success yet if !streamReadySignaled { s.signalStreamReady(connErr) } - + s.sendEvent(ctx, of.Event{ ProviderName: "flagd", EventType: of.ProviderError, @@ -521,7 +521,7 @@ func (s *Service) streamClient(ctx context.Context, streamReadySignaled *bool) e } s.logger.V(logger.Info).Info("connected to event stream") - + // Signal successful connection to Init() - stream is now ready if !*streamReadySignaled { s.signalStreamReady(nil) // nil means success