Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/experiment/local/assignment.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package local

import (
"github.com/amplitude/experiment-go-server/pkg/experiment"
"sort"
"strings"
"time"

"github.com/amplitude/experiment-go-server/pkg/experiment"
)

// assignment represents an assignment event.
// Deprecated: Assignment tracking is deprecated. Use exposure with ExposureService instead.
type assignment struct {
user *experiment.User
results map[string]experiment.Variant
Expand Down
5 changes: 4 additions & 1 deletion pkg/experiment/local/assignment_filter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package local

import (
"github.com/amplitude/experiment-go-server/internal/cache"
"sync"

"github.com/amplitude/experiment-go-server/internal/cache"
)

// assignmentFilter filters duplicate assignments.
// Deprecated: Assignment tracking is deprecated. Use ExposureFilter with ExposureService instead.
type assignmentFilter struct {
mu sync.Mutex
cache *cache.Cache
Expand Down
3 changes: 2 additions & 1 deletion pkg/experiment/local/assignment_filter_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package local

import (
"github.com/amplitude/experiment-go-server/pkg/experiment"
"testing"
"time"

"github.com/amplitude/experiment-go-server/pkg/experiment"
)

func TestSingleAssignment(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/experiment/local/assignment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ package local

import (
"fmt"

"github.com/amplitude/analytics-go/amplitude"
)

const dayMillis = 24 * 60 * 60 * 1000
const flagTypeMutualExclusionGroup = "mutual-exclusion-group"

// assignmentService handles assignment tracking.
// Deprecated: Assignment tracking is deprecated. Use ExposureService with Exposure tracking instead.
type assignmentService struct {
amplitude *amplitude.Client
filter *assignmentFilter
}

// Track tracks an assignment event.
// Deprecated: Assignment tracking is deprecated. Use ExposureService with Exposure tracking instead.
func (s *assignmentService) Track(assignment *assignment) {
if s.filter.shouldTrack(assignment) {
(*s.amplitude).Track(toEvent(assignment))
}
}

// toEvent converts an assignment to an Amplitude event.
// Deprecated: Assignment tracking is deprecated. Use Exposure tracking instead.
func toEvent(assignment *assignment) amplitude.Event {

event := amplitude.Event{
Expand Down
22 changes: 22 additions & 0 deletions pkg/experiment/local/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Client struct {
flagsMutex *sync.RWMutex
engine *evaluation.Engine
assignmentService *assignmentService
exposureService *exposureService
cohortStorage cohortStorage
flagConfigStorage flagConfigStorage
cohortLoader *cohortLoader
Expand All @@ -54,6 +55,17 @@ func Initialize(apiKey string, config *Config) *Client {
filter: newAssignmentFilter(config.AssignmentConfig.CacheCapacity),
}
}

// Exposure service is always instantiated, using deployment key if no api key provided
var es *exposureService
if config.ExposureConfig != nil && config.ExposureConfig.APIKey != "" {
exposureAmplitudeClient := amplitude.NewClient(config.ExposureConfig.Config)
es = &exposureService{
amplitude: exposureAmplitudeClient,
filter: newExposureFilter(config.ExposureConfig.CacheCapacity),
}
println("exposure service instantiated")
}
cohortStorage := newInMemoryCohortStorage()
flagConfigStorage := newInMemoryFlagConfigStorage()
var cohortLoader *cohortLoader
Expand All @@ -79,6 +91,7 @@ func Initialize(apiKey string, config *Config) *Client {
flagsMutex: &sync.RWMutex{},
engine: evaluation.NewEngine(log),
assignmentService: as,
exposureService: es,
cohortStorage: cohortStorage,
flagConfigStorage: flagConfigStorage,
cohortLoader: cohortLoader,
Expand Down Expand Up @@ -123,6 +136,11 @@ func (c *Client) Evaluate(user *experiment.User, flagKeys []string) (map[string]
}

func (c *Client) EvaluateV2(user *experiment.User, flagKeys []string) (map[string]experiment.Variant, error) {
return c.EvaluateV2WithOptions(user, &EvaluateOptions{FlagKeys: flagKeys})
}

func (c *Client) EvaluateV2WithOptions(user *experiment.User, options *EvaluateOptions) (map[string]experiment.Variant, error) {
flagKeys := options.FlagKeys
flagConfigs := c.flagConfigStorage.getFlagConfigs()
sortedFlags, err := topologicalSort(flagConfigs, flagKeys)
if err != nil {
Expand All @@ -148,6 +166,10 @@ func (c *Client) EvaluateV2(user *experiment.User, flagKeys []string) (map[strin
Metadata: result.Metadata,
}
}
if options.TracksExposure != nil && *options.TracksExposure {
c.exposureService.Track(newExposure(user, variants))
}
// Deprecated: Assignment tracking is deprecated. Use ExposureService with Exposure tracking instead.
if c.assignmentService != nil {
c.assignmentService.Track(newAssignment(user, variants))
}
Expand Down
100 changes: 100 additions & 0 deletions pkg/experiment/local/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"testing"

"github.com/amplitude/analytics-go/amplitude"
"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
)
Expand Down Expand Up @@ -231,3 +232,102 @@ func TestEvaluateV2GroupCohort(t *testing.T) {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestEvaluateV2WithTracksExposureTracksNonDefaultVariants(t *testing.T) {
exposureConfig := &ExposureConfig{Config: amplitude.Config{APIKey: "some_api_key"}}
clients["server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz"] = nil
client = Initialize("server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz",
&Config{ExposureConfig: exposureConfig})
err := client.Start()
if err != nil {
panic(err)
}

user := &experiment.User{UserId: "test_user"}

// Capture tracked events
trackedEvents := make([]amplitude.Event, 0)

// Create a mock amplitude client that captures events
mockAmplitudeClient := mockAmplitudeClientForTest{
trackedEvents: &trackedEvents,
}

oldAmplitude := client.exposureService.amplitude
client.exposureService.amplitude = &mockAmplitudeClient
defer func() {
client.exposureService.amplitude = oldAmplitude
}()

// Perform evaluation with TracksExposure=true
tracksExposure := true
options := &EvaluateOptions{
TracksExposure: &tracksExposure,
}
variants, err := client.EvaluateV2WithOptions(user, options)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

// Verify that track was called
if len(trackedEvents) == 0 {
t.Fatalf("Expected exposure events to be tracked, but none were tracked")
}

// Count non-default variants
nonDefaultVariants := make(map[string]experiment.Variant)
for flagKey, variant := range variants {
isDefault, ok := variant.Metadata["default"].(bool)
if !ok || !isDefault {
nonDefaultVariants[flagKey] = variant
}
}

// Verify that we have one event per non-default variant
if len(trackedEvents) != len(nonDefaultVariants) {
t.Fatalf("Expected %d exposure events, got %d", len(nonDefaultVariants), len(trackedEvents))
}

// Verify each event has the correct structure
trackedFlagKeys := make(map[string]bool)
for _, event := range trackedEvents {
if event.EventType != "[Experiment] Exposure" {
t.Errorf("EventType was %s, expected %s", event.EventType, "[Experiment] Exposure")
}
if event.UserID != user.UserId {
t.Errorf("UserID was %s, expected %s", event.UserID, user.UserId)
}
flagKey, ok := event.EventProperties["[Experiment] Flag Key"].(string)
if !ok || flagKey == "" {
t.Errorf("Event should have flag key")
}
trackedFlagKeys[flagKey] = true
// Verify the variant is not default
variant, exists := variants[flagKey]
if !exists {
t.Errorf("Variant for %s should exist", flagKey)
}
isDefault, ok := variant.Metadata["default"].(bool)
if ok && isDefault {
t.Errorf("Variant for %s should not be default", flagKey)
}
}

// Verify all non-default variants were tracked
if len(trackedFlagKeys) != len(nonDefaultVariants) {
t.Errorf("Expected %d tracked flag keys, got %d", len(nonDefaultVariants), len(trackedFlagKeys))
}
for flagKey := range nonDefaultVariants {
if !trackedFlagKeys[flagKey] {
t.Errorf("Flag key %s should have been tracked", flagKey)
}
}
}

type mockAmplitudeClientForTest struct {
trackedEvents *[]amplitude.Event
}

func (m *mockAmplitudeClientForTest) Track(event amplitude.Event) {
*m.trackedEvents = append(*m.trackedEvents, event)
}
49 changes: 33 additions & 16 deletions pkg/experiment/local/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,32 @@ const (

type Config struct {
Debug bool
LogLevel logger.LogLevel
LoggerProvider logger.LoggerProvider
LogLevel logger.LogLevel
LoggerProvider logger.LoggerProvider
ServerUrl string
ServerZone ServerZone
FlagConfigPollerInterval time.Duration
FlagConfigPollerRequestTimeout time.Duration
StreamUpdates bool
StreamServerUrl string
StreamFlagConnTimeout time.Duration
AssignmentConfig *AssignmentConfig
AssignmentConfig *AssignmentConfig // Deprecated: use ExposureConfig instead
ExposureConfig *ExposureConfig
CohortSyncConfig *CohortSyncConfig
}

// AssignmentConfig is the configuration for assignment tracking.
// Deprecated: Assignment tracking is deprecated. Use ExposureConfig with ExposureService instead.
type AssignmentConfig struct {
amplitude.Config
CacheCapacity int
}

type ExposureConfig struct {
amplitude.Config
CacheCapacity int
}

type CohortSyncConfig struct {
ApiKey string
SecretKey string
Expand All @@ -47,20 +55,11 @@ type CohortSyncConfig struct {
CohortServerUrl string
}

var DefaultConfig = &Config{
Debug: false,
LogLevel: logger.Error,
LoggerProvider: logger.NewDefault(),
ServerUrl: "https://api.lab.amplitude.com/",
ServerZone: USServerZone,
FlagConfigPollerInterval: 30 * time.Second,
FlagConfigPollerRequestTimeout: 10 * time.Second,
StreamUpdates: false,
StreamServerUrl: "https://stream.lab.amplitude.com",
StreamFlagConnTimeout: 1500 * time.Millisecond,
var DefaultAssignmentConfig = &AssignmentConfig{
CacheCapacity: 524288,
}

var DefaultAssignmentConfig = &AssignmentConfig{
var DefaultExposureConfig = &ExposureConfig{
CacheCapacity: 524288,
}

Expand All @@ -70,6 +69,20 @@ var DefaultCohortSyncConfig = &CohortSyncConfig{
CohortServerUrl: "https://cohort-v2.lab.amplitude.com",
}

var DefaultConfig = &Config{
Debug: false,
LogLevel: logger.Error,
LoggerProvider: logger.NewDefault(),
ServerUrl: "https://api.lab.amplitude.com/",
ServerZone: USServerZone,
FlagConfigPollerInterval: 30 * time.Second,
FlagConfigPollerRequestTimeout: 10 * time.Second,
StreamUpdates: false,
StreamServerUrl: "https://stream.lab.amplitude.com",
StreamFlagConnTimeout: 1500 * time.Millisecond,
ExposureConfig: DefaultExposureConfig,
}

func fillConfigDefaults(c *Config) *Config {
if c == nil {
return DefaultConfig
Expand Down Expand Up @@ -101,6 +114,10 @@ func fillConfigDefaults(c *Config) *Config {
c.AssignmentConfig.CacheCapacity = DefaultAssignmentConfig.CacheCapacity
}

if c.ExposureConfig != nil && c.ExposureConfig.CacheCapacity == 0 {
c.ExposureConfig.CacheCapacity = DefaultExposureConfig.CacheCapacity
}

if c.CohortSyncConfig != nil && c.CohortSyncConfig.MaxCohortSize == 0 {
c.CohortSyncConfig.MaxCohortSize = DefaultCohortSyncConfig.MaxCohortSize
}
Expand All @@ -124,7 +141,7 @@ func fillConfigDefaults(c *Config) *Config {

if c.Debug {
c.LogLevel = logger.Debug
}
}

if c.LoggerProvider == nil {
c.LoggerProvider = logger.NewDefault()
Expand Down
9 changes: 9 additions & 0 deletions pkg/experiment/local/evaluate_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package local

// EvaluateOptions contains options for evaluating variants for a user.
type EvaluateOptions struct {
// FlagKeys are the flags to evaluate with the user. If nil or empty, all flags are evaluated.
FlagKeys []string
// TracksExposure indicates whether to track exposure event for the evaluation.
TracksExposure *bool
}
Loading
Loading