Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 50 additions & 12 deletions rust/cubestore/cubestore/src/queryplanner/udfs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::udf_xirr::XirrAccumulator;
use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES};
use crate::queryplanner::hll::{Hll, HllUnion};
use crate::queryplanner::udf_xirr::create_xirr_udaf;
Expand All @@ -18,8 +19,7 @@ use serde_derive::{Deserialize, Serialize};
use smallvec::smallvec;
use smallvec::SmallVec;
use std::sync::Arc;

use super::udf_xirr::XirrAccumulator;
use std::time::SystemTime;

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CubeScalarUDFKind {
Expand Down Expand Up @@ -159,19 +159,39 @@ impl CubeScalarUDF for Now {
}

fn descriptor(&self) -> ScalarUDF {
return ScalarUDF {
ScalarUDF {
name: self.name().to_string(),
signature: Self::signature(),
return_type: Arc::new(|inputs| {
assert!(inputs.is_empty());
Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
}),
fun: Arc::new(|_| {
Err(DataFusionError::Internal(
"NOW() was not optimized away".to_string(),
))
let t = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(t) => t,
Err(e) => {
return Err(DataFusionError::Internal(format!(
"Failed to get current timestamp: {}",
e
)))
}
};

let nanos = match i64::try_from(t.as_nanos()) {
Ok(t) => t,
Err(e) => {
return Err(DataFusionError::Internal(format!(
"Failed to convert timestamp to i64: {}",
e
)))
}
};

Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(nanos),
)))
}),
};
}
}
}

Expand All @@ -191,19 +211,37 @@ impl CubeScalarUDF for UnixTimestamp {
}

fn descriptor(&self) -> ScalarUDF {
return ScalarUDF {
ScalarUDF {
name: self.name().to_string(),
signature: Self::signature(),
return_type: Arc::new(|inputs| {
assert!(inputs.is_empty());
Ok(Arc::new(DataType::Int64))
}),
fun: Arc::new(|_| {
Err(DataFusionError::Internal(
"UNIX_TIMESTAMP() was not optimized away".to_string(),
))
let t = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(t) => t,
Err(e) => {
return Err(DataFusionError::Internal(format!(
"Failed to get current timestamp: {}",
e
)))
}
};

let seconds = match i64::try_from(t.as_secs()) {
Ok(t) => t,
Err(e) => {
return Err(DataFusionError::Internal(format!(
"Failed to convert timestamp to i64: {}",
e
)))
}
};

Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(seconds))))
}),
};
}
}
}

Expand Down
Loading