Skip to content

Commit 9c6dc46

Browse files
committed
expose projection hints in parquet queryable
Signed-off-by: yeya24 <[email protected]>
1 parent 194fbcb commit 9c6dc46

File tree

7 files changed

+452
-34
lines changed

7 files changed

+452
-34
lines changed

docs/blocks-storage/querier.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,18 @@ querier:
311311
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
312312
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
313313
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
314+
315+
# [Experimental] If true, parquet queryable will honor projection hints and
316+
# only materialize requested labels. Projection is only applied when all
317+
# queried blocks are parquet blocks and not querying ingesters.
318+
# CLI flag: -querier.parquet-queryable-honor-projection-hints
319+
[parquet_queryable_honor_projection_hints: <boolean> | default = false]
320+
321+
# [Experimental] Time buffer to use when checking if query overlaps with
322+
# ingester data. Projection hints are disabled if query time range overlaps
323+
# with (now - query-ingesters-within - buffer).
324+
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
325+
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
314326
```
315327
316328
### `blocks_storage_config`

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4828,6 +4828,18 @@ thanos_engine:
48284828
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
48294829
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
48304830
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
4831+
4832+
# [Experimental] If true, parquet queryable will honor projection hints and only
4833+
# materialize requested labels. Projection is only applied when all queried
4834+
# blocks are parquet blocks and not querying ingesters.
4835+
# CLI flag: -querier.parquet-queryable-honor-projection-hints
4836+
[parquet_queryable_honor_projection_hints: <boolean> | default = false]
4837+
4838+
# [Experimental] Time buffer to use when checking if query overlaps with
4839+
# ingester data. Projection hints are disabled if query time range overlaps with
4840+
# (now - query-ingesters-within - buffer).
4841+
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
4842+
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
48314843
```
48324844
48334845
### `query_frontend_config`

integration/parquet_querier_test.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
"fmt"
88
"math/rand"
99
"path/filepath"
10+
"slices"
1011
"strconv"
1112
"testing"
1213
"time"
1314

1415
"github.com/cortexproject/promqlsmith"
1516
"github.com/prometheus/prometheus/model/labels"
17+
"github.com/prometheus/prometheus/promql"
1618
"github.com/stretchr/testify/require"
1719
"github.com/thanos-io/objstore"
1820
"github.com/thanos-io/thanos/pkg/block"
@@ -176,3 +178,190 @@ func TestParquetFuzz(t *testing.T) {
176178
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
177179
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
178180
}
181+
182+
func TestParquetProjectionPushdown(t *testing.T) {
183+
s, err := e2e.NewScenario(networkName)
184+
require.NoError(t, err)
185+
defer s.Close()
186+
187+
consul := e2edb.NewConsulWithName("consul")
188+
memcached := e2ecache.NewMemcached()
189+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
190+
191+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
192+
flags := mergeFlags(
193+
baseFlags,
194+
map[string]string{
195+
"-target": "all,parquet-converter",
196+
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
197+
"-blocks-storage.tsdb.ship-interval": "1s",
198+
"-blocks-storage.bucket-store.sync-interval": "1s",
199+
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
200+
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
201+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
202+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
203+
"-querier.query-store-for-labels-enabled": "true",
204+
// compactor
205+
"-compactor.cleanup-interval": "1s",
206+
// Ingester.
207+
"-ring.store": "consul",
208+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
209+
// Distributor.
210+
"-distributor.replication-factor": "1",
211+
// Store-gateway.
212+
"-store-gateway.sharding-enabled": "false",
213+
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
214+
// alert manager
215+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
216+
// parquet-converter
217+
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
218+
"-parquet-converter.conversion-interval": "1s",
219+
"-parquet-converter.enabled": "true",
220+
// Querier - Enable Thanos engine with projection optimizer
221+
"-querier.thanos-engine": "true",
222+
"-querier.optimizers": "default,projection", // Enable projection optimizer
223+
"-querier.enable-parquet-queryable": "true",
224+
"-querier.parquet-queryable-honor-projection-hints": "true", // Honor projection hints
225+
// Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters
226+
// Since test queries are 24-48h old, they won't query ingesters and projection will be enabled
227+
"-querier.query-ingesters-within": "2h",
228+
// Enable cache for parquet labels and chunks
229+
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
230+
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
231+
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
232+
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
233+
},
234+
)
235+
236+
// make alert manager config dir
237+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
238+
239+
ctx := context.Background()
240+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
241+
dir := filepath.Join(s.SharedDir(), "data")
242+
numSeries := 20
243+
numSamples := 100
244+
lbls := make([]labels.Labels, 0, numSeries)
245+
scrapeInterval := time.Minute
246+
statusCodes := []string{"200", "400", "404", "500", "502"}
247+
methods := []string{"GET", "POST", "PUT", "DELETE"}
248+
now := time.Now()
249+
// Make sure query time is old enough to not overlap with ingesters
250+
// With query-ingesters-within=2h, queries with maxT < now-2h won't hit ingesters
251+
// Using 24h-48h ago ensures no ingester overlap, allowing projection to be enabled
252+
start := now.Add(-time.Hour * 48)
253+
end := now.Add(-time.Hour * 24)
254+
255+
// Create series with multiple labels
256+
for i := 0; i < numSeries; i++ {
257+
lbls = append(lbls, labels.FromStrings(
258+
labels.MetricName, "http_requests_total",
259+
"job", "api-server",
260+
"instance", fmt.Sprintf("instance-%d", i%5),
261+
"status_code", statusCodes[i%len(statusCodes)],
262+
"method", methods[i%len(methods)],
263+
"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
264+
"cluster", "test-cluster",
265+
))
266+
}
267+
268+
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
269+
require.NoError(t, err)
270+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
271+
require.NoError(t, s.StartAndWaitReady(minio))
272+
273+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
274+
require.NoError(t, s.StartAndWaitReady(cortex))
275+
276+
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
277+
require.NoError(t, err)
278+
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
279+
280+
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
281+
require.NoError(t, err)
282+
283+
// Wait until we convert the blocks to parquet
284+
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
285+
found := false
286+
foundBucketIndex := false
287+
288+
err := bkt.Iter(context.Background(), "", func(name string) error {
289+
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
290+
found = true
291+
}
292+
if name == "bucket-index.json.gz" {
293+
foundBucketIndex = true
294+
}
295+
return nil
296+
}, objstore.WithRecursiveIter())
297+
require.NoError(t, err)
298+
return found && foundBucketIndex
299+
})
300+
301+
c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
302+
require.NoError(t, err)
303+
304+
// Test queries that should use projection hints
305+
testCases := []struct {
306+
name string
307+
query string
308+
expectedLabels []string // Labels that should be present in result (besides __name__)
309+
}{
310+
{
311+
name: "simple_sum_by_job",
312+
query: `sum by (job) (http_requests_total)`,
313+
expectedLabels: []string{"job"},
314+
},
315+
{
316+
name: "rate_with_aggregation",
317+
query: `sum by (method) (rate(http_requests_total[5m]))`,
318+
expectedLabels: []string{"method"},
319+
},
320+
{
321+
name: "multiple_grouping_labels",
322+
query: `sum by (job, status_code) (http_requests_total)`,
323+
expectedLabels: []string{"job", "status_code"},
324+
},
325+
}
326+
327+
for _, tc := range testCases {
328+
t.Run(tc.name, func(t *testing.T) {
329+
t.Logf("Testing: %s", tc.query)
330+
331+
// Execute instant query
332+
result, err := c.Query(tc.query, end)
333+
require.NoError(t, err)
334+
require.NotNil(t, result)
335+
336+
// Verify we got results
337+
matrix := result.(promql.Matrix)
338+
require.NotEmpty(t, matrix, "query should return results")
339+
340+
t.Logf("Query returned %d series", len(matrix))
341+
342+
// Verify projection worked: series should only have the expected labels
343+
for i, series := range matrix {
344+
actualLabels := make(map[string]struct{})
345+
for _, label := range series.Metric {
346+
actualLabels[label.Name] = struct{}{}
347+
}
348+
349+
// Check that no unexpected labels are present
350+
for lbl := range actualLabels {
351+
if !slices.Contains(tc.expectedLabels, lbl) {
352+
require.Fail(t, "series should not have %s label", lbl)
353+
}
354+
}
355+
// Check that all expected labels are present
356+
for _, expectedLabel := range tc.expectedLabels {
357+
require.True(t, actualLabels[expectedLabel],
358+
"series should have %s label", expectedLabel)
359+
}
360+
}
361+
})
362+
}
363+
364+
// Verify that parquet blocks were queried
365+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
366+
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
367+
}

pkg/querier/parquet_queryable.go

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,13 @@ func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQuery
9696
type parquetQueryableWithFallback struct {
9797
services.Service
9898

99-
fallbackDisabled bool
100-
queryStoreAfter time.Duration
101-
parquetQueryable storage.Queryable
102-
cache cacheInterface[parquet_storage.ParquetShard]
103-
blockStorageQueryable *BlocksStoreQueryable
99+
fallbackDisabled bool
100+
queryStoreAfter time.Duration
101+
queryIngestersWithin time.Duration
102+
projectionHintsIngesterBuffer time.Duration
103+
parquetQueryable storage.Queryable
104+
cache cacheInterface[parquet_storage.ParquetShard]
105+
blockStorageQueryable *BlocksStoreQueryable
104106

105107
finder BlocksFinder
106108

@@ -148,6 +150,7 @@ func NewParquetQueryable(
148150
}
149151

150152
parquetQueryableOpts := []queryable.QueryableOpts{
153+
queryable.WithHonorProjectionHints(config.ParquetQueryableHonorProjectionHints),
151154
queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 {
152155
// Ignore error as this shouldn't happen.
153156
// If failed to resolve tenant we will just use the default limit value.
@@ -253,18 +256,20 @@ func NewParquetQueryable(
253256
}, constraintCacheFunc, cDecoder, parquetQueryableOpts...)
254257

255258
p := &parquetQueryableWithFallback{
256-
subservices: manager,
257-
blockStorageQueryable: blockStorageQueryable,
258-
parquetQueryable: parquetQueryable,
259-
cache: cache,
260-
queryStoreAfter: config.QueryStoreAfter,
261-
subservicesWatcher: services.NewFailureWatcher(),
262-
finder: blockStorageQueryable.finder,
263-
metrics: newParquetQueryableFallbackMetrics(reg),
264-
limits: limits,
265-
logger: logger,
266-
defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore),
267-
fallbackDisabled: config.ParquetQueryableFallbackDisabled,
259+
subservices: manager,
260+
blockStorageQueryable: blockStorageQueryable,
261+
parquetQueryable: parquetQueryable,
262+
cache: cache,
263+
queryStoreAfter: config.QueryStoreAfter,
264+
queryIngestersWithin: config.QueryIngestersWithin,
265+
projectionHintsIngesterBuffer: config.ParquetQueryableProjectionHintsIngesterBuffer,
266+
subservicesWatcher: services.NewFailureWatcher(),
267+
finder: blockStorageQueryable.finder,
268+
metrics: newParquetQueryableFallbackMetrics(reg),
269+
limits: limits,
270+
logger: logger,
271+
defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore),
272+
fallbackDisabled: config.ParquetQueryableFallbackDisabled,
268273
}
269274

270275
p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
@@ -311,17 +316,19 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
311316
}
312317

313318
return &parquetQuerierWithFallback{
314-
minT: mint,
315-
maxT: maxt,
316-
parquetQuerier: pq,
317-
queryStoreAfter: p.queryStoreAfter,
318-
blocksStoreQuerier: bsq,
319-
finder: p.finder,
320-
metrics: p.metrics,
321-
limits: p.limits,
322-
logger: p.logger,
323-
defaultBlockStoreType: p.defaultBlockStoreType,
324-
fallbackDisabled: p.fallbackDisabled,
319+
minT: mint,
320+
maxT: maxt,
321+
parquetQuerier: pq,
322+
queryStoreAfter: p.queryStoreAfter,
323+
queryIngestersWithin: p.queryIngestersWithin,
324+
projectionHintsIngesterBuffer: p.projectionHintsIngesterBuffer,
325+
blocksStoreQuerier: bsq,
326+
finder: p.finder,
327+
metrics: p.metrics,
328+
limits: p.limits,
329+
logger: p.logger,
330+
defaultBlockStoreType: p.defaultBlockStoreType,
331+
fallbackDisabled: p.fallbackDisabled,
325332
}, nil
326333
}
327334

@@ -335,7 +342,9 @@ type parquetQuerierWithFallback struct {
335342

336343
// If set, the querier manipulates the max time to not be greater than
337344
// "now - queryStoreAfter" so that most recent blocks are not queried.
338-
queryStoreAfter time.Duration
345+
queryStoreAfter time.Duration
346+
queryIngestersWithin time.Duration
347+
projectionHintsIngesterBuffer time.Duration
339348

340349
// metrics
341350
metrics *parquetQueryableFallbackMetrics
@@ -500,6 +509,14 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
500509
sortSeries = true
501510
}
502511

512+
queryIngesters := q.queryIngestersWithin == 0 || maxt >= util.TimeToMillis(time.Now().Add(-q.queryIngestersWithin).Add(-q.projectionHintsIngesterBuffer))
513+
disableProjection := len(remaining) > 0 || queryIngesters
514+
// Reset projection hints if there are mixed blocks (both parquet and non-parquet) or the query needs to merge results between ingester and parquet blocks
515+
if disableProjection {
516+
hints.ProjectionLabels = nil
517+
hints.ProjectionInclude = false
518+
}
519+
503520
promises := make([]chan storage.SeriesSet, 0, 2)
504521

505522
if len(parquet) > 0 {

0 commit comments

Comments
 (0)