Push dynamic filters across network boundaries #180

@adriangb

Background: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/

Currently dynamic filters don't work with distributed-datafusion. To make them work, upper stages need to be able to push filter updates down to lower stages:

┌─────────────────────────────────────────────────────────────────────────┐                   
│                                Stage 2                                  │                   
│                                                                         │                   
│ ┌─────────────────────────────────────────────────────────────────────┐ │                   
│ │                                                                     │ │                   
│ │                                                                     │ │                   
│ │                             SortExec (TopK)                         │ │                   
│ │                        ┌──────────────────────┐                     │ │                   
│ │                        │DynamicFilter [ col > │─────────────────────┼─┼─────────────┐     
│ └────────────────────────┴──────────────────────┴─────────────────────┘ │             │     
│ ┌─────────────────────────────────────────────────────────────────────┐ │             │     
│ │                                                                     │ │             │     
│ │                                                                     │ │             │     
│ │                            NetworkShuffleExec                       │ │             │     
│ │                                                                     │ │             │     
│ │                                                                     │ │             │     
│ └─────────────────────────────────────────────────────────────────────┘ │             │     
└─────────────────────────────────────────────────────────────────────────┘             │     
                                                                                      Push    
┌─────────────────────────────────────────────────────────────────────────┐          updates  
│                                Stage 1                                  │          across   
│                                                                         │          network  
│ ┌─────────────────────────────────────────────────────────────────────┐ │         boundary  
│ │                                                                     │ │             │     
│ │                                                                     │ │             │     
│ │                            RepartitionExec                          │ │             │     
│ │                                                                     │ │             │     
│ │                                                                     │ │             │     
│ └─────────────────────────────────────────────────────────────────────┘ │             │     
│ ┌─────────────────────────────────────────────────────────────────────┐ │             │     
│ │                                                                     │ │             │     
│ │                                                                     │ │             │     
│ │                            DataSourceExec                           │ │             │     
│ │                        ┌──────────────────────┐                     │ │             │     
│ │                        │DynamicFilter [ true ]│◀────────────────────┼─┼─────────────┘     
│ └────────────────────────┴──────────────────────┴─────────────────────┘ │                   
└─────────────────────────────────────────────────────────────────────────┘                              

There are several moving pieces to this:

  1. We need to add some hooks to DataFusion's DynamicFilterPhysicalExpr to get a callback when a filter is updated.
  2. We need some way to identify each DynamicFilterPhysicalExpr so that we can relate them across the wire.
  3. When we serialize a plan we need to detect each DynamicFilterPhysicalExpr and set up some bookkeeping for it. I imagine this would mean hooking into updates via (1) and then setting up some sort of broadcast channel for those updates.
  4. When we deserialize we need to detect each DynamicFilterPhysicalExpr and subscribe to updates via its broadcast channel.
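To make the four pieces concrete, here is a minimal in-process sketch using only the Rust standard library. Everything in it is hypothetical: `ToyDynamicFilter` stands in for DynamicFilterPhysicalExpr, and `FilterId`, `FilterUpdate`, `FilterRegistry` and `on_update` are names made up for illustration, not existing DataFusion APIs. A plain `String` stands in for a serialized predicate, and an `mpsc` channel stands in for whatever would actually carry updates over the network.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical stable identifier used to relate filters across the wire (piece 2).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FilterId(pub u64);

/// Hypothetical update message; a real one would carry a serialized expression.
#[derive(Clone, Debug, PartialEq)]
pub struct FilterUpdate {
    pub id: FilterId,
    pub predicate: String,
}

type Callback = Box<dyn Fn(&FilterUpdate) + Send + Sync>;

/// Toy stand-in for DynamicFilterPhysicalExpr with an update hook (piece 1).
pub struct ToyDynamicFilter {
    id: FilterId,
    current: Mutex<String>,
    callbacks: Mutex<Vec<Callback>>,
}

impl ToyDynamicFilter {
    pub fn new(id: FilterId) -> Self {
        Self {
            id,
            current: Mutex::new("true".to_string()),
            callbacks: Mutex::new(Vec::new()),
        }
    }

    /// Register a callback that runs on every update.
    pub fn on_update(&self, cb: Callback) {
        self.callbacks.lock().unwrap().push(cb);
    }

    /// Replace the predicate and notify every registered callback.
    pub fn update(&self, predicate: &str) {
        *self.current.lock().unwrap() = predicate.to_string();
        let update = FilterUpdate { id: self.id, predicate: predicate.to_string() };
        for cb in self.callbacks.lock().unwrap().iter() {
            cb(&update);
        }
    }

    pub fn current(&self) -> String {
        self.current.lock().unwrap().clone()
    }
}

/// Bookkeeping that fans updates out to subscribers, keyed by filter id.
#[derive(Default)]
pub struct FilterRegistry {
    subscribers: Mutex<HashMap<FilterId, Vec<Sender<FilterUpdate>>>>,
}

impl FilterRegistry {
    /// Piece 3: called for each filter detected while serializing a plan.
    pub fn register(self: &Arc<Self>, filter: &ToyDynamicFilter) {
        let registry = Arc::clone(self);
        filter.on_update(Box::new(move |u: &FilterUpdate| registry.publish(u)));
    }

    /// Piece 4: called for each filter detected while deserializing a plan.
    pub fn subscribe(&self, id: FilterId) -> Receiver<FilterUpdate> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().entry(id).or_default().push(tx);
        rx
    }

    /// Send the update to every live subscriber and drop disconnected ones.
    fn publish(&self, update: &FilterUpdate) {
        if let Some(txs) = self.subscribers.lock().unwrap().get_mut(&update.id) {
            txs.retain(|tx| tx.send(update.clone()).is_ok());
        }
    }
}

/// Walk through the flow: the upper-stage filter updates, the lower-stage copy follows.
pub fn demo() -> (FilterUpdate, String) {
    let registry = Arc::new(FilterRegistry::default());
    let upstream = ToyDynamicFilter::new(FilterId(7));
    registry.register(&upstream);
    let rx = registry.subscribe(FilterId(7));

    upstream.update("a > 5"); // arbitrary example predicate
    let received = rx.recv().unwrap();

    let downstream = ToyDynamicFilter::new(FilterId(7));
    downstream.update(&received.predicate);
    (received, downstream.current())
}
```

In this sketch the registry is the only component that knows about both sides, so the filter itself only needs the id and the callback hook. Whether that is the right split for DataFusion is exactly the open question.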

The main things that are not clear in my mind are:

  1. Are serialization and deserialization the right places to discover all DynamicFilterPhysicalExprs? Do we have enough information to set up the broadcast / network communication there? If not, I think we'd have to add APIs to DataFusion, something like ExecutionPlan::filters(), to discover all of the dynamic filters. Maybe we can re-use ExecutionPlan::gather_filters_for_pushdown? I'm not sure. I'm also not sure where we would do this discovery from (an optimizer rule? some new place? NetworkShuffleExec::execute?)
  2. How are we going to do the network communication? Some sort of broadcast channel? A gRPC call that just hangs until there is an update?
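As a rough illustration of the "call that just hangs until there is an update" option, the sketch below models the server side of such a long poll with a versioned cell and a condition variable from the standard library. `WatchedFilter`, `publish` and `wait_newer` are hypothetical names, the predicate is a plain string, and no actual gRPC is involved; a real handler would wrap something like `wait_newer` in an RPC and would likely be async.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Versioned cell holding the latest predicate for one filter (hypothetical type).
pub struct WatchedFilter {
    state: Mutex<(u64, String)>, // (version, predicate text)
    changed: Condvar,
}

impl WatchedFilter {
    pub fn new() -> Self {
        Self {
            state: Mutex::new((0, "true".to_string())),
            changed: Condvar::new(),
        }
    }

    /// Producer side: store a new predicate, bump the version, wake all waiters.
    pub fn publish(&self, predicate: &str) {
        let mut state = self.state.lock().unwrap();
        state.0 += 1;
        state.1 = predicate.to_string();
        self.changed.notify_all();
    }

    /// Consumer side: block until the version is newer than `seen`, or give up
    /// after `timeout` and return `None` so the caller can poll again.
    pub fn wait_newer(&self, seen: u64, timeout: Duration) -> Option<(u64, String)> {
        let guard = self.state.lock().unwrap();
        let (guard, _timeout_result) = self
            .changed
            .wait_timeout_while(guard, timeout, |s| s.0 <= seen)
            .unwrap();
        if guard.0 > seen {
            Some((*guard).clone())
        } else {
            None
        }
    }
}

/// One poll that is answered by an update, then one that times out.
pub fn demo_long_poll() -> (Option<(u64, String)>, Option<(u64, String)>) {
    let cell = Arc::new(WatchedFilter::new());
    let producer = Arc::clone(&cell);
    let handle = std::thread::spawn(move || producer.publish("a > 5"));

    let first = cell.wait_newer(0, Duration::from_secs(5));
    handle.join().unwrap();
    let second = cell.wait_newer(1, Duration::from_millis(10));
    (first, second)
}
```

Tracking a version number means a poller that reconnects late still sees the latest predicate instead of missing an update, which matters less for a broadcast channel that is already connected but more for a request/response design.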

I also think we should be conscious of how invasive the changes to DataFusion are. It would be great to minimize the number of APIs and the amount of complexity we add just to support distributed execution.
