use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use arrow::array::{ArrayRef, BooleanArray};
use common_error::ext::BoxedError;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_expr::Operator;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::types::cast;
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, TimestampMillisecondVector, VectorRef};
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use substrait::df_logical_plan::consumer::name_to_op;
use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu, UnexpectedSnafu};
use crate::expr::error::{
ArrowSnafu, CastValueSnafu, DataTypeSnafu, DivisionByZeroSnafu, EvalError, OverflowSnafu,
TryFromValueSnafu, TypeMismatchSnafu,
};
use crate::expr::signature::{GenericFn, Signature};
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr, TUMBLE_END, TUMBLE_START};
use crate::repr::{self, value_to_internal_ts};
/// Functions that are not computed column-wise from input values like
/// [`UnaryFunc`]/[`BinaryFunc`] are.
///
/// NOTE(review): presumably these are resolved from evaluation context (current
/// time, session schema) rather than from row data — confirm against the planner.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Hash)]
pub enum UnmaterializableFunc {
    /// `now()`; its signature takes no input and returns a millisecond timestamp.
    Now,
    /// `current_schema()`; its signature takes no input and returns a string.
    CurrentSchema,
    /// A tumble window over `ts`; its signature takes and returns a millisecond timestamp.
    TumbleWindow {
        /// The timestamp expression being windowed.
        ts: Box<TypedExpr>,
        /// Width of each window.
        window_size: Duration,
        /// Alignment origin of the windows, if any.
        start_time: Option<Timestamp>,
    },
}
impl UnmaterializableFunc {
    /// Returns the type signature of this function.
    ///
    /// `Now` and `CurrentSchema` take no inputs; `TumbleWindow` maps one
    /// millisecond timestamp to a millisecond timestamp.
    pub fn signature(&self) -> Signature {
        match self {
            Self::TumbleWindow { .. } => Signature {
                input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
                output: ConcreteDataType::timestamp_millisecond_datatype(),
                generic_fn: GenericFn::TumbleWindow,
            },
            Self::CurrentSchema => Signature {
                input: smallvec![],
                output: ConcreteDataType::string_datatype(),
                generic_fn: GenericFn::CurrentSchema,
            },
            Self::Now => Signature {
                input: smallvec![],
                output: ConcreteDataType::timestamp_millisecond_datatype(),
                generic_fn: GenericFn::Now,
            },
        }
    }

    /// Returns whether `name` (compared case-insensitively) names an
    /// unmaterializable function.
    ///
    /// NOTE(review): `"tumble"` is accepted here, but `from_str_args` cannot
    /// build it; presumably tumble is constructed elsewhere — confirm.
    pub fn is_valid_func_name(name: &str) -> bool {
        const KNOWN: [&str; 3] = ["now", "current_schema", "tumble"];
        let lowered = name.to_lowercase();
        KNOWN.contains(&lowered.as_str())
    }

    /// Builds `Now` or `CurrentSchema` from a case-insensitive name.
    ///
    /// The arguments are ignored. Any other name yields an `InvalidQuery` error.
    pub fn from_str_args(name: &str, _args: Vec<TypedExpr>) -> Result<Self, Error> {
        let lowered = name.to_lowercase();
        if lowered == "now" {
            return Ok(Self::Now);
        }
        if lowered == "current_schema" {
            return Ok(Self::CurrentSchema);
        }
        InvalidQuerySnafu {
            reason: format!("Unknown unmaterializable function: {}", name),
        }
        .fail()
    }
}
/// Scalar functions of a single argument.
///
/// Variant order is significant: it drives the derived `Ord` and may affect
/// serialized forms, so do not reorder.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum UnaryFunc {
    /// Boolean negation.
    Not,
    /// `true` where the input is null.
    IsNull,
    /// Boolean identity (the batch path returns the input array as-is).
    IsTrue,
    /// Boolean negation, evaluated like `Not`.
    IsFalse,
    /// Adds one millisecond to a timestamp.
    StepTimestamp,
    /// Casts the input to the contained type.
    Cast(ConcreteDataType),
    /// Start of the tumble window containing the input timestamp.
    TumbleWindowFloor {
        /// Width of each window (used at millisecond precision).
        window_size: Duration,
        /// Window alignment origin; `get_window_start` treats `None` as 0.
        start_time: Option<Timestamp>,
    },
    /// End of the tumble window containing the input timestamp
    /// (window start plus `window_size`).
    TumbleWindowCeiling {
        /// Width of each window (used at millisecond precision).
        window_size: Duration,
        /// Window alignment origin; `get_window_start` treats `None` as 0.
        start_time: Option<Timestamp>,
    },
}
impl UnaryFunc {
pub fn signature(&self) -> Signature {
match self {
Self::IsNull => Signature {
input: smallvec![ConcreteDataType::null_datatype()],
output: ConcreteDataType::boolean_datatype(),
generic_fn: GenericFn::IsNull,
},
Self::Not | Self::IsTrue | Self::IsFalse => Signature {
input: smallvec![ConcreteDataType::boolean_datatype()],
output: ConcreteDataType::boolean_datatype(),
generic_fn: match self {
Self::Not => GenericFn::Not,
Self::IsTrue => GenericFn::IsTrue,
Self::IsFalse => GenericFn::IsFalse,
_ => unreachable!(),
},
},
Self::StepTimestamp => Signature {
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::StepTimestamp,
},
Self::Cast(to) => Signature {
input: smallvec![ConcreteDataType::null_datatype()],
output: to.clone(),
generic_fn: GenericFn::Cast,
},
Self::TumbleWindowFloor { .. } => Signature {
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::TumbleWindow,
},
Self::TumbleWindowCeiling { .. } => Signature {
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::TumbleWindow,
},
}
}
pub fn is_valid_func_name(name: &str) -> bool {
matches!(
name.to_lowercase().as_str(),
"not" | "is_null" | "is_true" | "is_false" | "step_timestamp" | "cast"
)
}
pub fn from_str_and_type(
name: &str,
arg_type: Option<ConcreteDataType>,
) -> Result<Self, Error> {
match name {
"not" => Ok(Self::Not),
"is_null" => Ok(Self::IsNull),
"is_true" => Ok(Self::IsTrue),
"is_false" => Ok(Self::IsFalse),
"step_timestamp" => Ok(Self::StepTimestamp),
"cast" => {
let arg_type = arg_type.with_context(|| InvalidQuerySnafu {
reason: "cast function requires a type argument".to_string(),
})?;
Ok(UnaryFunc::Cast(arg_type))
}
_ => InvalidQuerySnafu {
reason: format!("Unknown unary function: {}", name),
}
.fail(),
}
}
pub fn eval_batch(&self, batch: &Batch, expr: &ScalarExpr) -> Result<VectorRef, EvalError> {
let arg_col = expr.eval_batch(batch)?;
match self {
Self::Not => {
let arrow_array = arg_col.to_arrow_array();
let bool_array = arrow_array
.as_any()
.downcast_ref::<BooleanArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: arg_col.data_type(),
}
})?;
let ret = arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
let ret = BooleanVector::from(ret);
Ok(Arc::new(ret))
}
Self::IsNull => {
let arrow_array = arg_col.to_arrow_array();
let ret = arrow::compute::is_null(&arrow_array)
.context(ArrowSnafu { context: "is_null" })?;
let ret = BooleanVector::from(ret);
Ok(Arc::new(ret))
}
Self::IsTrue | Self::IsFalse => {
let arrow_array = arg_col.to_arrow_array();
let bool_array = arrow_array
.as_any()
.downcast_ref::<BooleanArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: arg_col.data_type(),
}
})?;
if matches!(self, Self::IsTrue) {
Ok(Arc::new(BooleanVector::from(bool_array.clone())))
} else {
let ret =
arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
Ok(Arc::new(BooleanVector::from(ret)))
}
}
Self::StepTimestamp => {
let timestamp_array = get_timestamp_array(&arg_col)?;
let timestamp_array_ref = timestamp_array
.as_any()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
}
})?;
let ret = arrow::compute::unary(timestamp_array_ref, |arr| arr + 1);
let ret = TimestampMillisecondVector::from(ret);
Ok(Arc::new(ret))
}
Self::Cast(to) => {
let arrow_array = arg_col.to_arrow_array();
let ret = arrow::compute::cast(&arrow_array, &to.as_arrow_type())
.context(ArrowSnafu { context: "cast" })?;
let vector = Helper::try_into_vector(ret).context(DataTypeSnafu {
msg: "Fail to convert to Vector",
})?;
Ok(vector)
}
Self::TumbleWindowFloor {
window_size,
start_time,
} => {
let timestamp_array = get_timestamp_array(&arg_col)?;
let date_array_ref = timestamp_array
.as_any()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
}
})?;
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let ret = arrow::compute::unary(date_array_ref, |ts| {
get_window_start(ts, window_size, start_time)
});
let ret = TimestampMillisecondVector::from(ret);
Ok(Arc::new(ret))
}
Self::TumbleWindowCeiling {
window_size,
start_time,
} => {
let timestamp_array = get_timestamp_array(&arg_col)?;
let date_array_ref = timestamp_array
.as_any()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
}
})?;
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let ret = arrow::compute::unary(date_array_ref, |ts| {
get_window_start(ts, window_size, start_time) + window_size
});
let ret = TimestampMillisecondVector::from(ret);
Ok(Arc::new(ret))
}
}
}
pub fn from_tumble_func(name: &str, args: &[TypedExpr]) -> Result<(Self, TypedExpr), Error> {
match name.to_lowercase().as_str() {
TUMBLE_START | TUMBLE_END => {
let ts = args.first().context(InvalidQuerySnafu {
reason: "Tumble window function requires a timestamp argument",
})?;
let window_size = {
let window_size_untyped = args
.get(1)
.and_then(|expr| expr.expr.as_literal())
.context(InvalidQuerySnafu {
reason: "Tumble window function requires a window size argument",
})?;
if let Some(window_size) = window_size_untyped.as_string() {
let interval = cast(
Value::from(window_size),
&ConcreteDataType::interval_day_time_datatype(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_interval_day_time()
.context(UnexpectedSnafu {
reason: "Expect window size arg to be interval after successful cast"
.to_string(),
})?;
Duration::from_millis(interval.as_millis() as u64)
} else if let Some(interval) = window_size_untyped.as_interval_day_time() {
Duration::from_millis(interval.as_millis() as u64)
} else {
InvalidQuerySnafu {
reason: format!(
"Tumble window function requires window size argument to be either a interval or a string describe a interval, found {:?}",
window_size_untyped
)
}.fail()?
}
};
let start_time = match args.get(2) {
Some(start_time) => {
if let Some(value) = start_time.expr.as_literal() {
let ret = cast(
value,
&ConcreteDataType::timestamp_millisecond_datatype(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_timestamp()
.context(UnexpectedSnafu {
reason:
"Expect start time arg to be timestamp after successful cast"
.to_string(),
})?;
Some(ret)
} else {
UnexpectedSnafu {
reason: "Expect start time arg to be literal",
}
.fail()?
}
}
None => None,
};
if name == TUMBLE_START {
Ok((
Self::TumbleWindowFloor {
window_size,
start_time,
},
ts.clone(),
))
} else if name == TUMBLE_END {
Ok((
Self::TumbleWindowCeiling {
window_size,
start_time,
},
ts.clone(),
))
} else {
unreachable!()
}
}
_ => crate::error::InternalSnafu {
reason: format!("Unknown tumble kind function: {}", name),
}
.fail()?,
}
}
pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result<Value, EvalError> {
let arg = expr.eval(values)?;
match self {
Self::Not => {
let bool = if let Value::Boolean(bool) = arg {
Ok(bool)
} else {
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: arg.data_type(),
}
.fail()?
}?;
Ok(Value::from(!bool))
}
Self::IsNull => Ok(Value::from(arg.is_null())),
Self::IsTrue | Self::IsFalse => {
let bool = if let Value::Boolean(bool) = arg {
Ok(bool)
} else {
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: arg.data_type(),
}
.fail()?
}?;
if matches!(self, Self::IsTrue) {
Ok(Value::from(bool))
} else {
Ok(Value::from(!bool))
}
}
Self::StepTimestamp => {
let ty = arg.data_type();
if let Value::Timestamp(timestamp) = arg {
let timestamp = Timestamp::new_millisecond(timestamp.value() + 1);
Ok(Value::from(timestamp))
} else if let Ok(v) = value_to_internal_ts(arg) {
let timestamp = Timestamp::new_millisecond(v + 1);
Ok(Value::from(timestamp))
} else {
TypeMismatchSnafu {
expected: ConcreteDataType::timestamp_millisecond_datatype(),
actual: ty,
}
.fail()?
}
}
Self::Cast(to) => {
let arg_ty = arg.data_type();
cast(arg, to).context({
CastValueSnafu {
from: arg_ty,
to: to.clone(),
}
})
}
Self::TumbleWindowFloor {
window_size,
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let window_start = get_window_start(ts, window_size, start_time);
let ret = Timestamp::new_millisecond(window_start);
Ok(Value::from(ret))
}
Self::TumbleWindowCeiling {
window_size,
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let window_start = get_window_start(ts, window_size, start_time);
let window_end = window_start + window_size;
let ret = Timestamp::new_millisecond(window_end);
Ok(Value::from(ret))
}
}
}
}
/// Converts `vector` to an Arrow array of millisecond timestamps.
///
/// An array that already has the millisecond-timestamp Arrow type is returned
/// unchanged. Anything else goes through `arrow::compute::cast`, and a cast
/// failure is reported as an `EvalError`.
fn get_timestamp_array(vector: &VectorRef) -> Result<arrow::array::ArrayRef, EvalError> {
    let target_type = ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type();
    let arrow_array = vector.to_arrow_array();
    if *arrow_array.data_type() == target_type {
        return Ok(arrow_array);
    }
    // Shared by `StepTimestamp` and the tumble-window functions, so the context
    // describes the conversion instead of naming a single caller.
    let casted = arrow::compute::cast(&arrow_array, &target_type).context(ArrowSnafu {
        context: "Trying to cast to millisecond timestamp",
    })?;
    Ok(casted)
}
/// Returns the start of the tumble window of width `window_size` containing `ts`.
///
/// Windows are aligned to `start_time`, or to 0 when it is `None`. For a
/// positive `window_size` the result is the largest `origin + k * window_size`
/// that is not greater than `ts`, including for `ts` before the origin. A zero
/// `window_size` panics on integer division.
fn get_window_start(
    ts: repr::Timestamp,
    window_size: repr::Duration,
    start_time: Option<repr::Timestamp>,
) -> repr::Timestamp {
    let origin = start_time.unwrap_or(0);
    // `/` truncates toward zero, which is already a floor for offsets >= 0.
    let truncated = origin + (ts - origin) / window_size * window_size;
    if ts >= origin {
        return truncated;
    }
    // Before the origin, truncation rounds up, so step back one window unless
    // `ts` lies exactly on a window boundary.
    let on_boundary = (origin - ts) % window_size == 0;
    if on_boundary {
        truncated
    } else {
        truncated - window_size
    }
}
/// Checks `get_window_start` with the default origin and with an explicit
/// `start_time`, including timestamps before the origin, which must floor
/// rather than truncate toward zero.
#[test]
fn test_get_window_start() {
    // (ts, window_size, start_time, expected window start)
    let cases = [
        (1, 3, None, 0),
        (3, 3, None, 3),
        (0, 3, None, 0),
        (-1, 3, None, -3),
        (-3, 3, None, -3),
        (7, 5, Some(2), 7),
        (6, 5, Some(2), 2),
        (1, 5, Some(2), -3),
        (-3, 5, Some(2), -3),
    ];
    for &(ts, window_size, start_time, expected) in cases.iter() {
        assert_eq!(
            get_window_start(ts, window_size, start_time),
            expected,
            "ts={ts}, window_size={window_size}, start_time={start_time:?}"
        );
    }
}
/// Extracts a timestamp from `arg` and returns its value in milliseconds.
///
/// # Errors
/// `InvalidArgument` if `arg` is not a timestamp, and `Overflow` if the
/// conversion to milliseconds fails.
fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
    let timestamp = arg.as_timestamp().context(InvalidArgumentSnafu {
        reason: "Expect input to be timestamp or datetime type",
    })?;
    let in_millis = timestamp
        .convert_to(TimeUnit::Millisecond)
        .context(OverflowSnafu)?;
    Ok(in_millis.value())
}
/// Scalar functions of two arguments.
///
/// Comparisons are type-agnostic. Arithmetic variants are specialized per
/// operand type; see `BinaryFunc::specialization`. Declaration order drives the
/// derived `Ord` and the `EnumIter` iteration order used to build the
/// specialization table, so do not reorder.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash, EnumIter,
)]
pub enum BinaryFunc {
    // Comparisons, each producing a boolean.
    Eq,
    NotEq,
    Lt,
    Lte,
    Gt,
    Gte,
    // Addition; both operands and the result have the named type.
    AddInt16,
    AddInt32,
    AddInt64,
    AddUInt16,
    AddUInt32,
    AddUInt64,
    AddFloat32,
    AddFloat64,
    // Subtraction.
    SubInt16,
    SubInt32,
    SubInt64,
    SubUInt16,
    SubUInt32,
    SubUInt64,
    SubFloat32,
    SubFloat64,
    // Multiplication.
    MulInt16,
    MulInt32,
    MulInt64,
    MulUInt16,
    MulUInt32,
    MulUInt64,
    MulFloat32,
    MulFloat64,
    // Division.
    DivInt16,
    DivInt32,
    DivInt64,
    DivUInt16,
    DivUInt32,
    DivUInt64,
    DivFloat32,
    DivFloat64,
    // Remainder (integer types only).
    ModInt16,
    ModInt32,
    ModInt64,
    ModUInt16,
    ModUInt32,
    ModUInt64,
}
/// Generates the `match` used by `BinaryFunc::signature`.
///
/// `$value` is the matched identifier. The `{ ... }` group holds hand-written
/// arms that are spliced in verbatim. Each `Variant=>(type_ctor, GenericFn)`
/// entry expands to an arm whose two inputs and output are all
/// `ConcreteDataType::type_ctor()`.
macro_rules! generate_binary_signature {
    ($value:ident, { $($user_arm:tt)* },
    [ $(
        $auto_arm:ident=>($con_type:ident,$generic:ident)
    ),*
    ]) => {
        match $value {
            // Hand-written arms first (the comparison operators in practice).
            $($user_arm)*,
            $(
                // Same-typed arm: both operands and the result use `$con_type`.
                Self::$auto_arm => Signature {
                    input: smallvec![
                        ConcreteDataType::$con_type(),
                        ConcreteDataType::$con_type(),
                    ],
                    output: ConcreteDataType::$con_type(),
                    generic_fn: GenericFn::$generic,
                },
            )*
        }
    };
}
/// Lookup table from `(generic function, first input type)` to the concrete
/// [`BinaryFunc`].
///
/// It is built once, on first use, by `BinaryFunc::specialization` from every
/// variant's `signature()`.
static SPECIALIZATION: OnceLock<HashMap<(GenericFn, ConcreteDataType), BinaryFunc>> =
    OnceLock::new();
impl BinaryFunc {
    /// Returns the type signature of this function.
    ///
    /// Comparisons declare `null_datatype()` for both inputs (presumably a
    /// wildcard to the signature checker — confirm) and return a boolean. Each
    /// arithmetic variant takes and returns its own concrete type.
    pub fn signature(&self) -> Signature {
        generate_binary_signature!(self, {
                Self::Eq | Self::NotEq | Self::Lt | Self::Lte | Self::Gt | Self::Gte => Signature {
                    input: smallvec![
                        ConcreteDataType::null_datatype(),
                        ConcreteDataType::null_datatype()
                    ],
                    output: ConcreteDataType::boolean_datatype(),
                    generic_fn: match self {
                        Self::Eq => GenericFn::Eq,
                        Self::NotEq => GenericFn::NotEq,
                        Self::Lt => GenericFn::Lt,
                        Self::Lte => GenericFn::Lte,
                        Self::Gt => GenericFn::Gt,
                        Self::Gte => GenericFn::Gte,
                        _ => unreachable!(),
                    },
                }
            },
            [
                AddInt16=>(int16_datatype,Add),
                AddInt32=>(int32_datatype,Add),
                AddInt64=>(int64_datatype,Add),
                AddUInt16=>(uint16_datatype,Add),
                AddUInt32=>(uint32_datatype,Add),
                AddUInt64=>(uint64_datatype,Add),
                AddFloat32=>(float32_datatype,Add),
                AddFloat64=>(float64_datatype,Add),
                SubInt16=>(int16_datatype,Sub),
                SubInt32=>(int32_datatype,Sub),
                SubInt64=>(int64_datatype,Sub),
                SubUInt16=>(uint16_datatype,Sub),
                SubUInt32=>(uint32_datatype,Sub),
                SubUInt64=>(uint64_datatype,Sub),
                SubFloat32=>(float32_datatype,Sub),
                SubFloat64=>(float64_datatype,Sub),
                MulInt16=>(int16_datatype,Mul),
                MulInt32=>(int32_datatype,Mul),
                MulInt64=>(int64_datatype,Mul),
                MulUInt16=>(uint16_datatype,Mul),
                MulUInt32=>(uint32_datatype,Mul),
                MulUInt64=>(uint64_datatype,Mul),
                MulFloat32=>(float32_datatype,Mul),
                MulFloat64=>(float64_datatype,Mul),
                DivInt16=>(int16_datatype,Div),
                DivInt32=>(int32_datatype,Div),
                DivInt64=>(int64_datatype,Div),
                DivUInt16=>(uint16_datatype,Div),
                DivUInt32=>(uint32_datatype,Div),
                DivUInt64=>(uint64_datatype,Div),
                DivFloat32=>(float32_datatype,Div),
                DivFloat64=>(float64_datatype,Div),
                ModInt16=>(int16_datatype,Mod),
                ModInt32=>(int32_datatype,Mod),
                ModInt64=>(int64_datatype,Mod),
                ModUInt16=>(uint16_datatype,Mod),
                ModUInt32=>(uint32_datatype,Mod),
                ModUInt64=>(uint64_datatype,Mod)
            ]
        )
    }

    /// The addition variant for operands of `input_type`.
    pub fn add(input_type: ConcreteDataType) -> Result<Self, Error> {
        Self::specialization(GenericFn::Add, input_type)
    }

    /// The subtraction variant for operands of `input_type`.
    pub fn sub(input_type: ConcreteDataType) -> Result<Self, Error> {
        Self::specialization(GenericFn::Sub, input_type)
    }

    /// The multiplication variant for operands of `input_type`.
    pub fn mul(input_type: ConcreteDataType) -> Result<Self, Error> {
        Self::specialization(GenericFn::Mul, input_type)
    }

    /// The division variant for operands of `input_type`.
    pub fn div(input_type: ConcreteDataType) -> Result<Self, Error> {
        Self::specialization(GenericFn::Div, input_type)
    }

    /// Looks up the concrete variant implementing `generic` for `input_type`.
    ///
    /// Comparisons are registered under `null_datatype()`, which is their
    /// declared input type.
    ///
    /// # Errors
    /// `InvalidQuery` if no variant matches.
    pub fn specialization(generic: GenericFn, input_type: ConcreteDataType) -> Result<Self, Error> {
        let rule = SPECIALIZATION.get_or_init(|| {
            let mut spec = HashMap::new();
            for func in BinaryFunc::iter() {
                let sig = func.signature();
                spec.insert((sig.generic_fn, sig.input[0].clone()), func);
            }
            spec
        });
        rule.get(&(generic, input_type.clone()))
            .cloned()
            .with_context(|| InvalidQuerySnafu {
                reason: format!(
                    "No specialization found for binary function {:?} with input type {:?}",
                    generic, input_type
                ),
            })
    }

    /// Infers the common operand type of a binary call.
    ///
    /// Known argument types must agree. If only one is known, it is used. If
    /// neither is known, the type of the first literal argument is used.
    ///
    /// # Errors
    /// - `Plan` unless exactly two expressions and two types are given.
    /// - `InvalidQuery` on mismatched known types, or when no type can be inferred.
    pub(crate) fn infer_type_from(
        generic: GenericFn,
        arg_exprs: &[ScalarExpr],
        arg_types: &[Option<ConcreteDataType>],
    ) -> Result<ConcreteDataType, Error> {
        // The indexing below assumes exactly two arguments; report a malformed
        // call as an error instead of panicking.
        ensure!(
            arg_exprs.len() == 2 && arg_types.len() == 2,
            PlanSnafu {
                reason: "Binary function requires exactly 2 arguments".to_string()
            }
        );
        let ret = match (arg_types[0].as_ref(), arg_types[1].as_ref()) {
            (Some(t1), Some(t2)) => {
                ensure!(
                    t1 == t2,
                    InvalidQuerySnafu {
                        reason: format!(
                            "Binary function {:?} requires both arguments to have the same type, left={:?}, right={:?}",
                            generic, t1, t2
                        ),
                    }
                );
                t1.clone()
            }
            (Some(t), None) | (None, Some(t)) => t.clone(),
            _ => arg_exprs[0]
                .as_literal()
                .map(|lit| lit.data_type())
                .or_else(|| arg_exprs[1].as_literal().map(|lit| lit.data_type()))
                .with_context(|| InvalidQuerySnafu {
                    reason: format!(
                        "Binary function {:?} requires at least one argument with known type",
                        generic
                    ),
                })?,
        };
        Ok(ret)
    }

    /// Returns whether `name` (compared case-insensitively) is a supported
    /// binary function name.
    pub fn is_valid_func_name(name: &str) -> bool {
        matches!(
            name.to_lowercase().as_str(),
            "eq" | "equal"
                | "not_eq"
                | "not_equal"
                | "lt"
                | "lte"
                | "gt"
                | "gte"
                | "add"
                | "sub"
                | "subtract"
                | "mul"
                | "multiply"
                | "div"
                | "divide"
                | "mod"
        )
    }

    /// Resolves `name` plus argument information to a concrete variant and the
    /// call's signature.
    ///
    /// Arithmetic is specialized on the inferred operand type. Comparisons
    /// resolve type-agnostically, but the returned signature still records the
    /// inferred operand type.
    ///
    /// # Errors
    /// `InvalidQuery`/`Plan` for an unsupported name, wrong arity, or an
    /// un-inferable type.
    pub fn from_str_expr_and_type(
        name: &str,
        arg_exprs: &[ScalarExpr],
        arg_types: &[Option<ConcreteDataType>],
    ) -> Result<(Self, Signature), Error> {
        let op = name_to_op(name).with_context(|| InvalidQuerySnafu {
            reason: format!("Unsupported binary function: {}", name),
        })?;
        let generic_fn = {
            match op {
                Operator::Eq => GenericFn::Eq,
                Operator::NotEq => GenericFn::NotEq,
                Operator::Lt => GenericFn::Lt,
                Operator::LtEq => GenericFn::Lte,
                Operator::Gt => GenericFn::Gt,
                Operator::GtEq => GenericFn::Gte,
                Operator::Plus => GenericFn::Add,
                Operator::Minus => GenericFn::Sub,
                Operator::Multiply => GenericFn::Mul,
                Operator::Divide => GenericFn::Div,
                Operator::Modulo => GenericFn::Mod,
                _ => {
                    return InvalidQuerySnafu {
                        reason: format!("Unsupported binary function: {}", name),
                    }
                    .fail();
                }
            }
        };
        let need_type = matches!(
            generic_fn,
            GenericFn::Add | GenericFn::Sub | GenericFn::Mul | GenericFn::Div | GenericFn::Mod
        );
        ensure!(
            arg_exprs.len() == 2 && arg_types.len() == 2,
            PlanSnafu {
                reason: "Binary function requires exactly 2 arguments".to_string()
            }
        );
        let arg_type = Self::infer_type_from(generic_fn, arg_exprs, arg_types)?;
        // Comparisons are registered under `null_datatype()` in the table.
        let query_input_type = if need_type {
            arg_type.clone()
        } else {
            ConcreteDataType::null_datatype()
        };
        let spec_fn = Self::specialization(generic_fn, query_input_type)?;
        let signature = Signature {
            input: smallvec![arg_type.clone(), arg_type],
            output: spec_fn.signature().output,
            generic_fn,
        };
        Ok((spec_fn, signature))
    }

    /// Evaluates both operands over `batch` and applies this function with the
    /// corresponding Arrow kernel.
    pub fn eval_batch(
        &self,
        batch: &Batch,
        expr1: &ScalarExpr,
        expr2: &ScalarExpr,
    ) -> Result<VectorRef, EvalError> {
        let left = expr1.eval_batch(batch)?;
        let left = left.to_arrow_array();
        let right = expr2.eval_batch(batch)?;
        let right = right.to_arrow_array();
        let arrow_array: ArrayRef = match self {
            Self::Eq => Arc::new(
                arrow::compute::kernels::cmp::eq(&left, &right)
                    .context(ArrowSnafu { context: "eq" })?,
            ),
            Self::NotEq => Arc::new(
                arrow::compute::kernels::cmp::neq(&left, &right)
                    .context(ArrowSnafu { context: "neq" })?,
            ),
            Self::Lt => Arc::new(
                arrow::compute::kernels::cmp::lt(&left, &right)
                    .context(ArrowSnafu { context: "lt" })?,
            ),
            Self::Lte => Arc::new(
                arrow::compute::kernels::cmp::lt_eq(&left, &right)
                    .context(ArrowSnafu { context: "lte" })?,
            ),
            Self::Gt => Arc::new(
                arrow::compute::kernels::cmp::gt(&left, &right)
                    .context(ArrowSnafu { context: "gt" })?,
            ),
            Self::Gte => Arc::new(
                arrow::compute::kernels::cmp::gt_eq(&left, &right)
                    .context(ArrowSnafu { context: "gte" })?,
            ),
            Self::AddInt16
            | Self::AddInt32
            | Self::AddInt64
            | Self::AddUInt16
            | Self::AddUInt32
            | Self::AddUInt64
            | Self::AddFloat32
            | Self::AddFloat64 => arrow::compute::kernels::numeric::add(&left, &right)
                .context(ArrowSnafu { context: "add" })?,
            Self::SubInt16
            | Self::SubInt32
            | Self::SubInt64
            | Self::SubUInt16
            | Self::SubUInt32
            | Self::SubUInt64
            | Self::SubFloat32
            | Self::SubFloat64 => arrow::compute::kernels::numeric::sub(&left, &right)
                .context(ArrowSnafu { context: "sub" })?,
            Self::MulInt16
            | Self::MulInt32
            | Self::MulInt64
            | Self::MulUInt16
            | Self::MulUInt32
            | Self::MulUInt64
            | Self::MulFloat32
            | Self::MulFloat64 => arrow::compute::kernels::numeric::mul(&left, &right)
                .context(ArrowSnafu { context: "mul" })?,
            Self::DivInt16
            | Self::DivInt32
            | Self::DivInt64
            | Self::DivUInt16
            | Self::DivUInt32
            | Self::DivUInt64
            | Self::DivFloat32
            | Self::DivFloat64 => arrow::compute::kernels::numeric::div(&left, &right)
                .context(ArrowSnafu { context: "div" })?,
            Self::ModInt16
            | Self::ModInt32
            | Self::ModInt64
            | Self::ModUInt16
            | Self::ModUInt32
            | Self::ModUInt64 => arrow::compute::kernels::numeric::rem(&left, &right)
                .context(ArrowSnafu { context: "rem" })?,
        };
        let vector = Helper::try_into_vector(arrow_array).context(DataTypeSnafu {
            msg: "Fail to convert to Vector",
        })?;
        Ok(vector)
    }

    /// Evaluates both operands against one row of `values` and applies this function.
    ///
    /// Comparisons use `Value`'s own ordering. Arithmetic converts both operands
    /// to the variant's native type first.
    pub fn eval(
        &self,
        values: &[Value],
        expr1: &ScalarExpr,
        expr2: &ScalarExpr,
    ) -> Result<Value, EvalError> {
        let left = expr1.eval(values)?;
        let right = expr2.eval(values)?;
        match self {
            Self::Eq => Ok(Value::from(left == right)),
            Self::NotEq => Ok(Value::from(left != right)),
            Self::Lt => Ok(Value::from(left < right)),
            Self::Lte => Ok(Value::from(left <= right)),
            Self::Gt => Ok(Value::from(left > right)),
            Self::Gte => Ok(Value::from(left >= right)),
            Self::AddInt16 => add::<i16>(left, right),
            Self::AddInt32 => add::<i32>(left, right),
            Self::AddInt64 => add::<i64>(left, right),
            Self::AddUInt16 => add::<u16>(left, right),
            Self::AddUInt32 => add::<u32>(left, right),
            Self::AddUInt64 => add::<u64>(left, right),
            Self::AddFloat32 => add::<f32>(left, right),
            Self::AddFloat64 => add::<f64>(left, right),
            Self::SubInt16 => sub::<i16>(left, right),
            Self::SubInt32 => sub::<i32>(left, right),
            Self::SubInt64 => sub::<i64>(left, right),
            Self::SubUInt16 => sub::<u16>(left, right),
            Self::SubUInt32 => sub::<u32>(left, right),
            Self::SubUInt64 => sub::<u64>(left, right),
            Self::SubFloat32 => sub::<f32>(left, right),
            Self::SubFloat64 => sub::<f64>(left, right),
            Self::MulInt16 => mul::<i16>(left, right),
            Self::MulInt32 => mul::<i32>(left, right),
            Self::MulInt64 => mul::<i64>(left, right),
            Self::MulUInt16 => mul::<u16>(left, right),
            Self::MulUInt32 => mul::<u32>(left, right),
            Self::MulUInt64 => mul::<u64>(left, right),
            Self::MulFloat32 => mul::<f32>(left, right),
            Self::MulFloat64 => mul::<f64>(left, right),
            Self::DivInt16 => div::<i16>(left, right),
            Self::DivInt32 => div::<i32>(left, right),
            Self::DivInt64 => div::<i64>(left, right),
            Self::DivUInt16 => div::<u16>(left, right),
            Self::DivUInt32 => div::<u32>(left, right),
            Self::DivUInt64 => div::<u64>(left, right),
            Self::DivFloat32 => div::<f32>(left, right),
            Self::DivFloat64 => div::<f64>(left, right),
            Self::ModInt16 => rem::<i16>(left, right),
            Self::ModInt32 => rem::<i32>(left, right),
            Self::ModInt64 => rem::<i64>(left, right),
            Self::ModUInt16 => rem::<u16>(left, right),
            Self::ModUInt32 => rem::<u32>(left, right),
            Self::ModUInt64 => rem::<u64>(left, right),
        }
    }

    /// Returns the comparison that yields the same result when the operands are
    /// swapped (e.g. `a < b` ⇔ `b > a`).
    ///
    /// # Errors
    /// `InvalidQuery` if `self` is not a comparison.
    pub fn reverse_compare(&self) -> Result<Self, Error> {
        let ret = match self {
            BinaryFunc::Eq => BinaryFunc::Eq,
            BinaryFunc::NotEq => BinaryFunc::NotEq,
            BinaryFunc::Lt => BinaryFunc::Gt,
            BinaryFunc::Lte => BinaryFunc::Gte,
            BinaryFunc::Gt => BinaryFunc::Lt,
            BinaryFunc::Gte => BinaryFunc::Lte,
            _ => {
                return InvalidQuerySnafu {
                    reason: format!("Expect a comparison operator, found {:?}", self),
                }
                .fail();
            }
        };
        Ok(ret)
    }
}
/// Boolean functions taking any number of arguments.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum VariadicFunc {
    /// Logical conjunction of all arguments.
    And,
    /// Logical disjunction of all arguments.
    Or,
}
impl VariadicFunc {
pub fn signature(&self) -> Signature {
Signature {
input: smallvec![ConcreteDataType::boolean_datatype()],
output: ConcreteDataType::boolean_datatype(),
generic_fn: match self {
Self::And => GenericFn::And,
Self::Or => GenericFn::Or,
},
}
}
pub fn is_valid_func_name(name: &str) -> bool {
matches!(name.to_lowercase().as_str(), "and" | "or")
}
pub fn from_str_and_types(
name: &str,
arg_types: &[Option<ConcreteDataType>],
) -> Result<Self, Error> {
let _ = arg_types;
match name {
"and" => Ok(Self::And),
"or" => Ok(Self::Or),
_ => InvalidQuerySnafu {
reason: format!("Unknown variadic function: {}", name),
}
.fail(),
}
}
pub fn eval_batch(&self, batch: &Batch, exprs: &[ScalarExpr]) -> Result<VectorRef, EvalError> {
ensure!(
!exprs.is_empty(),
InvalidArgumentSnafu {
reason: format!("Variadic function {:?} requires at least 1 arguments", self)
}
);
let args = exprs
.iter()
.map(|expr| expr.eval_batch(batch).map(|v| v.to_arrow_array()))
.collect::<Result<Vec<_>, _>>()?;
let mut iter = args.into_iter();
let first = iter.next().unwrap();
let mut left = first
.as_any()
.downcast_ref::<BooleanArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(first.data_type()),
}
})?
.clone();
for right in iter {
let right = right.as_any().downcast_ref::<BooleanArray>().context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(right.data_type()),
}
})?;
left = match self {
Self::And => {
arrow::compute::and(&left, right).context(ArrowSnafu { context: "and" })?
}
Self::Or => {
arrow::compute::or(&left, right).context(ArrowSnafu { context: "or" })?
}
}
}
Ok(Arc::new(BooleanVector::from(left)))
}
pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
match self {
VariadicFunc::And => and(values, exprs),
VariadicFunc::Or => or(values, exprs),
}
}
}
/// Three-valued `AND` over `exprs`, evaluated left to right against `values`.
///
/// Returns `false` at the first `false` operand, without evaluating the
/// remaining operands. Otherwise returns `NULL` if any operand was `NULL`, and
/// `true` if none was. Evaluation errors and non-boolean operands are returned
/// as errors.
fn and(values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
    let mut saw_null = false;
    for expr in exprs {
        match expr.eval(values)? {
            Value::Boolean(false) => return Ok(Value::Boolean(false)),
            Value::Boolean(true) => {}
            Value::Null => saw_null = true,
            other => InvalidArgumentSnafu {
                reason: format!(
                    "`and()` only support boolean type, found value {:?} of type {:?}",
                    other,
                    other.data_type()
                ),
            }
            .fail()?,
        }
    }
    Ok(if saw_null {
        Value::Null
    } else {
        Value::Boolean(true)
    })
}
/// Three-valued `OR` over `exprs`, evaluated left to right against `values`.
///
/// Returns `true` at the first `true` operand, without evaluating the
/// remaining operands. Otherwise returns `NULL` if any operand was `NULL`, and
/// `false` if none was. Evaluation errors and non-boolean operands are returned
/// as errors.
fn or(values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
    let mut saw_null = false;
    for expr in exprs {
        match expr.eval(values)? {
            Value::Boolean(true) => return Ok(Value::Boolean(true)),
            Value::Boolean(false) => {}
            Value::Null => saw_null = true,
            other => InvalidArgumentSnafu {
                reason: format!(
                    "`or()` only support boolean type, found value {:?} of type {:?}",
                    other,
                    other.data_type()
                ),
            }
            .fail()?,
        }
    }
    Ok(if saw_null {
        Value::Null
    } else {
        Value::Boolean(false)
    })
}
/// Converts both values to `T` and returns their sum.
///
/// NOTE(review): integer overflow follows Rust's default arithmetic
/// (panic in debug builds, wrap in release).
fn add<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
    Value: From<T>,
{
    let to_native =
        |v: Value| T::try_from(v).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build());
    let lhs = to_native(left)?;
    let rhs = to_native(right)?;
    Ok(Value::from(lhs + rhs))
}
/// Converts both values to `T` and returns `left - right`.
///
/// NOTE(review): integer overflow follows Rust's default arithmetic
/// (panic in debug builds, wrap in release).
fn sub<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
    Value: From<T>,
{
    let to_native =
        |v: Value| T::try_from(v).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build());
    let (minuend, subtrahend) = (to_native(left)?, to_native(right)?);
    Ok(Value::from(minuend - subtrahend))
}
/// Converts both values to `T` and returns their product.
///
/// NOTE(review): integer overflow follows Rust's default arithmetic
/// (panic in debug builds, wrap in release).
fn mul<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
    Value: From<T>,
{
    let to_native =
        |v: Value| T::try_from(v).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build());
    // The left operand is converted (and may fail) before the right one.
    let product = to_native(left)? * to_native(right)?;
    Ok(Value::from(product))
}
/// Converts both values to `T` and returns `left / right`.
///
/// # Errors
/// `TryFromValue` if a value cannot be converted, and `DivisionByZero` when
/// `right` is zero (this applies to floats too).
fn div<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
    <T as TryFrom<Value>>::Error: std::fmt::Debug,
    Value: From<T>,
{
    let to_native =
        |v: Value| T::try_from(v).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build());
    let dividend = to_native(left)?;
    let divisor = to_native(right)?;
    if divisor.is_zero() {
        Err(DivisionByZeroSnafu {}.build())
    } else {
        Ok(Value::from(dividend / divisor))
    }
}
fn rem<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
Ok(Value::from(left % right))
}
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datatypes::vectors::Vector;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Tumble window floor/ceiling over a millisecond timestamp column, plus the
    /// scalar (single-row) path.
    #[test]
    fn test_tumble_batch() {
        let tumble_start = UnaryFunc::TumbleWindowFloor {
            window_size: Duration::from_millis(10),
            start_time: None,
        };
        let tumble_end = UnaryFunc::TumbleWindowCeiling {
            window_size: Duration::from_millis(10),
            start_time: None,
        };
        let arg = ScalarExpr::Column(0);

        let timestamp_vector = TimestampMillisecondVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]);
        let len = timestamp_vector.len();
        let batch = Batch::try_new(vec![Arc::new(timestamp_vector)], len).unwrap();
        let start = tumble_start.eval_batch(&batch, &arg).unwrap();
        let end = tumble_end.eval_batch(&batch, &arg).unwrap();
        assert_eq!(
            start.to_arrow_array().as_ref(),
            TimestampMillisecondVector::from_vec(vec![0, 0, 10, 10, 10, 20, 20])
                .to_arrow_array()
                .as_ref()
        );
        assert_eq!(
            end.to_arrow_array().as_ref(),
            TimestampMillisecondVector::from_vec(vec![10, 10, 20, 20, 20, 30, 30])
                .to_arrow_array()
                .as_ref()
        );

        // The scalar path must agree with the batch path.
        let values = vec![Value::from(Timestamp::new_millisecond(13))];
        assert_eq!(
            tumble_start.eval(&values, &arg).unwrap(),
            Value::from(Timestamp::new_millisecond(10))
        );
        assert_eq!(
            tumble_end.eval(&values, &arg).unwrap(),
            Value::from(Timestamp::new_millisecond(20))
        );
    }

    /// Scalar arithmetic helpers and three-valued `and`/`or`.
    #[test]
    fn test_num_ops() {
        let left = Value::from(10);
        let right = Value::from(3);
        let res = add::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(13));
        let res = sub::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(7));
        let res = mul::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(30));
        let res = div::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(3));
        let res = rem::<i32>(left, right).unwrap();
        assert_eq!(res, Value::from(1));

        let values = vec![Value::from(true), Value::from(false)];
        let exprs = vec![ScalarExpr::Column(0), ScalarExpr::Column(1)];
        let res = and(&values, &exprs).unwrap();
        assert_eq!(res, Value::from(false));
        let res = or(&values, &exprs).unwrap();
        assert_eq!(res, Value::from(true));
    }

    /// Resolution of binary function names to specialized variants.
    #[test]
    fn test_binary_func_spec() {
        // Both types known.
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[
                    Some(ConcreteDataType::int32_datatype()),
                    Some(ConcreteDataType::int32_datatype())
                ]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );
        // Only the left type known.
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[Some(ConcreteDataType::int32_datatype()), None]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );
        // Only the right type known.
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[None, Some(ConcreteDataType::int32_datatype())]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );
        // Type inferred from a literal operand.
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[
                    ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()),
                    ScalarExpr::Column(0)
                ],
                &[None, None]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );
        // Comparisons resolve type-agnostically but report the inferred operand type.
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "equal",
                &[
                    ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()),
                    ScalarExpr::Column(0)
                ],
                &[None, None]
            )
            .unwrap(),
            (
                BinaryFunc::Eq,
                Signature {
                    input: smallvec![
                        ConcreteDataType::int32_datatype(),
                        ConcreteDataType::int32_datatype()
                    ],
                    output: ConcreteDataType::boolean_datatype(),
                    generic_fn: GenericFn::Eq
                }
            )
        );
        // No type information at all must be rejected. The bare `matches!` this
        // replaces was evaluated and discarded, so it never checked anything.
        assert!(matches!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[None, None]
            ),
            Err(Error::InvalidQuery { .. })
        ));
    }

    /// Casting a human-readable string to a day-time interval.
    #[test]
    fn test_cast_int() {
        let interval = cast(
            Value::from("1 second"),
            &ConcreteDataType::interval_day_time_datatype(),
        )
        .unwrap();
        assert_eq!(
            interval,
            Value::from(common_time::IntervalDayTime::new(0, 1000))
        );
    }
}