7 changes: 7 additions & 0 deletions datadog.go
@@ -73,6 +73,13 @@ type Options struct {
// GlobalTags holds a set of tags that will automatically be applied to all
// exported spans.
GlobalTags map[string]interface{}

// DisableCountPerBuckets, when set to true, stops the exporter from emitting
// the count_per_bucket metrics for Distribution views.
DisableCountPerBuckets bool

// HistogramPercentiles is a list of percentiles expressed as strings between
// 0 and 1, e.g. []string{"0.5", "0.95", "0.99"}. For each entry the exporter
// estimates that percentile from the Distribution data and emits it as a
// separate metric.
HistogramPercentiles []string
}

func (o *Options) onError(err error) {
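For context, a minimal sketch of how the two new options might be configured; the percentile values are illustrative and only the fields added in this diff are used:

opts := datadog.Options{
    // Emit estimated 50th, 95th and 99th percentile gauges for every
    // Distribution view, named "<metric>.50percentile" and so on.
    HistogramPercentiles: []string{"0.5", "0.95", "0.99"},
    // Skip the per-bucket count_per_bucket gauges if only the
    // percentile estimates are needed.
    DisableCountPerBuckets: true,
}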
45 changes: 45 additions & 0 deletions datadog_test.go
@@ -272,3 +272,48 @@ func TestHistogram(t *testing.T) {
t.Errorf("Expected: %v, Got: %v\n", vd, actual)
}
}

func TestPercentile_buildMetricNameForPercentile(t *testing.T) {
testCases := []struct {
Percentile float64
Expected string
}{
{
0.5,
"50percentile",
},
{
0.75,
"75percentile",
},
{
0.92,
"92percentile",
},
{
0.95,
"95percentile",
},
{
0.99,
"99percentile",
},
{
0.995,
"995percentile",
},
{
0.999,
"999percentile",
},
}

for _, tc := range testCases {
t.Run(fmt.Sprintf("%f", tc.Percentile), func(t *testing.T) {
got := buildMetricNameForPercentile(tc.Percentile)
if got != tc.Expected {
t.Errorf("Expected: %v, Got %v\n", tc.Expected, got)
}
})
}
}
68 changes: 57 additions & 11 deletions stats.go
@@ -7,6 +7,8 @@ package datadog

import (
"fmt"
"math"
"strconv"
"sync"

"github.com/DataDog/datadog-go/statsd"
@@ -26,10 +28,11 @@ const (

// collector implements statsd.Client
type statsExporter struct {
opts Options
client *statsd.Client
mu sync.Mutex // mu guards viewData
viewData map[string]*view.Data
opts Options
client *statsd.Client
mu sync.Mutex // mu guards viewData
viewData map[string]*view.Data
percentiles []float64
}

func newStatsExporter(o Options) (*statsExporter, error) {
@@ -43,10 +46,23 @@ func newStatsExporter(o Options) (*statsExporter, error) {
return nil, err
}

percentiles := make([]float64, len(o.HistogramPercentiles))
for n, percentileStr := range o.HistogramPercentiles {
percentile, err := strconv.ParseFloat(percentileStr, 64)
if err != nil {
return nil, fmt.Errorf("'HistogramPercentiles' must be in float format: Received %s", percentileStr)
}
if percentile < 0 || percentile > 1 {
return nil, fmt.Errorf("'HistogramPercentiles' must be between 0 and 1: Received %f", percentile)
}
percentiles[n] = percentile
}

return &statsExporter{
opts: o,
viewData: make(map[string]*view.Data),
client: client,
opts: o,
viewData: make(map[string]*view.Data),
client: client,
percentiles: percentiles,
}, nil
}

@@ -92,17 +108,47 @@ func (s *statsExporter) submitMetric(v *view.View, row *view.Row, metricName str
"avg": data.Mean,
"squared_dev_sum": data.SumOfSquaredDev,
}
for _, percentile := range s.percentiles {
metrics[buildMetricNameForPercentile(percentile)] = calculatePercentile(percentile, v.Aggregation.Buckets, data.CountPerBucket)
}

for name, value := range metrics {
err = client.Gauge(metricName+"."+name, value, opt.tagMetrics(row.Tags, tags), rate)
}

for x := range data.CountPerBucket {
addlTags := []string{"bucket_idx:" + fmt.Sprint(x)}
err = client.Gauge(metricName+".count_per_bucket", float64(data.CountPerBucket[x]), opt.tagMetrics(row.Tags, addlTags), rate)
if !s.opts.DisableCountPerBuckets {
for x := range data.CountPerBucket {
addlTags := []string{"bucket_idx:" + fmt.Sprint(x)}
err = client.Gauge(metricName+".count_per_bucket", float64(data.CountPerBucket[x]), opt.tagMetrics(row.Tags, addlTags), rate)
}
}
return err
default:
return fmt.Errorf("aggregation %T is not supported", v.Aggregation)
}
}

// calculatePercentile estimates the value at the given percentile by walking
// the cumulative bucket counts and returning the boundary value of the bucket
// in which that percentile falls.
func calculatePercentile(percentile float64, buckets []float64, countPerBucket []int64) float64 {
cumulativePerBucket := make([]int64, len(countPerBucket))
var sum int64
for n, count := range countPerBucket {
sum += count
cumulativePerBucket[n] = sum
}
atBin := int64(math.Floor(percentile * float64(sum)))

var previousCount int64
for n, count := range cumulativePerBucket {
if atBin >= previousCount && atBin <= count {
return buckets[n]
}
previousCount = count
}
return buckets[len(buckets)-1]
}

// buildMetricNameForPercentile turns a percentile such as 0.95 into a metric
// name suffix such as "95percentile"; percentiles above 0.99 keep three
// digits, e.g. 0.995 becomes "995percentile".
func buildMetricNameForPercentile(percentile float64) string {
if percentile > 0.99 {
return fmt.Sprintf("%dpercentile", int64(percentile*1000+0.5))
}
return fmt.Sprintf("%dpercentile", int64(percentile*100+0.5))
}
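To make the bucket walk in calculatePercentile concrete, a quick illustrative check that could sit alongside the existing tests in this package (the bucket bounds and counts are made up, not taken from this change):

func TestCalculatePercentile_sketch(t *testing.T) {
    // Upper bounds 100, 200 and 400 with 4, 4 and 2 observations give
    // cumulative counts of 4, 8 and 10.
    buckets := []float64{100, 200, 400}
    counts := []int64{4, 4, 2}

    // floor(0.5*10) = 5 falls in the second bucket, so the p50 estimate is 200.
    if got := calculatePercentile(0.5, buckets, counts); got != 200 {
        t.Errorf("p50: expected 200, got %v", got)
    }
    // floor(0.95*10) = 9 falls in the last bucket, so the p95 estimate is 400.
    if got := calculatePercentile(0.95, buckets, counts); got != 400 {
        t.Errorf("p95: expected 400, got %v", got)
    }
}

For a Distribution view whose exported metric name is, say, "request.latency", submitMetric would then emit request.latency.50percentile and request.latency.95percentile as gauges alongside the existing avg and squared_dev_sum gauges.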
75 changes: 75 additions & 0 deletions stats_test.go
@@ -7,6 +7,8 @@ package datadog

import (
"fmt"
"math"
"math/rand"
"testing"
"time"

@@ -103,3 +105,76 @@ func TestNilAggregation(t *testing.T) {
t.Errorf("Expected: %v, Got: %v", fmt.Errorf("aggregation *view.Aggregation is not supported"), actual)
}
}

func Test_calculatePercentile(t *testing.T) {
var buckets []float64
for i := float64(-100); i < 100; i += 0.1 {
buckets = append(buckets, i)
}

// Calculate a normal distribution with a standard deviation of 1.
normalDistribution := calculateNormalDistribution(buckets, 0, 1)

// The following tests can be confirmed using the Cumulative Standard Normal table (https://en.wikipedia.org/wiki/Standard_normal_table#Cumulative).
tsts := []struct {
expected float64
percentile float64
buckets []float64
countsPerBucket []int64
}{
{
0,
0.5,
buckets,
normalDistribution,
},
{
0.69,
0.75,
buckets,
normalDistribution,
},
{
1.67,
0.95,
buckets,
normalDistribution,
},
{
2.33,
0.99,
buckets,
normalDistribution,
},
}

for _, tst := range tsts {
t.Run(fmt.Sprintf("%v", tst.percentile), func(t *testing.T) {
got := calculatePercentile(tst.percentile, tst.buckets, tst.countsPerBucket)

if math.Abs(tst.expected-got) > 0.1 {
t.Errorf("Expected: %v to be within 0.1 of %v", tst.expected, got)
}
})

}
}

// calculateNormalDistribution uses rand.NormFloat64, seeded with the given seed, to fill the
// buckets with counts drawn from a normal distribution with the given standard deviation.
func calculateNormalDistribution(buckets []float64, seed int64, standardDeviation float64) []int64 {
r := rand.New(rand.NewSource(seed))

normalDistribution := make([]int64, len(buckets))
for n := 0; n < 1e6; n++ {
rnd := r.NormFloat64() * standardDeviation
var previousBucket float64
for bidx, bucket := range buckets {
if rnd > previousBucket && rnd <= bucket {
normalDistribution[bidx]++
break
}
previousBucket = bucket
}
}
return normalDistribution
}
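As a cross-check on the expected values above (0.69, 1.67, 2.33): they track the standard normal quantiles, which can be reproduced with math.Erfinv from the standard library. This standalone sketch is not part of the change:

package main

import (
    "fmt"
    "math"
)

func main() {
    // Quantile of the standard normal distribution: z(p) = sqrt(2) * erfinv(2p - 1).
    for _, p := range []float64{0.75, 0.95, 0.99} {
        z := math.Sqrt2 * math.Erfinv(2*p-1)
        fmt.Printf("z(%.2f) = %.3f\n", p, z) // prints roughly 0.674, 1.645 and 2.326
    }
}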