Background: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/
Currently, dynamic filters don't work with distributed-datafusion. To make them work, upper stages need to be able to push filter updates down to lower stages across the network boundary:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ Stage 2                                                                 │
│                                                                         │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │                                                                     │ │
│ │                                                                     │ │
│ │                           SortExec (TopK)                           │ │
│ │                        ┌──────────────────────┐                     │ │
│ │                        │DynamicFilter [ col > │─────────────────────┼─┼─────────────┐
│ └────────────────────────┴──────────────────────┴─────────────────────┘ │             │
│ ┌─────────────────────────────────────────────────────────────────────┐ │             │
│ │                                                                     │ │             │
│ │                                                                     │ │             │
│ │                         NetworkShuffleExec                          │ │             │
│ │                                                                     │ │             │
│ │                                                                     │ │             │
│ └─────────────────────────────────────────────────────────────────────┘ │             │
└─────────────────────────────────────────────────────────────────────────┘             │
                                                                                      Push
┌─────────────────────────────────────────────────────────────────────────┐          updates
│ Stage 1                                                                 │           across
│                                                                         │          network
│ ┌─────────────────────────────────────────────────────────────────────┐ │          boundary
│ │                                                                     │ │             │
│ │                                                                     │ │             │
│ │                           RepartitionExec                           │ │             │
│ │                                                                     │ │             │
│ │                                                                     │ │             │
│ └─────────────────────────────────────────────────────────────────────┘ │             │
│ ┌─────────────────────────────────────────────────────────────────────┐ │             │
│ │                                                                     │ │             │
│ │                                                                     │ │             │
│ │                           DataSourceExec                            │ │             │
│ │                        ┌──────────────────────┐                     │ │             │
│ │                        │DynamicFilter [ true ]│◀────────────────────┼─┼─────────────┘
│ └────────────────────────┴──────────────────────┴─────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
```
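The following single-process Rust sketch makes the TopK case in the diagram concrete. It is not DataFusion's code. `TopK`, `FilterState`, and `run` are hypothetical names, and an `std::sync::mpsc` channel stands in for the network boundary. The dynamic filter is reduced to a single `col > threshold` bound for `ORDER BY col DESC LIMIT k`.

- Stage 2 starts with the equivalent of `DynamicFilter [ true ]` and tightens it once it holds k rows.
- Stage 1 picks up the newest bound between batches and skips rows that can no longer reach the top k.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::mpsc::{channel, Sender};

/// Hypothetical filter state: `None` plays the role of `DynamicFilter [ true ]`,
/// `Some(t)` the role of `DynamicFilter [ col > t ]`.
type FilterState = Option<i64>;

/// Stage 2: a toy TopK for `ORDER BY col DESC LIMIT k`.
struct TopK {
    k: usize,
    /// Min-heap holding the k largest values seen so far.
    heap: BinaryHeap<Reverse<i64>>,
    /// Stands in for the network link down to Stage 1.
    updates: Sender<FilterState>,
}

impl TopK {
    fn insert(&mut self, value: i64) {
        self.heap.push(Reverse(value));
        if self.heap.len() > self.k {
            self.heap.pop(); // evict the smallest value
        }
        if self.heap.len() == self.k {
            // Once full, anything <= the k-th largest value can't make the cut.
            if let Some(&Reverse(threshold)) = self.heap.peek() {
                // Ignore send errors: Stage 1 may already be done.
                let _ = self.updates.send(Some(threshold));
            }
        }
    }
}

/// Runs Stage 1 (a scan over `batches`) feeding Stage 2 (the TopK).
/// Returns the top-k values (descending) and how many rows the scan pruned.
fn run(batches: &[Vec<i64>], k: usize) -> (Vec<i64>, usize) {
    let (tx, rx) = channel();
    let mut topk = TopK { k, heap: BinaryHeap::new(), updates: tx };
    let mut filter: FilterState = None; // starts as `true`
    let mut pruned = 0;
    for batch in batches {
        // Stage 1 applies whatever updates arrived since the previous batch.
        while let Ok(update) = rx.try_recv() {
            filter = update;
        }
        for &value in batch {
            match filter {
                Some(t) if value <= t => pruned += 1,
                _ => topk.insert(value),
            }
        }
    }
    let mut top: Vec<i64> = topk.heap.into_iter().map(|Reverse(v)| v).collect();
    top.sort_unstable_by(|a, b| b.cmp(a));
    (top, pruned)
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![10, 4, 11], vec![0, 5, 12]];
    let (top, pruned) = run(&batches, 2);
    println!("top = {:?}, pruned = {}", top, pruned);
}
```

Consider `run(&[vec![1, 2, 3], vec![10, 4, 11], vec![0, 5, 12]], 2)`. It returns `(vec![12, 11], 2)`: by the third batch the bound has reached `col > 10`, so Stage 1 prunes `0` and `5`. Once the heap is full, the sketch sends a bound after every insert. A real implementation would likely send only when the bound actually changes.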
There are several moving pieces to this:
1. We need to add some hooks to DataFusion's `DynamicFilterPhysicalExpr` to get a callback when a filter is updated.
2. We need some way to identify each `DynamicFilterPhysicalExpr` so that we can relate them across the wire.
3. When we serialize, we need to detect each `DynamicFilterPhysicalExpr` and kick off some bookkeeping. I imagine this might be something like hooking into updates via (1) and then setting up some sort of broadcast channel for updates.
4. When we deserialize, we need to detect each `DynamicFilterPhysicalExpr` and subscribe to updates via its broadcast channel.
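The plain Rust sketch below shows one way the four pieces listed above could fit together. The `(1)` to `(4)` markers in the comments follow the list order.

Everything here is hypothetical:
- `FilterId`, `DynamicFilter::on_update`, `Hub`, `on_serialize`, and `RemoteFilter::on_deserialize` are not existing DataFusion or distributed-datafusion APIs.
- The predicate is a plain string.
- An in-process `Hub` of `std::sync::mpsc` channels stands in for whatever network broadcast mechanism is eventually chosen.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// (2) Hypothetical identifier that both sides of the wire agree on.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct FilterId(u64);

type Listener = Box<dyn Fn(FilterId, &str) + Send>;

/// Toy stand-in for `DynamicFilterPhysicalExpr`; the predicate is a string.
struct DynamicFilter {
    id: FilterId,
    predicate: Mutex<String>,
    /// (1) Hypothetical hook: callbacks invoked on every update.
    listeners: Mutex<Vec<Listener>>,
}

impl DynamicFilter {
    fn new(id: FilterId) -> Self {
        let predicate = Mutex::new("true".to_string());
        DynamicFilter { id, predicate, listeners: Mutex::new(Vec::new()) }
    }
    fn on_update(&self, listener: Listener) {
        self.listeners.lock().unwrap().push(listener);
    }
    fn update(&self, predicate: &str) {
        *self.predicate.lock().unwrap() = predicate.to_string();
        for listener in self.listeners.lock().unwrap().iter() {
            listener(self.id, predicate);
        }
    }
}

/// Hypothetical per-filter broadcast hub standing in for the network layer.
#[derive(Default)]
struct Hub {
    subscribers: HashMap<FilterId, Vec<Sender<String>>>,
}

impl Hub {
    fn subscribe(&mut self, id: FilterId) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.entry(id).or_default().push(tx);
        rx
    }
    fn publish(&mut self, id: FilterId, predicate: &str) {
        if let Some(subs) = self.subscribers.get_mut(&id) {
            // Keep only subscribers whose receiving side still exists.
            subs.retain(|tx| tx.send(predicate.to_string()).is_ok());
        }
    }
}

/// (3) Serialization side: publish every future update of `filter`.
fn on_serialize(filter: &DynamicFilter, hub: Arc<Mutex<Hub>>) -> FilterId {
    filter.on_update(Box::new(move |id: FilterId, p: &str| {
        hub.lock().unwrap().publish(id, p);
    }));
    filter.id
}

/// (4) Deserialization side: a remote copy that follows the broadcast.
struct RemoteFilter {
    predicate: String,
    updates: Receiver<String>,
}

impl RemoteFilter {
    fn on_deserialize(id: FilterId, hub: &Arc<Mutex<Hub>>) -> Self {
        let updates = hub.lock().unwrap().subscribe(id);
        RemoteFilter { predicate: "true".to_string(), updates }
    }
    /// Applies any pending updates and returns the latest predicate.
    fn current(&mut self) -> &str {
        while let Ok(p) = self.updates.try_recv() {
            self.predicate = p;
        }
        &self.predicate
    }
}

fn main() {
    let hub = Arc::new(Mutex::new(Hub::default()));
    let local = DynamicFilter::new(FilterId(7));
    let id = on_serialize(&local, Arc::clone(&hub));
    let mut remote = RemoteFilter::on_deserialize(id, &hub);
    local.update("col > 10");
    local.update("col > 42");
    println!("local: {}", local.predicate.lock().unwrap());
    println!("remote: {}", remote.current());
}
```

This sketch does not replay updates published before a subscriber calls `subscribe`. A real design would also need to give late subscribers the current predicate, for example by serializing it alongside the id.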
The main things that are not clear in my mind are:
- Are serialization and deserialization the right places to discover all `DynamicFilterPhysicalExpr`s? Do we have enough information there to set up the broadcast / network communication? If not, I think we'd have to add an API to DataFusion, something like `ExecutionPlan::filters()`, to discover all of the dynamic filters. Maybe we can reuse `ExecutionPlan::gather_filters_for_pushdown`? I'm not sure. I'm also not sure where we would run this discovery from (an optimizer rule? some new place? `NetworkShuffleExec::execute`?).
- How are we going to do the network communication? Some sort of broadcast channel? A gRPC call that just hangs until there is an update?
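One possible shape for the "gRPC call that just hangs until there is an update" option is a long-poll. The lower stage sends the last filter version it has seen. The upper stage's handler then blocks until a newer version exists or a timeout fires.

The Rust sketch below models only that wait, using `std::sync::Condvar::wait_timeout_while`. `VersionedFilter`, `poll`, and `run_demo` are hypothetical names, threads stand in for the two stages, and no actual gRPC is involved.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical server-side state for one dynamic filter: a version counter
/// plus the latest serialized predicate.
struct VersionedFilter {
    state: Mutex<(u64, String)>,
    changed: Condvar,
}

impl VersionedFilter {
    fn new() -> Self {
        let state = Mutex::new((0, "true".to_string()));
        VersionedFilter { state, changed: Condvar::new() }
    }

    /// Called by the upper stage whenever its filter is updated.
    fn update(&self, predicate: &str) {
        let mut state = self.state.lock().unwrap();
        state.0 += 1;
        state.1 = predicate.to_string();
        self.changed.notify_all();
    }

    /// Long-poll handler: blocks until the version moves past `seen`, or
    /// returns `None` after `timeout` so the caller can simply re-poll.
    fn poll(&self, seen: u64, timeout: Duration) -> Option<(u64, String)> {
        let guard = self.state.lock().unwrap();
        let (guard, result) = self
            .changed
            .wait_timeout_while(guard, timeout, |s| s.0 <= seen)
            .unwrap();
        if result.timed_out() {
            None
        } else {
            Some((guard.0, guard.1.clone()))
        }
    }
}

/// The lower stage re-issues the "hanging" call until it has seen version 2.
fn run_demo() -> Vec<String> {
    let filter = Arc::new(VersionedFilter::new());
    let remote = Arc::clone(&filter);
    let client = thread::spawn(move || {
        let mut seen = 0;
        let mut received = Vec::new();
        while seen < 2 {
            if let Some((version, predicate)) = remote.poll(seen, Duration::from_millis(200)) {
                seen = version;
                received.push(predicate);
            }
        }
        received
    });

    // The upper stage updates its filter twice.
    thread::sleep(Duration::from_millis(50));
    filter.update("col > 10");
    thread::sleep(Duration::from_millis(50));
    filter.update("col > 42");

    client.join().unwrap()
}

fn main() {
    println!("{:?}", run_demo());
}
```

With this design, a slow poller skips intermediate versions and jumps straight to the newest predicate. That should be acceptable, since a lower stage only needs the latest filter. The timeout keeps any single request from hanging indefinitely.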
I also think we should be conscious of how invasive the changes to DataFusion are. It would be great to minimize the new APIs and complexity we add to DataFusion just to support distributed execution.