diff --git a/autocomplete/autocomplete.go b/autocomplete/autocomplete.go index 4e50c54b..440fb122 100644 --- a/autocomplete/autocomplete.go +++ b/autocomplete/autocomplete.go @@ -367,9 +367,11 @@ func (h *Handler) ServeTags(w http.ResponseWriter, r *http.Request) { h.config.ClickHouse.URL, sql, clickhouse.Options{ - TLSConfig: h.config.ClickHouse.TLSConfig, - Timeout: h.config.ClickHouse.IndexTimeout, - ConnectTimeout: h.config.ClickHouse.ConnectTimeout, + TLSConfig: h.config.ClickHouse.TLSConfig, + Timeout: h.config.ClickHouse.IndexTimeout, + ConnectTimeout: h.config.ClickHouse.ConnectTimeout, + CheckRequestProgress: h.config.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: h.config.ClickHouse.ProgressSendingInterval, }, nil, ) @@ -639,9 +641,11 @@ func (h *Handler) ServeValues(w http.ResponseWriter, r *http.Request) { h.config.ClickHouse.URL, sql, clickhouse.Options{ - TLSConfig: h.config.ClickHouse.TLSConfig, - Timeout: h.config.ClickHouse.IndexTimeout, - ConnectTimeout: h.config.ClickHouse.ConnectTimeout, + TLSConfig: h.config.ClickHouse.TLSConfig, + Timeout: h.config.ClickHouse.IndexTimeout, + ConnectTimeout: h.config.ClickHouse.ConnectTimeout, + CheckRequestProgress: h.config.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: h.config.ClickHouse.ProgressSendingInterval, }, nil, ) diff --git a/config/config.go b/config/config.go index d6fb297a..ea60891e 100644 --- a/config/config.go +++ b/config/config.go @@ -123,6 +123,7 @@ type Common struct { type FeatureFlags struct { UseCarbonBehavior bool `toml:"use-carbon-behaviour" json:"use-carbon-behaviour" comment:"if true, prefers carbon's behaviour on how tags are treated"` DontMatchMissingTags bool `toml:"dont-match-missing-tags" json:"dont-match-missing-tags" comment:"if true, seriesByTag terms containing '!=' or '!=~' operators will not match metrics that don't have the tag at all"` + LogQueryProgress bool `toml:"log-query-progress" json:"log-query-progress" comment:"if true, gch will log affected rows count by clickhouse query"` } // IndexReverseRule contains rules to use direct or reversed request to index table @@ -210,6 +211,8 @@ type ClickHouse struct { DataTimeout time.Duration `toml:"data-timeout" json:"data-timeout" comment:"default total timeout to fetch data, can be overwritten with query-params"` QueryParams []QueryParam `toml:"query-params" json:"query-params" comment:"customized query params (url, data timeout, limiters) for durations greater or equal"` + ProgressSendingInterval time.Duration `toml:"progress-sending-interval" json:"progress-sending-interval" comment:"time interval for ch query progress sending, it's equal to http_headers_progress_interval_ms header"` + RenderMaxQueries int `toml:"render-max-queries" json:"render-max-queries" comment:"Max queries to render queiries"` RenderConcurrentQueries int `toml:"render-concurrent-queries" json:"render-concurrent-queries" comment:"Concurrent queries to render queiries"` RenderAdaptiveQueries int `toml:"render-adaptive-queries" json:"render-adaptive-queries" comment:"Render adaptive queries (based on load average) for increase/decrease concurrent queries"` @@ -390,24 +393,25 @@ func New() *Config { DegragedLoad: 1.0, }, ClickHouse: ClickHouse{ - URL: "http://localhost:8123?cancel_http_readonly_queries_on_client_close=1", - DataTimeout: time.Minute, - IndexTable: "graphite_index", - IndexUseDaily: true, - TaggedUseDaily: true, - IndexReverse: "auto", - IndexReverses: IndexReverses{}, - IndexTimeout: time.Minute, - TaggedTable: "graphite_tagged", - TaggedAutocompleDays: 7, - ExtraPrefix: "", - ConnectTimeout: time.Second, - DataTableLegacy: "", - RollupConfLegacy: "auto", - MaxDataPoints: 1048576, - InternalAggregation: true, - FindLimiter: limiter.NoopLimiter{}, - TagsLimiter: limiter.NoopLimiter{}, + URL: "http://localhost:8123?cancel_http_readonly_queries_on_client_close=1", + DataTimeout: time.Minute, + ProgressSendingInterval: 10 * time.Second, + IndexTable: "graphite_index", + IndexUseDaily: true, + TaggedUseDaily: true, + IndexReverse: "auto", + IndexReverses: IndexReverses{}, + IndexTimeout: time.Minute, + TaggedTable: "graphite_tagged", + TaggedAutocompleDays: 7, + ExtraPrefix: "", + ConnectTimeout: time.Second, + DataTableLegacy: "", + RollupConfLegacy: "auto", + MaxDataPoints: 1048576, + InternalAggregation: true, + FindLimiter: limiter.NoopLimiter{}, + TagsLimiter: limiter.NoopLimiter{}, }, Tags: Tags{ Threads: 1, diff --git a/config/config_test.go b/config/config_test.go index c67ec336..4718891c 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -368,26 +368,27 @@ sample-thereafter = 12 Limiter: limiter.NoopLimiter{}, }, }, - FindLimiter: limiter.NoopLimiter{}, - TagsLimiter: limiter.NoopLimiter{}, - IndexTable: "graphite_index", - IndexReverse: "direct", - IndexReverses: make(IndexReverses, 2), - IndexTimeout: 4000000000, - TaggedTable: "graphite_tags", - TaggedAutocompleDays: 5, - TreeTable: "tree", - ReverseTreeTable: "reversed_tree", - DateTreeTable: "data_tree", - DateTreeTableVersion: 2, - TreeTimeout: 5000000000, - TagTable: "tag_table", - ExtraPrefix: "tum.pu-dum", - ConnectTimeout: 2000000000, - DataTableLegacy: "data", - RollupConfLegacy: "none", - MaxDataPoints: 8000, - InternalAggregation: true, + ProgressSendingInterval: 10 * time.Second, + FindLimiter: limiter.NoopLimiter{}, + TagsLimiter: limiter.NoopLimiter{}, + IndexTable: "graphite_index", + IndexReverse: "direct", + IndexReverses: make(IndexReverses, 2), + IndexTimeout: 4000000000, + TaggedTable: "graphite_tags", + TaggedAutocompleDays: 5, + TreeTable: "tree", + ReverseTreeTable: "reversed_tree", + DateTreeTable: "data_tree", + DateTreeTableVersion: 2, + TreeTimeout: 5000000000, + TagTable: "tag_table", + ExtraPrefix: "tum.pu-dum", + ConnectTimeout: 2000000000, + DataTableLegacy: "data", + RollupConfLegacy: "none", + MaxDataPoints: 8000, + InternalAggregation: true, } expected.ClickHouse.IndexReverses[0] = &IndexReverseRule{"suf", "pref", "", nil, "direct"} r, _ = regexp.Compile("^reg$") @@ -647,6 +648,7 @@ sample-thereafter = 12 Limiter: limiter.NoopLimiter{}, }, }, + ProgressSendingInterval: 10 * time.Second, RenderMaxQueries: 1000, RenderConcurrentQueries: 10, FindMaxQueries: 200, @@ -970,6 +972,7 @@ sample-thereafter = 12 AdaptiveQueries: 6, }, }, + ProgressSendingInterval: 10 * time.Second, RenderMaxQueries: 1000, RenderConcurrentQueries: 10, RenderAdaptiveQueries: 4, diff --git a/doc/config.md b/doc/config.md index 05f79ca1..f0416297 100644 --- a/doc/config.md +++ b/doc/config.md @@ -290,6 +290,8 @@ Note that this option only works for terms with '=' operator in them. use-carbon-behaviour = false # if true, seriesByTag terms containing '!=' or '!=~' operators will not match metrics that don't have the tag at all dont-match-missing-tags = false + # if true, gch will log affected rows count by clickhouse query + log-query-progress = false [metrics] # graphite relay address @@ -320,6 +322,8 @@ Note that this option only works for terms with '=' operator in them. url = "http://localhost:8123?cancel_http_readonly_queries_on_client_close=1" # default total timeout to fetch data, can be overwritten with query-params data-timeout = "1m0s" + # time interval for ch query progress sending, it's equal to http_headers_progress_interval_ms header + progress-sending-interval = "10s" # Max queries to render queiries render-max-queries = 0 # Concurrent queries to render queiries diff --git a/finder/finder.go b/finder/finder.go index 348316c0..58c71465 100644 --- a/finder/finder.go +++ b/finder/finder.go @@ -24,9 +24,11 @@ type Finder interface { func newPlainFinder(ctx context.Context, config *config.Config, query string, from int64, until int64, useCache bool) Finder { opts := clickhouse.Options{ - TLSConfig: config.ClickHouse.TLSConfig, - Timeout: config.ClickHouse.IndexTimeout, - ConnectTimeout: config.ClickHouse.ConnectTimeout, + TLSConfig: config.ClickHouse.TLSConfig, + Timeout: config.ClickHouse.IndexTimeout, + ConnectTimeout: config.ClickHouse.ConnectTimeout, + CheckRequestProgress: config.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: config.ClickHouse.ProgressSendingInterval, } var f Finder @@ -121,9 +123,11 @@ func Leaf(value []byte) ([]byte, bool) { func FindTagged(ctx context.Context, config *config.Config, terms []TaggedTerm, from int64, until int64) (Result, error) { opts := clickhouse.Options{ - Timeout: config.ClickHouse.IndexTimeout, - ConnectTimeout: config.ClickHouse.ConnectTimeout, - TLSConfig: config.ClickHouse.TLSConfig, + Timeout: config.ClickHouse.IndexTimeout, + ConnectTimeout: config.ClickHouse.ConnectTimeout, + TLSConfig: config.ClickHouse.TLSConfig, + CheckRequestProgress: config.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: config.ClickHouse.ProgressSendingInterval, } useCache := config.Common.FindCache != nil diff --git a/finder/tagged_test.go b/finder/tagged_test.go index 8b8d2bb2..792b2f94 100644 --- a/finder/tagged_test.go +++ b/finder/tagged_test.go @@ -642,9 +642,11 @@ func TestParseSeriesByTagWithCostsFromCountTable(t *testing.T) { srv.AddResponce(sql, response) opts := clickhouse.Options{ - Timeout: cfg.ClickHouse.IndexTimeout, - ConnectTimeout: cfg.ClickHouse.ConnectTimeout, - TLSConfig: cfg.ClickHouse.TLSConfig, + Timeout: cfg.ClickHouse.IndexTimeout, + ConnectTimeout: cfg.ClickHouse.ConnectTimeout, + TLSConfig: cfg.ClickHouse.TLSConfig, + CheckRequestProgress: cfg.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: cfg.ClickHouse.ProgressSendingInterval, } taggedFinder := NewTagged( diff --git a/helper/clickhouse/clickhouse.go b/helper/clickhouse/clickhouse.go index 5f677b85..dbe7ed95 100644 --- a/helper/clickhouse/clickhouse.go +++ b/helper/clickhouse/clickhouse.go @@ -19,6 +19,7 @@ import ( "time" "github.com/lomik/graphite-clickhouse/helper/errs" + httpHelper "github.com/lomik/graphite-clickhouse/helper/http" "github.com/lomik/graphite-clickhouse/limiter" "github.com/lomik/graphite-clickhouse/pkg/scope" @@ -39,6 +40,11 @@ const ( ContentEncodingZstd ContentEncoding = "zstd" ) +const ( + ClickHouseProgressHeader string = "X-Clickhouse-Progress" + ClickHouseSummaryHeader string = "X-Clickhouse-Summary" +) + func NewErrWithDescr(err string, data string) error { return &ErrWithDescr{err, data} } @@ -159,9 +165,11 @@ func HandleError(w http.ResponseWriter, err error) (status int, queueFail bool) } type Options struct { - TLSConfig *tls.Config - Timeout time.Duration - ConnectTimeout time.Duration + TLSConfig *tls.Config + Timeout time.Duration + ConnectTimeout time.Duration + ProgressSendingInterval time.Duration + CheckRequestProgress bool } type LoggedReader struct { @@ -203,6 +211,13 @@ func (r *LoggedReader) ChReadBytes() int64 { return r.read_bytes } +type queryStats struct { + readRows int64 + readBytes int64 + loggerFields []zapcore.Field + rawHeader string +} + func formatSQL(q string) string { s := strings.Split(q, "\n") for i := 0; i < len(s); i++ { @@ -274,7 +289,7 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, e // Get X-Clickhouse-Summary header // TODO: remove when https://github.com/ClickHouse/ClickHouse/issues/16207 is done q.Set("send_progress_in_http_headers", "1") - q.Set("http_headers_progress_interval_ms", "10000") + q.Set("http_headers_progress_interval_ms", strconv.FormatInt(opts.ProgressSendingInterval.Milliseconds(), 10)) p.RawQuery = q.Encode() var contentHeader string @@ -320,56 +335,47 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, e return nil, fmt.Errorf("unknown encoding: %s", encoding) } - client := &http.Client{ - Timeout: opts.Timeout, - Transport: &http.Transport{ - Dial: (&net.Dialer{ - Timeout: opts.ConnectTimeout, - }).Dial, - TLSClientConfig: opts.TLSConfig, - DisableKeepAlives: true, - }, + var resp *http.Response + if opts.CheckRequestProgress { + resp, err = sendRequestWithProgressCheck(req, &opts) + } else { + resp, err = sendRequestViaDefaultClient(req, &opts) } - resp, err := client.Do(req) if err != nil { + if opts.CheckRequestProgress && resp != nil { + stats, parse_err := getQueryStats(resp, ClickHouseProgressHeader) + if parse_err != nil { + logger.Warn("query", zap.Error(err), zap.String("clickhouse-progress", stats.rawHeader)) + } + + logger = logger.With(stats.loggerFields...) + } + return } // chproxy overwrite our query id. So read it again chQueryID = resp.Header.Get("X-ClickHouse-Query-Id") - summaryHeader := resp.Header.Get("X-Clickhouse-Summary") - read_rows := int64(-1) - read_bytes := int64(-1) + stats, err := getQueryStats(resp, ClickHouseSummaryHeader) + if err != nil { + summaryHeader := resp.Header.Get(ClickHouseSummaryHeader) + logger.Warn("query", + zap.Error(err), + zap.String("clickhouse-summary", summaryHeader)) - if len(summaryHeader) > 0 { - summary := make(map[string]string) - err = json.Unmarshal([]byte(summaryHeader), &summary) - - if err == nil { - // TODO: use in carbon metrics sender when it will be implemented - fields := make([]zapcore.Field, 0, len(summary)) - for k, v := range summary { - fields = append(fields, zap.String(k, v)) - - switch k { - case "read_rows": - read_rows, _ = strconv.ParseInt(v, 10, 64) - case "read_bytes": - read_bytes, _ = strconv.ParseInt(v, 10, 64) - } - } + err = nil + } - sort.Slice(fields, func(i int, j int) bool { - return fields[i].Key < fields[j].Key - }) + read_rows, read_bytes, fields := stats.readRows, stats.readBytes, stats.loggerFields - logger = logger.With(fields...) - } else { - logger.Warn("query", zap.Error(err), zap.String("clickhouse-summary", summaryHeader)) - err = nil - } + if len(fields) > 0 { + sort.Slice(fields, func(i, j int) bool { + return fields[i].Key < fields[j].Key + }) + + logger = logger.With(fields...) } // check for return 5xx error, may be 502 code if clickhouse accesed via reverse proxy @@ -399,6 +405,95 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, e return } +func getQueryStats(resp *http.Response, statsHeaderName string) (queryStats, error) { + read_rows := int64(-1) + read_bytes := int64(-1) + + if resp == nil { + return queryStats{ + readRows: read_rows, + readBytes: read_bytes, + loggerFields: []zapcore.Field{}, + }, nil + } + + statsHeader := "" + statsHeaders := resp.Header.Values(statsHeaderName) + + if len(statsHeaders) > 0 { + statsHeader = statsHeaders[len(statsHeaders)-1] + } else { + return queryStats{ + readRows: read_rows, + readBytes: read_bytes, + loggerFields: []zapcore.Field{}, + }, nil + } + + stats := make(map[string]string) + + err := json.Unmarshal([]byte(statsHeader), &stats) + if err != nil { + return queryStats{ + readRows: read_rows, + readBytes: read_bytes, + loggerFields: []zapcore.Field{}, + rawHeader: statsHeader, + }, err + } + + // TODO: use in carbon metrics sender when it will be implemented + fields := make([]zapcore.Field, 0, len(stats)) + for k, v := range stats { + fields = append(fields, zap.String(k, v)) + + switch k { + case "read_rows": + read_rows, _ = strconv.ParseInt(v, 10, 64) + case "read_bytes": + read_bytes, _ = strconv.ParseInt(v, 10, 64) + } + } + + sort.Slice(fields, func(i int, j int) bool { + return fields[i].Key < fields[j].Key + }) + + return queryStats{ + readRows: read_rows, + readBytes: read_bytes, + loggerFields: fields, + rawHeader: statsHeader, + }, nil +} + +func sendRequestViaDefaultClient(request *http.Request, opts *Options) (*http.Response, error) { + client := &http.Client{ + Timeout: opts.Timeout, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: opts.ConnectTimeout, + }).DialContext, + TLSClientConfig: opts.TLSConfig, + DisableKeepAlives: true, + }, + } + + return client.Do(request) +} + +func sendRequestWithProgressCheck(request *http.Request, opts *Options) (*http.Response, error) { + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: opts.ConnectTimeout, + }).DialContext, + TLSClientConfig: opts.TLSConfig, + DisableKeepAlives: true, + } + + return httpHelper.DoHTTPOverTCP(request.Context(), transport, request, opts.Timeout) +} + func do(ctx context.Context, dsn string, query string, postBody io.Reader, encoding ContentEncoding, opts Options, extData *ExternalData) ([]byte, int64, int64, error) { bodyReader, err := reader(ctx, dsn, query, postBody, encoding, opts, extData) if err != nil { diff --git a/helper/http/live-http-client.go b/helper/http/live-http-client.go new file mode 100644 index 00000000..110c5901 --- /dev/null +++ b/helper/http/live-http-client.go @@ -0,0 +1,60 @@ +package http + +import ( + "bufio" + "bytes" + "context" + "io" + "net" + "net/http" + "time" +) + +const TCPNetwork string = "tcp" + +func DoHTTPOverTCP(ctx context.Context, transport *http.Transport, req *http.Request, timeout time.Duration) (*http.Response, error) { + conn, err := transport.DialContext(ctx, TCPNetwork, req.URL.Host) + if err != nil { + return nil, err + } + + err = conn.SetDeadline(time.Now().Add(timeout)) + if err != nil { + return nil, err + } + + err = req.Write(conn) + if err != nil { + return nil, err + } + + var backup_buf bytes.Buffer + reader := bufio.NewReader(io.TeeReader(conn, &backup_buf)) + + for { + line, err := reader.ReadString('\n') + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + fake_body_delimer := bytes.NewBuffer([]byte{'\r', '\n', '\r', '\n'}) + + resp, err := http.ReadResponse(bufio.NewReader(io.MultiReader(&backup_buf, fake_body_delimer)), nil) + if err != nil { + return nil, err + } + + return resp, netErr + } + + return nil, err + } + + if line == "\r\n" { + break + } + } + + full_resp_stream := io.MultiReader(&backup_buf, conn) + resp, err := http.ReadResponse(bufio.NewReader(full_resp_stream), nil) + + return resp, err +} diff --git a/helper/rollup/remote.go b/helper/rollup/remote.go index 1826f7ac..13366178 100644 --- a/helper/rollup/remote.go +++ b/helper/rollup/remote.go @@ -144,9 +144,11 @@ func RemoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error) addr, query, clickhouse.Options{ - Timeout: timeoutRulesLoad, - ConnectTimeout: timeoutRulesLoad, - TLSConfig: tlsConf, + Timeout: timeoutRulesLoad, + ConnectTimeout: timeoutRulesLoad, + TLSConfig: tlsConf, + CheckRequestProgress: false, + ProgressSendingInterval: 10 * time.Second, }, nil, ) @@ -173,9 +175,11 @@ func RemoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error) addr, query, clickhouse.Options{ - Timeout: timeoutRulesLoad, - ConnectTimeout: timeoutRulesLoad, - TLSConfig: tlsConf, + Timeout: timeoutRulesLoad, + ConnectTimeout: timeoutRulesLoad, + TLSConfig: tlsConf, + CheckRequestProgress: false, + ProgressSendingInterval: 10 * time.Second, }, nil, ) diff --git a/index/index.go b/index/index.go index 5ab89c60..f03b5b71 100644 --- a/index/index.go +++ b/index/index.go @@ -26,8 +26,10 @@ func New(config *config.Config, ctx context.Context) (*Index, error) { var err error opts := clickhouse.Options{ - TLSConfig: config.ClickHouse.TLSConfig, - ConnectTimeout: config.ClickHouse.ConnectTimeout, + TLSConfig: config.ClickHouse.TLSConfig, + ConnectTimeout: config.ClickHouse.ConnectTimeout, + CheckRequestProgress: config.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: config.ClickHouse.ProgressSendingInterval, } if config.ClickHouse.IndexTable != "" { opts.Timeout = config.ClickHouse.IndexTimeout diff --git a/prometheus/querier.go b/prometheus/querier.go index 32a1fee2..15e24ef2 100644 --- a/prometheus/querier.go +++ b/prometheus/querier.go @@ -50,9 +50,11 @@ func (q *Querier) LabelValues(ctx context.Context, label string, hints *storage. q.config.ClickHouse.URL, sql, clickhouse.Options{ - TLSConfig: q.config.ClickHouse.TLSConfig, - Timeout: q.config.ClickHouse.IndexTimeout, - ConnectTimeout: q.config.ClickHouse.ConnectTimeout, + TLSConfig: q.config.ClickHouse.TLSConfig, + Timeout: q.config.ClickHouse.IndexTimeout, + ConnectTimeout: q.config.ClickHouse.ConnectTimeout, + CheckRequestProgress: q.config.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: q.config.ClickHouse.ProgressSendingInterval, }, nil, ) @@ -85,9 +87,11 @@ func (q *Querier) LabelNames(ctx context.Context, hints *storage.LabelHints, mat q.config.ClickHouse.URL, sql, clickhouse.Options{ - Timeout: q.config.ClickHouse.IndexTimeout, - ConnectTimeout: q.config.ClickHouse.ConnectTimeout, - TLSConfig: q.config.ClickHouse.TLSConfig, + Timeout: q.config.ClickHouse.IndexTimeout, + ConnectTimeout: q.config.ClickHouse.ConnectTimeout, + TLSConfig: q.config.ClickHouse.TLSConfig, + CheckRequestProgress: q.config.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: q.config.ClickHouse.ProgressSendingInterval, }, nil, ) diff --git a/render/data/query.go b/render/data/query.go index 150e74fd..d65d92a8 100644 --- a/render/data/query.go +++ b/render/data/query.go @@ -58,10 +58,12 @@ type query struct { chTLSConfig *tls.Config chQueryParams []config.QueryParam - chConnectTimeout time.Duration - debugDir string - debugExtDataPerm os.FileMode - lock sync.RWMutex + chConnectTimeout time.Duration + chProgressSendingInterval time.Duration + debugDir string + debugExtDataPerm os.FileMode + featureFlags *config.FeatureFlags + lock sync.RWMutex } type conditions struct { @@ -107,14 +109,16 @@ func newQuery(cfg *config.Config, targets int) *query { } query := &query{ - CHResponses: make([]CHResponse, 0, targets), - cStep: cStep, - chQueryParams: cfg.ClickHouse.QueryParams, - chConnectTimeout: cfg.ClickHouse.ConnectTimeout, - chTLSConfig: cfg.ClickHouse.TLSConfig, - debugDir: cfg.Debug.Directory, - debugExtDataPerm: cfg.Debug.ExternalDataPerm, - lock: sync.RWMutex{}, + CHResponses: make([]CHResponse, 0, targets), + cStep: cStep, + chQueryParams: cfg.ClickHouse.QueryParams, + chConnectTimeout: cfg.ClickHouse.ConnectTimeout, + chProgressSendingInterval: cfg.ClickHouse.ProgressSendingInterval, + chTLSConfig: cfg.ClickHouse.TLSConfig, + debugDir: cfg.Debug.Directory, + debugExtDataPerm: cfg.Debug.ExternalDataPerm, + featureFlags: &cfg.FeatureFlags, + lock: sync.RWMutex{}, } return query @@ -188,9 +192,11 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error { chURL, query, clickhouse.Options{ - Timeout: chDataTimeout, - ConnectTimeout: q.chConnectTimeout, - TLSConfig: q.chTLSConfig, + Timeout: chDataTimeout, + ConnectTimeout: q.chConnectTimeout, + TLSConfig: q.chTLSConfig, + CheckRequestProgress: q.featureFlags.LogQueryProgress, + ProgressSendingInterval: q.chProgressSendingInterval, }, extData, ) diff --git a/tagger/tagger.go b/tagger/tagger.go index 83ca2a84..e80acfb7 100644 --- a/tagger/tagger.go +++ b/tagger/tagger.go @@ -79,9 +79,11 @@ func Make(cfg *config.Config) error { logger := zapwriter.Logger("tagger") chOpts := clickhouse.Options{ - TLSConfig: cfg.ClickHouse.TLSConfig, - Timeout: cfg.ClickHouse.IndexTimeout, - ConnectTimeout: cfg.ClickHouse.ConnectTimeout, + TLSConfig: cfg.ClickHouse.TLSConfig, + Timeout: cfg.ClickHouse.IndexTimeout, + ConnectTimeout: cfg.ClickHouse.ConnectTimeout, + CheckRequestProgress: cfg.FeatureFlags.LogQueryProgress, + ProgressSendingInterval: cfg.ClickHouse.ProgressSendingInterval, } begin := func(b string, fields ...zapcore.Field) {