@sandeshkr419 sandeshkr419 commented Nov 14, 2025

Description

Enables partial mode for average aggregations.

Introduces a physical plan optimizer rule that checks for average aggregations and switches their aggregation exec to Partial mode.

The goal is to achieve something similar for the distinct count case in a follow-up, where we can retrieve the HLL sketch as a byte array.

Note: This might fail with SQL as it currently stands, because SQL sends avg as sum and count in a single query. It should not fail, though, since that path still uses the individual sum/count aggregation execs in DataFusion. For my local testing, I am sending avg as itself rather than as two separate sum and count requests, which is what we presently do from SQL.

How I am achieving it:

For aggregation exec:

  • Identifying average aggregation execs in the physical plan
  • Replacing them with their Partial mode equivalents

For projection exec:

  • Identifying whether the alias name needs to be carried into the final column names
  • Creating a new projection exec to avoid a schema conflict; this is required whenever we change the mode of an aggregation exec
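A minimal sketch of the two rewrite steps above, using a toy plan tree. The `Plan`, `AggMode`, `is_avg`, and `enable_partial_avg` names are hypothetical and are not DataFusion's types or this PR's code; the `[count]`/`[sum]` suffixes and their order follow the avg plan printed in the Testing section. The limit node that sits between the projection and the aggregation in the real plan is left out to keep the sketch short.

```rust
// Toy model of a physical plan. These types are illustrative only and are
// NOT DataFusion's ExecutionPlan types.
#[derive(Debug, Clone, PartialEq)]
enum AggMode {
    Single,
    Partial,
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    Aggregate { mode: AggMode, aggr: Vec<String>, input: Box<Plan> },
    // Each projection entry is (source column, output alias).
    Projection { exprs: Vec<(String, String)>, input: Box<Plan> },
}

// Hypothetical check: treat any expression named "avg(...)" as an average.
fn is_avg(expr: &str) -> bool {
    expr.starts_with("avg(")
}

// Rewrites the plan bottom-up:
//   1. a Single-mode aggregate containing an avg becomes Partial;
//   2. a projection directly above a Partial aggregate gets one entry per
//      avg state field, so its schema matches the new aggregate output.
fn enable_partial_avg(plan: Plan) -> Plan {
    match plan {
        scan @ Plan::Scan { .. } => scan,
        Plan::Aggregate { mode, aggr, input } => {
            let input = Box::new(enable_partial_avg(*input));
            let has_avg = aggr.iter().any(|a| is_avg(a));
            let mode = if mode == AggMode::Single && has_avg {
                AggMode::Partial
            } else {
                mode
            };
            Plan::Aggregate { mode, aggr, input }
        }
        Plan::Projection { exprs, input } => {
            let input = Box::new(enable_partial_avg(*input));
            let partial_below =
                matches!(&*input, Plan::Aggregate { mode: AggMode::Partial, .. });
            let mut out = Vec::new();
            for (src, alias) in exprs {
                if partial_below && is_avg(&src) {
                    // Carry the alias into both state columns.
                    for field in ["count", "sum"] {
                        out.push((format!("{src}[{field}]"), format!("{alias}[{field}]")));
                    }
                } else {
                    out.push((src, alias));
                }
            }
            Plan::Projection { exprs: out, input }
        }
    }
}
```

In this sketch a plan with only min or max passes through unchanged, which matches the `mode=Single` plans shown for those queries.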

Testing:

The queries below were run locally; the physical plans and results are shown as executed (printed from logs):

avg:

% curl -s -X POST "http://localhost:9200/_plugins/_ppl" -H 'Content-Type: application/json' --data-raw '{
    "query": "source=index-7 | stats avg(message) as dis"
}'

{
  "schema": [
    {
      "name": "dis",
      "type": "double"
    }
  ],
  "datarows": [
    [
      5.7
    ]
  ],
  "total": 1,
  "size": 1
}

Physical Plan:
ProjectionExec: expr=[avg(index-7.message)[count]@0 as dis[count], avg(index-7.message)[sum]@1 as dis[sum]]
  GlobalLimitExec: skip=0, fetch=10000
    AggregateExec: mode=Partial, gby=[], aggr=[avg(index-7.message)]
      ProjectionExec: expr=[message@0 as message, ___row_id@1 as ___row_id, row_base@2 as row_base]
        DataSourceExec: file_groups={1 group: [[Users/kusandes/workplace/opensearch/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/VySNgH-IR96hd0pjbiQxwQ/0/_parquet_file_generation_0.parquet]]}, projection=[message, ___row_id, row_base], file_type=parquet


[2025-11-14T15:55:45,006][INFO ][o.o.d.DatafusionEngine   ] [runTask-0] dis[sum]: [114.0]
[2025-11-14T15:55:45,007][INFO ][o.o.d.DatafusionEngine   ] [runTask-0] dis[count]: [20]
[2025-11-14T15:55:45,008][INFO ][o.o.a.s.SearchPhaseController] [runTask-0] Final reduced aggregations: {dis={"dis":{"value":5.7}}}

min:

% curl -s -X POST "http://localhost:9200/_plugins/_ppl" -H 'Content-Type: application/json' --data-raw '{
    "query": "source=index-7 | stats min(message) as dis"
}'

{
  "schema": [
    {
      "name": "dis",
      "type": "bigint"
    }
  ],
  "datarows": [
    [
      1
    ]
  ],
  "total": 1,
  "size": 1
}

Physical Plan:
ProjectionExec: expr=[min(index-7.message)@0 as dis]
  GlobalLimitExec: skip=0, fetch=10000
    AggregateExec: mode=Single, gby=[], aggr=[min(index-7.message)]
      ProjectionExec: expr=[message@0 as message, ___row_id@1 as ___row_id, row_base@2 as row_base]
        DataSourceExec: file_groups={1 group: [[Users/kusandes/workplace/opensearch/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/VySNgH-IR96hd0pjbiQxwQ/0/_parquet_file_generation_0.parquet]]}, projection=[message, ___row_id, row_base], file_type=parquet


[2025-11-14T15:56:25,082][INFO ][o.o.d.DatafusionEngine   ] [runTask-0] dis: [1]
[2025-11-14T15:56:25,083][INFO ][o.o.a.s.SearchPhaseController] [runTask-0] Final reduced aggregations: {dis={"dis":{"value":1.0}}}

max:


% curl -s -X POST "http://localhost:9200/_plugins/_ppl" -H 'Content-Type: application/json' --data-raw '{
    "query": "source=index-7 | stats max(message) as dis"
}'

{
  "schema": [
    {
      "name": "dis",
      "type": "bigint"
    }
  ],
  "datarows": [
    [
      12
    ]
  ],
  "total": 1,
  "size": 1
}


Physical Plan:
ProjectionExec: expr=[max(index-7.message)@0 as dis]
  GlobalLimitExec: skip=0, fetch=10000
    AggregateExec: mode=Single, gby=[], aggr=[max(index-7.message)]
      ProjectionExec: expr=[message@0 as message, ___row_id@1 as ___row_id, row_base@2 as row_base]
        DataSourceExec: file_groups={1 group: [[Users/kusandes/workplace/opensearch/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/VySNgH-IR96hd0pjbiQxwQ/0/_parquet_file_generation_0.parquet]]}, projection=[message, ___row_id, row_base], file_type=parquet

Rust: Overall query setup time in milliseconds: 6
[2025-11-14T15:40:50,562][INFO ][o.o.d.DatafusionEngine   ] [runTask-0] Final Results:
[2025-11-14T15:40:50,562][INFO ][o.o.d.DatafusionEngine   ] [runTask-0] dis: [12]
[2025-11-14T15:40:50,564][INFO ][o.o.a.s.SearchPhaseController] [runTask-0] Final reduced aggregations: {dis={"dis":{"value":12.0}}}
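The min and max plans stay in `mode=Single`. One way to see why that is still reducible: the result of a min or max is also its own partial state, so per-shard values can be merged directly. A minimal sketch with hypothetical helper names (`merge_shard_mins`, `merge_shard_maxes`), not taken from the PR:

```rust
// Merging per-shard minimums: the global min is the min of the shard mins.
fn merge_shard_mins(shard_mins: &[i64]) -> Option<i64> {
    shard_mins.iter().copied().min()
}

// Merging per-shard maximums: the global max is the max of the shard maxes.
fn merge_shard_maxes(shard_maxes: &[i64]) -> Option<i64> {
    shard_maxes.iter().copied().max()
}
```

An average has no such property, which is why only avg needs the separate sum and count state.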

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Sandesh Kumar <[email protected]>
@github-actions

❌ Gradle check result for b371237: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
