diff --git a/connectorx/src/destinations/arrow/arrow_assoc.rs b/connectorx/src/destinations/arrow/arrow_assoc.rs index f9b40c370..713c95b04 100644 --- a/connectorx/src/destinations/arrow/arrow_assoc.rs +++ b/connectorx/src/destinations/arrow/arrow_assoc.rs @@ -288,9 +288,7 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 { } fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 { - nd.and_utc() - .timestamp_nanos_opt() - .unwrap_or_else(|| panic!("out of range DateTime")) + nd.and_utc().timestamp_micros() } impl ArrowAssoc for Option { @@ -328,10 +326,10 @@ impl ArrowAssoc for NaiveDate { } impl ArrowAssoc for Option { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { @@ -342,17 +340,17 @@ impl ArrowAssoc for Option { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), true, ) } } impl ArrowAssoc for NaiveDateTime { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> { @@ -363,7 +361,7 @@ impl ArrowAssoc for NaiveDateTime { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), false, ) } diff --git a/connectorx/src/destinations/arrowstream/arrow_assoc.rs b/connectorx/src/destinations/arrowstream/arrow_assoc.rs index 8a5f7372c..d4c36a665 100644 --- a/connectorx/src/destinations/arrowstream/arrow_assoc.rs +++ b/connectorx/src/destinations/arrowstream/arrow_assoc.rs @@ -1,10 +1,11 @@ use super::errors::{ArrowDestinationError, Result}; +use super::typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro}; use crate::constants::{DEFAULT_ARROW_DECIMAL, DEFAULT_ARROW_DECIMAL_SCALE, SECONDS_IN_DAY}; use crate::utils::decimal_to_i128; use arrow::array::{ - ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Decimal128Builder, Float32Builder, + ArrayBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, - StringBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt32Builder, + StringBuilder, Time64NanosecondBuilder, Time64MicrosecondBuilder, TimestampNanosecondBuilder, TimestampMicrosecondBuilder, UInt32Builder, UInt64Builder, }; use arrow::datatypes::Field; @@ -230,6 +231,48 @@ impl ArrowAssoc for Option> { } } +impl ArrowAssoc for DateTimeWrapperMicro { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00") + } + + #[throws(ArrowDestinationError)] + fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) { + builder.append_value(value.0.timestamp_micros()); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + false, + ) + } +} + +impl ArrowAssoc for Option { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00") + } + + #[throws(ArrowDestinationError)] + fn append(builder: &mut Self::Builder, value: Option) { + builder.append_option(value.map(|x| x.0.timestamp_micros())); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ) + } +} + fn naive_date_to_arrow(nd: NaiveDate) -> i32 { match nd.and_hms_opt(0, 0, 0) { Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32, @@ -238,7 +281,7 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 { } fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 { - nd.and_utc().timestamp_millis() + nd.and_utc().timestamp_micros() } impl ArrowAssoc for Option { @@ -276,10 +319,10 @@ impl ArrowAssoc for NaiveDate { } impl ArrowAssoc for Option { - type Builder = Date64Builder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - Date64Builder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { @@ -288,15 +331,19 @@ impl ArrowAssoc for Option { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::Date64, true) + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + ) } } impl ArrowAssoc for NaiveDateTime { - type Builder = Date64Builder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - Date64Builder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> { @@ -305,7 +352,56 @@ impl ArrowAssoc for NaiveDateTime { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::Date64, false) + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + false, + ) + } +} + +impl ArrowAssoc for Option { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { + builder.append_option(match value { + Some(v) => Some(v.0.and_utc().timestamp_micros()), + None => None, + }); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + ) + } +} + +impl ArrowAssoc for NaiveDateTimeWrapperMicro { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> { + builder.append_value(value.0.and_utc().timestamp_micros()); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + false, + ) } } @@ -349,6 +445,44 @@ impl ArrowAssoc for NaiveTime { } } +impl ArrowAssoc for Option { + type Builder = Time64MicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + Time64MicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { + builder.append_option(value.map(|t| { + t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000 + })); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true) + } +} + +impl ArrowAssoc for NaiveTimeWrapperMicro { + type Builder = Time64MicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + Time64MicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> { + builder.append_value( + value.0.num_seconds_from_midnight() as i64 * 1_000_000 + (value.0.nanosecond() as i64) / 1000, + ); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false) + } +} + impl ArrowAssoc for Option> { type Builder = LargeBinaryBuilder; diff --git a/connectorx/src/destinations/arrowstream/typesystem.rs b/connectorx/src/destinations/arrowstream/typesystem.rs index 15ae1dd72..a37150b64 100644 --- a/connectorx/src/destinations/arrowstream/typesystem.rs +++ b/connectorx/src/destinations/arrowstream/typesystem.rs @@ -2,6 +2,15 @@ use crate::impl_typesystem; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use rust_decimal::Decimal; +#[derive(Debug, Clone, Copy)] +pub struct DateTimeWrapperMicro(pub DateTime); + +#[derive(Debug, Clone, Copy)] +pub struct NaiveTimeWrapperMicro(pub NaiveTime); + +#[derive(Debug, Clone, Copy)] +pub struct NaiveDateTimeWrapperMicro(pub NaiveDateTime); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum ArrowTypeSystem { Int32(bool), @@ -16,8 +25,11 @@ pub enum ArrowTypeSystem { LargeBinary(bool), Date32(bool), Date64(bool), + Date64Micro(bool), Time64(bool), + Time64Micro(bool), DateTimeTz(bool), + DateTimeTzMicro(bool), Float32Array(bool), } @@ -36,8 +48,11 @@ impl_typesystem! { { LargeBinary => Vec } { Date32 => NaiveDate } { Date64 => NaiveDateTime } + { Date64Micro => NaiveDateTimeWrapperMicro } { Time64 => NaiveTime } + { Time64Micro => NaiveTimeWrapperMicro } { DateTimeTz => DateTime } + { DateTimeTzMicro => DateTimeWrapperMicro } { Float32Array => Vec> } } } diff --git a/connectorx/src/transports/oracle_arrowstream.rs b/connectorx/src/transports/oracle_arrowstream.rs index c6f53bcc6..2673e3f37 100644 --- a/connectorx/src/transports/oracle_arrowstream.rs +++ b/connectorx/src/transports/oracle_arrowstream.rs @@ -1,6 +1,6 @@ use crate::{ destinations::arrowstream::{ - typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError, + typesystem::{ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro}, ArrowDestination, ArrowDestinationError, }, impl_transport, sources::oracle::{OracleSource, OracleSourceError, OracleTypeSystem}, @@ -29,19 +29,32 @@ impl_transport!( systems = OracleTypeSystem => ArrowTypeSystem, route = OracleSource => ArrowDestination, mappings = { - { NumFloat[f64] => Float64[f64] | conversion auto } - { Float[f64] => Float64[f64] | conversion none } - { BinaryFloat[f64] => Float64[f64] | conversion none } - { BinaryDouble[f64] => Float64[f64] | conversion none } - { NumInt[i64] => Int64[i64] | conversion auto } - { Blob[Vec] => LargeBinary[Vec] | conversion auto } - { Clob[String] => LargeUtf8[String] | conversion none } - { VarChar[String] => LargeUtf8[String] | conversion auto } - { Char[String] => LargeUtf8[String] | conversion none } - { NVarChar[String] => LargeUtf8[String] | conversion none } - { NChar[String] => LargeUtf8[String] | conversion none } - { Date[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } - { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion none } - { TimestampTz[DateTime] => DateTimeTz[DateTime] | conversion auto } + { NumFloat[f64] => Float64[f64] | conversion auto } + { Float[f64] => Float64[f64] | conversion none } + { BinaryFloat[f64] => Float64[f64] | conversion none } + { BinaryDouble[f64] => Float64[f64] | conversion none } + { NumInt[i64] => Int64[i64] | conversion auto } + { Blob[Vec] => LargeBinary[Vec] | conversion auto } + { Clob[String] => LargeUtf8[String] | conversion none } + { VarChar[String] => LargeUtf8[String] | conversion auto } + { Char[String] => LargeUtf8[String] | conversion none } + { NVarChar[String] => LargeUtf8[String] | conversion none } + { NChar[String] => LargeUtf8[String] | conversion none } + { Date[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option } + { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion none } + { TimestampNano[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } + { TimestampTz[DateTime] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option } + { TimestampTzNano[DateTime] => DateTimeTz[DateTime] | conversion auto } } ); +impl TypeConversion for OracleArrowTransport { + fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro { + NaiveDateTimeWrapperMicro(val) + } +} + +impl TypeConversion, DateTimeWrapperMicro> for OracleArrowTransport { + fn convert(val: DateTime) -> DateTimeWrapperMicro { + DateTimeWrapperMicro(val) + } +}