Skip to content

Commit 78a6521

Browse files
authored
Add ability to set statistics in the SQLExecutor (#134)
The current `SQLExecutor` does not allow us to provide statistics, meaning that the result sets using `datafusion-federation` will always have `Rows=Absent, ...`. This can be a problem when joining large amounts of data in Datafusion, as a suboptimal plan is often selected. This PR adds the `statistics` function to `SQLExecutor`, so it can be implemented to provide more information to the planner. It also adds it to `VirtualExecutionPlan` and `SchemaCastScanExec`, so it can reach Datafusion. By default, it simply returns `Statistics::new_unknown`, so existing implementations of `SQLExecutor` can continue to work as normal.
1 parent e0496b3 commit 78a6521

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

datafusion-federation/src/schema_cast/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_stream::stream;
22
use datafusion::arrow::datatypes::SchemaRef;
3+
use datafusion::common::Statistics;
34
use datafusion::error::{DataFusionError, Result};
45
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
56
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -113,4 +114,8 @@ impl ExecutionPlan for SchemaCastScanExec {
113114
},
114115
)))
115116
}
117+
118+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
119+
self.input.partition_statistics(partition)
120+
}
116121
}

datafusion-federation/src/sql/executor.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use async_trait::async_trait;
22
use core::fmt;
33
use datafusion::{
44
arrow::datatypes::SchemaRef,
5+
common::Statistics,
56
error::Result,
67
logical_expr::LogicalPlan,
78
physical_plan::SendableRecordBatchStream,
@@ -43,6 +44,13 @@ pub trait SQLExecutor: Sync + Send {
4344
/// Execute a SQL query
4445
fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;
4546

47+
/// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should
48+
/// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan`
49+
/// trait.
50+
async fn statistics(&self, plan: &LogicalPlan) -> Result<Statistics> {
51+
Ok(Statistics::new_unknown(plan.schema().as_arrow()))
52+
}
53+
4654
/// Returns the tables provided by the remote
4755
async fn table_names(&self) -> Result<Vec<String>>;
4856

datafusion-federation/src/sql/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use async_trait::async_trait;
1212
use datafusion::{
1313
arrow::datatypes::{Schema, SchemaRef},
1414
common::tree_node::{Transformed, TreeNode},
15+
common::Statistics,
1516
error::{DataFusionError, Result},
1617
execution::{context::SessionState, TaskContext},
1718
logical_expr::{Extension, LogicalPlan},
@@ -140,9 +141,12 @@ impl FederationPlanner for SQLFederationPlanner {
140141
_session_state: &SessionState,
141142
) -> Result<Arc<dyn ExecutionPlan>> {
142143
let schema = Arc::new(node.plan().schema().as_arrow().clone());
144+
let plan = node.plan().clone();
145+
let statistics = self.executor.statistics(&plan).await?;
143146
let input = Arc::new(VirtualExecutionPlan::new(
144-
node.plan().clone(),
147+
plan,
145148
Arc::clone(&self.executor),
149+
statistics,
146150
));
147151
let schema_cast_exec = schema_cast::SchemaCastScanExec::new(input, schema);
148152
Ok(Arc::new(schema_cast_exec))
@@ -154,10 +158,11 @@ struct VirtualExecutionPlan {
154158
plan: LogicalPlan,
155159
executor: Arc<dyn SQLExecutor>,
156160
props: PlanProperties,
161+
statistics: Statistics,
157162
}
158163

159164
impl VirtualExecutionPlan {
160-
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>) -> Self {
165+
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>, statistics: Statistics) -> Self {
161166
let schema: Schema = plan.schema().as_ref().into();
162167
let props = PlanProperties::new(
163168
EquivalenceProperties::new(Arc::new(schema)),
@@ -169,6 +174,7 @@ impl VirtualExecutionPlan {
169174
plan,
170175
executor,
171176
props,
177+
statistics,
172178
}
173179
}
174180

@@ -343,6 +349,10 @@ impl ExecutionPlan for VirtualExecutionPlan {
343349
fn properties(&self) -> &PlanProperties {
344350
&self.props
345351
}
352+
353+
fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
354+
Ok(self.statistics.clone())
355+
}
346356
}
347357

348358
#[cfg(test)]

0 commit comments

Comments
 (0)