Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ provider, err := flagd.NewProvider(flagd.WithInProcessResolver())
openfeature.SetProvider(provider)
```

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).
In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8015` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).

#### Custom sync provider

Expand Down Expand Up @@ -95,7 +95,7 @@ Configuration can be provided as constructor options or as environment variables
| Option name | Environment variable name | Type & supported value | Default | Compatible resolver |
|----------------------------------------------------------|--------------------------------|-----------------------------|-----------|---------------------|
| WithHost | FLAGD_HOST | string | localhost | rpc & in-process |
| WithPort | FLAGD_PORT | number | 8013 | rpc & in-process |
| WithPort | FLAGD_PORT (rpc), FLAGD_SYNC_PORT or FLAGD_PORT (in-process) | number | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| WithTargetUri | FLAGD_TARGET_URI | string | "" | in-process |
| WithTLS | FLAGD_TLS | boolean | false | rpc & in-process |
| WithSocketPath | FLAGD_SOCKET_PATH | string | "" | rpc & in-process |
Expand All @@ -106,6 +106,8 @@ Configuration can be provided as constructor options or as environment variables
| WithProviderID | FLAGD_SOURCE_PROVIDER_ID | string | "" | in-process |
| WithSelector | FLAGD_SOURCE_SELECTOR | string | "" | in-process |

> **Note:** For the in-process resolver, `FLAGD_SYNC_PORT` takes priority over `FLAGD_PORT`. The `FLAGD_PORT` environment variable is still supported for backwards compatibility.

### Overriding behavior

By default, the flagd provider will read non-empty environment variables to set its own configuration with the lowest priority.
Expand Down
17 changes: 10 additions & 7 deletions providers/flagd/e2e/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package e2e

import (
"context"
"strings"
"testing"

"github.com/cucumber/godog"
Expand All @@ -18,21 +19,17 @@ type configTestCase struct {
// TestConfiguration runs all configuration test scenarios using table-driven tests
func TestConfiguration(t *testing.T) {
testCases := []configTestCase{
{
name: "All",
tags: "",
},
{
name: "RPC",
tags: "@rpc",
tags: "@rpc && ~@forbidden",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this relates to the fatalstatuscodes, which are currently also another open pr

},
{
name: "InProcess",
tags: "@in-process",
tags: "@in-process && ~@forbidden",
},
{
name: "File",
tags: "@file",
tags: "@file && ~@forbidden",
},
}

Expand All @@ -44,6 +41,12 @@ func TestConfiguration(t *testing.T) {

testframework.InitializeConfigScenario(sc)
sc.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) {
options := make([]testframework.ProviderOption, 1)
options[0] = testframework.ProviderOption{
Option: "resolver",
ValueType: "string",
Value: strings.ToLower(tc.name),
}
state := &testframework.TestState{
EnvVars: make(map[string]string),
EvalContext: make(map[string]interface{}),
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/e2e/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestFileProviderE2E(t *testing.T) {

// Run tests with file-specific tags, focusing on core evaluation scenarios
// Skip complex connection-related and synchronization scenarios for file provider
tags := "@file && ~@reconnect && ~@sync && ~@grace && ~@events && ~@unixsocket && ~@metadata"
tags := "@file && ~@reconnect && ~@sync && ~@grace && ~@events && ~@unixsocket && ~@metadata && ~@forbidden"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/e2e/inprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestInProcessProviderE2E(t *testing.T) {
}

// Run tests with in-process specific tags
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload"
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload && ~@forbidden"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/e2e/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRPCProviderE2E(t *testing.T) {
}

// Run tests with RPC-specific tags - exclude unimplemented scenarios
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching"
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching && ~@forbidden"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
53 changes: 39 additions & 14 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (

flagdHostEnvironmentVariableName = "FLAGD_HOST"
flagdPortEnvironmentVariableName = "FLAGD_PORT"
flagdSyncPortEnvironmentVariableName = "FLAGD_SYNC_PORT"
flagdTLSEnvironmentVariableName = "FLAGD_TLS"
flagdSocketPathEnvironmentVariableName = "FLAGD_SOCKET_PATH"
flagdServerCertPathEnvironmentVariableName = "FLAGD_SERVER_CERT_PATH"
Expand Down Expand Up @@ -98,7 +99,10 @@ func NewProviderConfiguration(opts []ProviderOption) (*ProviderConfiguration, er
opt(providerConfiguration)
}

providerConfiguration.updatePortFromEnvVar()

configureProviderConfiguration(providerConfiguration)

err := validateProviderConfiguration(providerConfiguration)

return providerConfiguration, err
Expand Down Expand Up @@ -130,20 +134,6 @@ func validateProviderConfiguration(p *ProviderConfiguration) error {

// updateFromEnvVar is a utility to update configurations based on current environment variables
func (cfg *ProviderConfiguration) updateFromEnvVar() {
portS := os.Getenv(flagdPortEnvironmentVariableName)
if portS != "" {
port, err := strconv.Atoi(portS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf(
"invalid env config for %s provided, using default value: %d or %d depending on resolver",
flagdPortEnvironmentVariableName, defaultRpcPort, defaultInProcessPort,
))
} else {
cfg.Port = uint16(port)
}
}

if host := os.Getenv(flagdHostEnvironmentVariableName); host != "" {
cfg.Host = host
}
Expand Down Expand Up @@ -238,6 +228,41 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {

}

// updatePortFromEnvVar updates the port configuration from environment variables.
// For in-process resolver, FLAGD_SYNC_PORT takes priority over FLAGD_PORT (backwards compatibility).
// For rpc resolver, only FLAGD_PORT is used.
func (cfg *ProviderConfiguration) updatePortFromEnvVar() {
if cfg.Port != 0 {
// Port is already set, no need to update from env var
return
}
var portS string
var envVarName string

if cfg.Resolver == inProcess {
portS = os.Getenv(flagdSyncPortEnvironmentVariableName)
envVarName = flagdSyncPortEnvironmentVariableName
}

if portS == "" {
portS = os.Getenv(flagdPortEnvironmentVariableName)
envVarName = flagdPortEnvironmentVariableName
}

if portS != "" {
port, err := strconv.Atoi(portS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf(
"invalid env config for %s provided, using default value: %d or %d depending on resolver",
envVarName, defaultRpcPort, defaultInProcessPort,
))
} else {
cfg.Port = uint16(port)
}
}
}

// ProviderOptions

type ProviderOption func(*ProviderConfiguration)
Expand Down
1 change: 0 additions & 1 deletion providers/flagd/pkg/service/in_process/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ func (i *InProcess) processSyncData(data isync.DataSync) {
return
}

i.logger.Info("staletimer stop")
// Stop stale timer - we've successfully received and processed data
i.staleTimer.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
}

inProcessService := NewInProcessService(Configuration{
TargetUri: "envoy://localhost:9211/foo.service",
TargetUri: "envoy://localhost:9211/foo.service",
Selector: scope,
TLSEnabled: false,
})
Expand Down Expand Up @@ -201,7 +201,6 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
}
}


// bufferedServer - a mock grpc service backed by buffered connection
type bufferedServer struct {
listener net.Listener
Expand Down
8 changes: 4 additions & 4 deletions providers/flagd/pkg/service/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (s *Service) EventChannel() <-chan of.Event {
// If retrying is exhausted, an event with openfeature.ProviderError will be emitted.
func (s *Service) startEventStream(ctx context.Context) {
streamReadySignaled := false

// wraps connection with retry attempts
for s.retryCounter.retry() {
s.logger.V(logger.Debug).Info("connecting to event stream")
Expand Down Expand Up @@ -488,12 +488,12 @@ func (s *Service) startEventStream(ctx context.Context) {
// retry attempts exhausted. Disable cache and emit error event
s.cache.Disable()
connErr := fmt.Errorf("grpc connection establishment failed")

// Signal error if we haven't signaled success yet
if !streamReadySignaled {
s.signalStreamReady(connErr)
}

s.sendEvent(ctx, of.Event{
ProviderName: "flagd",
EventType: of.ProviderError,
Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *Service) streamClient(ctx context.Context, streamReadySignaled *bool) e
}

s.logger.V(logger.Info).Info("connected to event stream")

// Signal successful connection to Init() - stream is now ready
if !*streamReadySignaled {
s.signalStreamReady(nil) // nil means success
Expand Down
Loading