Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions find/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func NewCached(config *config.Config, body []byte) *Find {
}
}

func New(config *config.Config, ctx context.Context, query string, stat *finder.FinderStat) (*Find, error) {
res, err := finder.Find(config, ctx, query, 0, 0, stat)
func New(config *config.Config, ctx context.Context, query string) (*Find, error) {
res, err := finder.Find(config, ctx, query, 0, 0)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions find/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/go-graphite/carbonapi/pkg/parser"
v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/finder"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/utils"
"github.com/lomik/graphite-clickhouse/logs"
Expand Down Expand Up @@ -41,7 +40,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

var (
metricsCount int64
stat finder.FinderStat
stat metrics.FinderStat
queueFail bool
queueDuration time.Duration
findCache bool
Expand Down Expand Up @@ -195,7 +194,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}()
}

f, err := New(h.config, r.Context(), query, &stat)
f, err := New(h.config, r.Context(), query)

if entered {
// release early as possible
Expand Down
13 changes: 12 additions & 1 deletion finder/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/pkg/where"
)
Expand All @@ -20,13 +21,15 @@ type BaseFinder struct {
table string // graphite_tree table
opts clickhouse.Options // timeout, connectTimeout
body []byte // clickhouse response body
stats []metrics.FinderStat
}

func NewBase(url string, table string, opts clickhouse.Options) Finder {
return &BaseFinder{
url: url,
table: table,
opts: opts,
stats: make([]metrics.FinderStat, 0),
}
}

Expand All @@ -40,8 +43,12 @@ func (b *BaseFinder) where(query string) *where.Where {
return w
}

func (b *BaseFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) (err error) {
func (b *BaseFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) (err error) {
w := b.where(query)

b.stats = append(b.stats, metrics.FinderStat{})
stat := &b.stats[len(b.stats)-1]

b.body, stat.ChReadRows, stat.ChReadBytes, err = clickhouse.Query(
scope.WithTable(ctx, b.table),
b.url,
Expand Down Expand Up @@ -101,3 +108,7 @@ func (b *BaseFinder) Abs(v []byte) []byte {
func (b *BaseFinder) Bytes() ([]byte, error) {
return b.body, nil
}

func (b *BaseFinder) Stats() []metrics.FinderStat {
return b.stats
}
9 changes: 7 additions & 2 deletions finder/blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"regexp"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/metrics"
)

type BlacklistFinder struct {
Expand All @@ -20,15 +21,15 @@ func WrapBlacklist(f Finder, blacklist []*regexp.Regexp) *BlacklistFinder {
}
}

func (p *BlacklistFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) (err error) {
func (p *BlacklistFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) (err error) {
for i := 0; i < len(p.blacklist); i++ {
if p.blacklist[i].MatchString(query) {
p.matched = true
return
}
}

return p.wrapped.Execute(ctx, config, query, from, until, stat)
return p.wrapped.Execute(ctx, config, query, from, until)
}

func (p *BlacklistFinder) List() [][]byte {
Expand All @@ -55,3 +56,7 @@ func (p *BlacklistFinder) Abs(v []byte) []byte {
func (p *BlacklistFinder) Bytes() ([]byte, error) {
return nil, ErrNotImplemented
}

func (p *BlacklistFinder) Stats() []metrics.FinderStat {
return p.wrapped.Stats()
}
6 changes: 5 additions & 1 deletion finder/date.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/pkg/where"
)
Expand All @@ -30,7 +31,7 @@ func NewDateFinder(url string, table string, tableVersion int, opts clickhouse.O
return &DateFinder{b, tableVersion}
}

func (b *DateFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) (err error) {
func (b *DateFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) (err error) {
w := b.where(query)

dateWhere := where.New()
Expand All @@ -40,6 +41,9 @@ func (b *DateFinder) Execute(ctx context.Context, config *config.Config, query s
time.Unix(until, 0).Format("2006-01-02"),
)

b.stats = append(b.stats, metrics.FinderStat{})
stat := &b.stats[len(b.stats)-1]

if b.tableVersion == 2 {
b.body, stat.ChReadRows, stat.ChReadBytes, err = clickhouse.Query(
scope.WithTable(ctx, b.table),
Expand Down
7 changes: 6 additions & 1 deletion finder/date_reverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/pkg/where"
)
Expand Down Expand Up @@ -39,8 +40,12 @@ func (f *DateFinderV3) whereFilter(query string, from int64, until int64) (*wher
return w, dateWhere
}

func (f *DateFinderV3) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) (err error) {
func (f *DateFinderV3) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) (err error) {
w, dateWhere := f.whereFilter(query, from, until)

f.stats = append(f.stats, metrics.FinderStat{})
stat := &f.stats[len(f.stats)-1]

f.body, stat.ChReadRows, stat.ChReadBytes, err = clickhouse.Query(
scope.WithTable(ctx, f.table),
f.url,
Expand Down
27 changes: 9 additions & 18 deletions finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/metrics"

"github.com/lomik/graphite-clickhouse/config"
)
Expand All @@ -14,18 +15,11 @@ type Result interface {
Series() [][]byte
Abs([]byte) []byte
Bytes() ([]byte, error)
Stats() []metrics.FinderStat
}

type FinderStat struct {
ReadBytes int64
ChReadRows int64
ChReadBytes int64
Table string
}

type Finder interface {
Result
Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) error
Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) error
}

func newPlainFinder(ctx context.Context, config *config.Config, query string, from int64, until int64, useCache bool) Finder {
Expand Down Expand Up @@ -108,15 +102,12 @@ func newPlainFinder(ctx context.Context, config *config.Config, query string, fr
return f
}

func Find(config *config.Config, ctx context.Context, query string, from int64, until int64, stat *FinderStat) (Result, error) {
func Find(config *config.Config, ctx context.Context, query string, from int64, until int64) (Result, error) {
fnd := newPlainFinder(ctx, config, query, from, until, config.Common.FindCache != nil)

err := fnd.Execute(ctx, config, query, from, until, stat)
if err != nil {
return nil, err
}
err := fnd.Execute(ctx, config, query, from, until)

return fnd.(Result), nil
return fnd.(Result), err
}

// Leaf strips last dot and detect IsLeaf
Expand All @@ -128,7 +119,7 @@ func Leaf(value []byte) ([]byte, bool) {
return value, true
}

func FindTagged(ctx context.Context, config *config.Config, terms []TaggedTerm, from int64, until int64, stat *FinderStat) (Result, error) {
func FindTagged(ctx context.Context, config *config.Config, terms []TaggedTerm, from int64, until int64) (Result, error) {
opts := clickhouse.Options{
Timeout: config.ClickHouse.IndexTimeout,
ConnectTimeout: config.ClickHouse.ConnectTimeout,
Expand All @@ -141,7 +132,7 @@ func FindTagged(ctx context.Context, config *config.Config, terms []TaggedTerm,
if plain != nil {
plain.wrappedPlain = newPlainFinder(ctx, config, plain.Target(), from, until, useCache)

err := plain.Execute(ctx, config, plain.Target(), from, until, stat)
err := plain.Execute(ctx, config, plain.Target(), from, until)
if err != nil {
return nil, err
}
Expand All @@ -161,7 +152,7 @@ func FindTagged(ctx context.Context, config *config.Config, terms []TaggedTerm,
config.ClickHouse.TaggedCosts,
)

err := fnd.ExecutePrepared(ctx, terms, from, until, stat)
err := fnd.ExecutePrepared(ctx, terms, from, until)
if err != nil {
return nil, err
}
Expand Down
12 changes: 11 additions & 1 deletion finder/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/helper/errs"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/pkg/where"
)
Expand All @@ -37,6 +38,7 @@ type IndexFinder struct {
reverse uint8 // calculated in IndexFinder.useReverse only once
body []byte // clickhouse response body
rows [][]byte
stats []metrics.FinderStat
useCache bool // rotate body if needed (for store in cache)
useDaily bool
}
Expand All @@ -59,6 +61,7 @@ func NewIndex(url string, table string, dailyEnabled bool, reverse string, rever
dailyEnabled: dailyEnabled,
confReverse: config.IndexReverse[reverse],
confReverses: reverses,
stats: make([]metrics.FinderStat, 0),
useCache: useCache,
}
}
Expand Down Expand Up @@ -193,14 +196,17 @@ func validatePlainQuery(query string, wildcardMinDistance int) error {
return nil
}

func (idx *IndexFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) (err error) {
func (idx *IndexFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) (err error) {
err = validatePlainQuery(query, config.ClickHouse.WildcardMinDistance)
if err != nil {
return err
}

w := idx.whereFilter(query, from, until)

idx.stats = append(idx.stats, metrics.FinderStat{})
stat := &idx.stats[len(idx.stats)-1]

idx.body, stat.ChReadRows, stat.ChReadBytes, err = clickhouse.Query(
scope.WithTable(ctx, idx.table),
idx.url,
Expand Down Expand Up @@ -292,3 +298,7 @@ func (idx *IndexFinder) Series() [][]byte {
func (idx *IndexFinder) Bytes() ([]byte, error) {
return idx.body, nil
}

func (idx *IndexFinder) Stats() []metrics.FinderStat {
return idx.stats
}
7 changes: 6 additions & 1 deletion finder/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/metrics"
)

// MockFinder is used for testing purposes
Expand All @@ -29,7 +30,7 @@ func NewMockTagged(result [][]byte) *MockFinder {
}

// Execute assigns given query to the query field
func (m *MockFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) (err error) {
func (m *MockFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) (err error) {
m.query = query
return
}
Expand Down Expand Up @@ -58,3 +59,7 @@ func (m *MockFinder) Strings() []string {
body, _ := m.fnd.Bytes()
return strings.Split(string(body), "\n")
}

func (m *MockFinder) Stats() []metrics.FinderStat {
return m.fnd.Stats()
}
9 changes: 7 additions & 2 deletions finder/plain_from_tagged.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/metrics"
)

// Special finder for query plain graphite from prometheus
Expand Down Expand Up @@ -67,8 +68,8 @@ func (f *plainFromTaggedFinder) Target() string {
return f.target
}

func (f *plainFromTaggedFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) error {
return f.wrappedPlain.Execute(ctx, config, query, from, until, stat)
func (f *plainFromTaggedFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) error {
return f.wrappedPlain.Execute(ctx, config, query, from, until)
}

// For Render
Expand Down Expand Up @@ -129,3 +130,7 @@ func (f *plainFromTaggedFinder) List() [][]byte {
func (f *plainFromTaggedFinder) Bytes() ([]byte, error) {
return nil, ErrNotImplemented
}

func (f *plainFromTaggedFinder) Stats() []metrics.FinderStat {
return f.wrappedPlain.Stats()
}
9 changes: 7 additions & 2 deletions finder/prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/where"
)

Expand Down Expand Up @@ -42,7 +43,7 @@ func WrapPrefix(f Finder, prefix string) *PrefixFinder {
}
}

func (p *PrefixFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) error {
func (p *PrefixFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64) error {
qs := strings.Split(query, ".")

// check regexp
Expand Down Expand Up @@ -76,7 +77,7 @@ func (p *PrefixFinder) Execute(ctx context.Context, config *config.Config, query

p.matched = PrefixMatched

return p.wrapped.Execute(ctx, config, strings.Join(qs[len(ps):], "."), from, until, stat)
return p.wrapped.Execute(ctx, config, strings.Join(qs[len(ps):], "."), from, until)
}

func (p *PrefixFinder) List() [][]byte {
Expand Down Expand Up @@ -118,3 +119,7 @@ func (p *PrefixFinder) Abs(value []byte) []byte {
func (p *PrefixFinder) Bytes() ([]byte, error) {
return nil, ErrNotImplemented
}

func (p *PrefixFinder) Stats() []metrics.FinderStat {
return p.wrapped.Stats()
}
Loading
Loading