Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions datafusion-flight-sql-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#[derive(Default)]
pub struct FlightSqlServiceConfig {
/// When true, includes table names in field metadata under the "table_name" key.
/// This allows clients to identify the source table or alias for each column in query results.
pub schema_with_metadata: bool,
}

impl FlightSqlServiceConfig {
pub fn new() -> Self {
Self {
..Default::default()
}
}
}
1 change: 1 addition & 0 deletions datafusion-flight-sql-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod config;
pub mod service;
pub mod session;
pub mod state;
47 changes: 40 additions & 7 deletions datafusion-flight-sql-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use datafusion::{
use datafusion_substrait::{
logical_plan::consumer::from_substrait_plan, serializer::deserialize_bytes,
};

use futures::{Stream, StreamExt, TryStreamExt};
use log::info;
use once_cell::sync::Lazy;
Expand All @@ -56,6 +57,7 @@ use prost::Message;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};

use super::config::FlightSqlServiceConfig;
use super::session::{SessionStateProvider, StaticSessionStateProvider};
use super::state::{CommandTicket, QueryHandle};

Expand All @@ -65,6 +67,7 @@ type Result<T, E = Status> = std::result::Result<T, E>;
pub struct FlightSqlService {
provider: Box<dyn SessionStateProvider>,
sql_options: Option<SQLOptions>,
config: FlightSqlServiceConfig,
}

impl FlightSqlService {
Expand All @@ -78,9 +81,15 @@ impl FlightSqlService {
Self {
provider,
sql_options: None,
config: FlightSqlServiceConfig::default(),
}
}

/// Replaces the FlightSqlServiceConfig with the provided config.
pub fn with_config(self, config: FlightSqlServiceConfig) -> Self {
Self { config, ..self }
}

/// Replaces the sql_options with the provided options.
/// These options are used to verify all SQL queries.
/// When None the default [`SQLOptions`] are used.
Expand Down Expand Up @@ -303,7 +312,7 @@ impl ArrowFlightSqlService for FlightSqlService {
.await
.map_err(df_error_to_status)?;

let dataset_schema = get_schema_for_plan(&plan);
let dataset_schema = get_schema_for_plan(&plan, self.config.schema_with_metadata);

// Form the response ticket (that the client will pass back to DoGet)
let ticket = CommandTicket::new(sql::Command::CommandStatementQuery(query))
Expand Down Expand Up @@ -342,7 +351,7 @@ impl ArrowFlightSqlService for FlightSqlService {

let flight_descriptor = request.into_inner();

let dataset_schema = get_schema_for_plan(&plan);
let dataset_schema = get_schema_for_plan(&plan, self.config.schema_with_metadata);

// Form the response ticket (that the client will pass back to DoGet)
let ticket = CommandTicket::new(sql::Command::CommandStatementSubstraitPlan(query))
Expand Down Expand Up @@ -381,7 +390,7 @@ impl ArrowFlightSqlService for FlightSqlService {
.await
.map_err(df_error_to_status)?;

let dataset_schema = get_schema_for_plan(&plan);
let dataset_schema = get_schema_for_plan(&plan, self.config.schema_with_metadata);

// Form the response ticket (that the client will pass back to DoGet)
let ticket = CommandTicket::new(sql::Command::CommandPreparedStatementQuery(cmd))
Expand Down Expand Up @@ -881,7 +890,7 @@ impl ArrowFlightSqlService for FlightSqlService {
.await
.map_err(df_error_to_status)?;

let dataset_schema = get_schema_for_plan(&plan);
let dataset_schema = get_schema_for_plan(&plan, self.config.schema_with_metadata);
let parameter_schema = parameter_schema_for_plan(&plan).map_err(|e| e.as_ref().clone())?;

let dataset_schema =
Expand Down Expand Up @@ -1017,9 +1026,33 @@ fn encode_schema(schema: &Schema) -> std::result::Result<Bytes, ArrowError> {
}

/// Return the schema for the specified logical plan
fn get_schema_for_plan(logical_plan: &LogicalPlan) -> SchemaRef {
// gather real schema, but only
let schema = Schema::from(logical_plan.schema().as_ref()).into();
fn get_schema_for_plan(logical_plan: &LogicalPlan, with_metadata: bool) -> SchemaRef {
let schema: SchemaRef = if with_metadata {
// Get the DFSchema which contains table qualifiers
let df_schema = logical_plan.schema();

// Convert to Arrow Schema and add table name metadata to fields
let fields_with_metadata: Vec<_> = df_schema
.iter()
.map(|(qualifier, field)| {
// If there's a table qualifier, add it as metadata
if let Some(table_ref) = qualifier {
let mut metadata = field.metadata().clone();
metadata.insert("table_name".to_string(), table_ref.to_string());
field.as_ref().clone().with_metadata(metadata)
} else {
field.as_ref().clone()
}
})
.collect();

Arc::new(Schema::new_with_metadata(
fields_with_metadata,
df_schema.as_ref().metadata().clone(),
))
} else {
Arc::new(Schema::from(logical_plan.schema().as_ref()))
};

// Use an empty FlightDataEncoder to determine the schema of the encoded flight data.
// This is necessary as the schema can change based on dictionary hydration behavior.
Expand Down
Loading