@@ -13,6 +13,7 @@ import (
1313
1414 "github.com/cortexproject/promqlsmith"
1515 "github.com/prometheus/prometheus/model/labels"
16+ "github.com/prometheus/prometheus/promql"
1617 "github.com/stretchr/testify/require"
1718 "github.com/thanos-io/objstore"
1819 "github.com/thanos-io/thanos/pkg/block"
@@ -176,3 +177,190 @@ func TestParquetFuzz(t *testing.T) {
176177 require .NoError (t , cortex .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_parquet_queryable_blocks_queried_total" }, e2e .WithLabelMatchers (
177178 labels .MustNewMatcher (labels .MatchEqual , "type" , "parquet" ))))
178179}
180+
181+ func TestParquetProjectionPushdown (t * testing.T ) {
182+ s , err := e2e .NewScenario (networkName )
183+ require .NoError (t , err )
184+ defer s .Close ()
185+
186+ consul := e2edb .NewConsulWithName ("consul" )
187+ memcached := e2ecache .NewMemcached ()
188+ require .NoError (t , s .StartAndWaitReady (consul , memcached ))
189+
190+ baseFlags := mergeFlags (AlertmanagerLocalFlags (), BlocksStorageFlags ())
191+ flags := mergeFlags (
192+ baseFlags ,
193+ map [string ]string {
194+ "-target" : "all,parquet-converter" ,
195+ "-blocks-storage.tsdb.block-ranges-period" : "1m,24h" ,
196+ "-blocks-storage.tsdb.ship-interval" : "1s" ,
197+ "-blocks-storage.bucket-store.sync-interval" : "1s" ,
198+ "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl" : "1s" ,
199+ "-blocks-storage.bucket-store.bucket-index.idle-timeout" : "1s" ,
200+ "-blocks-storage.bucket-store.bucket-index.enabled" : "true" ,
201+ "-blocks-storage.bucket-store.index-cache.backend" : tsdb .IndexCacheBackendInMemory ,
202+ "-querier.query-store-for-labels-enabled" : "true" ,
203+ // compactor
204+ "-compactor.cleanup-interval" : "1s" ,
205+ // Ingester.
206+ "-ring.store" : "consul" ,
207+ "-consul.hostname" : consul .NetworkHTTPEndpoint (),
208+ // Distributor.
209+ "-distributor.replication-factor" : "1" ,
210+ // Store-gateway.
211+ "-store-gateway.sharding-enabled" : "false" ,
212+ "--querier.store-gateway-addresses" : "nonExistent" , // Make sure we do not call Store gateways
213+ // alert manager
214+ "-alertmanager.web.external-url" : "http://localhost/alertmanager" ,
215+ // parquet-converter
216+ "-parquet-converter.ring.consul.hostname" : consul .NetworkHTTPEndpoint (),
217+ "-parquet-converter.conversion-interval" : "1s" ,
218+ "-parquet-converter.enabled" : "true" ,
219+ // Querier - Enable Thanos engine with projection optimizer
220+ "-querier.thanos-engine" : "true" ,
221+ "-querier.optimizers" : "default,projection" , // Enable projection optimizer
222+ "-querier.enable-parquet-queryable" : "true" ,
223+ "-querier.parquet-queryable-honor-projection-hints" : "true" , // Honor projection hints
224+ // Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters
225+ // Since test queries are 24-48h old, they won't query ingesters and projection will be enabled
226+ "-querier.query-ingesters-within" : "2h"
227+ // Enable cache for parquet labels and chunks
228+ "-blocks-storage.bucket-store.parquet-labels-cache.backend" : "inmemory ,memcached ",
229+ "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
230+ "-blocks-storage.bucket-store.chunks-cache.backend" : "inmemory,memcached" ,
231+ "-blocks-storage.bucket-store.chunks-cache.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
232+ },
233+ )
234+
235+ // make alert manager config dir
236+ require .NoError (t , writeFileToSharedDir (s , "alertmanager_configs" , []byte {}))
237+
238+ ctx := context .Background ()
239+ rnd := rand .New (rand .NewSource (time .Now ().Unix ()))
240+ dir := filepath .Join (s .SharedDir (), "data" )
241+ numSeries := 20
242+ numSamples := 100
243+ lbls := make ([]labels.Labels , 0 , numSeries )
244+ scrapeInterval := time .Minute
245+ statusCodes := []string {"200" , "400" , "404" , "500" , "502" }
246+ methods := []string {"GET" , "POST" , "PUT" , "DELETE" }
247+ now := time .Now ()
248+ // Make sure query time is old enough to not overlap with ingesters
249+ // With query-ingesters-within=2h, queries with maxT < now-2h won't hit ingesters
250+ // Using 24h-48h ago ensures no ingester overlap, allowing projection to be enabled
251+ start := now .Add (- time .Hour * 48 )
252+ end := now .Add (- time .Hour * 24 )
253+
254+ // Create series with multiple labels
255+ for i := 0 ; i < numSeries ; i ++ {
256+ lbls = append (lbls , labels .FromStrings (
257+ labels .MetricName , "http_requests_total" ,
258+ "job" , "api-server" ,
259+ "instance" , fmt .Sprintf ("instance-%d" , i % 5 ),
260+ "status_code" , statusCodes [i % len (statusCodes )],
261+ "method" , methods [i % len (methods )],
262+ "path" , fmt .Sprintf ("/api/v1/endpoint%d" , i % 3 ),
263+ "cluster" , "test-cluster" ,
264+ ))
265+ }
266+
267+ id , err := e2e .CreateBlock (ctx , rnd , dir , lbls , numSamples , start .UnixMilli (), end .UnixMilli (), scrapeInterval .Milliseconds (), 10 )
268+ require .NoError (t , err )
269+ minio := e2edb .NewMinio (9000 , flags ["-blocks-storage.s3.bucket-name" ])
270+ require .NoError (t , s .StartAndWaitReady (minio ))
271+
272+ cortex := e2ecortex .NewSingleBinary ("cortex" , flags , "" )
273+ require .NoError (t , s .StartAndWaitReady (cortex ))
274+
275+ storage , err := e2ecortex .NewS3ClientForMinio (minio , flags ["-blocks-storage.s3.bucket-name" ])
276+ require .NoError (t , err )
277+ bkt := bucket .NewUserBucketClient ("user-1" , storage .GetBucket (), nil )
278+
279+ err = block .Upload (ctx , log .Logger , bkt , filepath .Join (dir , id .String ()), metadata .NoneFunc )
280+ require .NoError (t , err )
281+
282+ // Wait until we convert the blocks to parquet
283+ cortex_testutil .Poll (t , 30 * time .Second , true , func () interface {} {
284+ found := false
285+ foundBucketIndex := false
286+
287+ err := bkt .Iter (context .Background (), "" , func (name string ) error {
288+ if name == fmt .Sprintf ("parquet-markers/%v-parquet-converter-mark.json" , id .String ()) {
289+ found = true
290+ }
291+ if name == "bucket-index.json.gz" {
292+ foundBucketIndex = true
293+ }
294+ return nil
295+ }, objstore .WithRecursiveIter ())
296+ require .NoError (t , err )
297+ return found && foundBucketIndex
298+ })
299+
300+ c , err := e2ecortex .NewClient ("" , cortex .HTTPEndpoint (), "" , "" , "user-1" )
301+ require .NoError (t , err )
302+
303+ // Test queries that should use projection hints
304+ testCases := []struct {
305+ name string
306+ query string
307+ expectedLabels []string // Labels that should be present in result (besides __name__)
308+ }{
309+ {
310+ name : "simple_sum_by_job" ,
311+ query : `sum by (job) (http_requests_total)` ,
312+ expectedLabels : []string {"job" },
313+ },
314+ {
315+ name : "rate_with_aggregation" ,
316+ query : `sum by (method) (rate(http_requests_total[5m]))` ,
317+ expectedLabels : []string {"method" },
318+ },
319+ {
320+ name : "multiple_grouping_labels" ,
321+ query : `sum by (job, status_code) (http_requests_total)` ,
322+ expectedLabels : []string {"job" , "status_code" },
323+ },
324+ }
325+
326+ for _ , tc := range testCases {
327+ t .Run (tc .name , func (t * testing.T ) {
328+ t .Logf ("Testing: %s" , tc .query )
329+
330+ // Execute instant query
331+ result , err := c .Query (tc .query , end )
332+ require .NoError (t , err )
333+ require .NotNil (t , result )
334+
335+ // Verify we got results
336+ matrix := result .(promql.Matrix )
337+ require .NotEmpty (t , matrix , "query should return results" )
338+
339+ t .Logf ("Query returned %d series" , len (matrix ))
340+
341+ // Verify projection worked: series should only have the expected labels
342+ for i , series := range matrix {
343+ actualLabels := make (map [string ]struct {})
344+ for _ , label := range series .Metric {
345+ actualLabels [label .Name ] = struct {}{}
346+ }
347+
348+ // Check that no unexpected labels are present
349+ for lbl := range actualLabels {
350+ if ! slices .Contains (tc .expectedLabels , lbl ) {
351+ require .Fail (t , "series should not have %s label" , lbl )
352+ }
353+ }
354+ // Check that all expected labels are present
355+ for _ , expectedLabel := range tc .expectedLabels {
356+ require .True (t , actualLabels [expectedLabel ],
357+ "series should have %s label" , expectedLabel )
358+ }
359+ }
360+ })
361+ }
362+
363+ // Verify that parquet blocks were queried
364+ require .NoError (t , cortex .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_parquet_queryable_blocks_queried_total" }, e2e .WithLabelMatchers (
365+ labels .MustNewMatcher (labels .MatchEqual , "type" , "parquet" ))))
366+ }
0 commit comments