use std::any::type_name;
use std::fmt::Display;
use common_decimal::Decimal128;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{OrderedF32, OrderedF64, OrderedFloat, Value};
use enum_dispatch::enum_dispatch;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::expr::error::{InternalSnafu, OverflowSnafu, TryFromValueSnafu, TypeMismatchSnafu};
use crate::expr::signature::GenericFn;
use crate::expr::{AggregateFunc, EvalError};
use crate::repr::Diff;
#[enum_dispatch]
pub trait Accumulator: Sized {
fn into_state(self) -> Vec<Value>;
fn update(
&mut self,
aggr_fn: &AggregateFunc,
value: Value,
diff: Diff,
) -> Result<(), EvalError>;
fn update_batch<I>(&mut self, aggr_fn: &AggregateFunc, value_diffs: I) -> Result<(), EvalError>
where
I: IntoIterator<Item = (Value, Diff)>,
{
for (v, d) in value_diffs {
self.update(aggr_fn, v, d)?;
}
Ok(())
}
fn eval(&self, aggr_fn: &AggregateFunc) -> Result<Value, EvalError>;
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Bool {
trues: Diff,
falses: Diff,
}
impl Bool {
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
Ok(Self {
trues: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
falses: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
})
}
}
impl TryFrom<Vec<Value>> for Bool {
type Error = EvalError;
fn try_from(state: Vec<Value>) -> Result<Self, Self::Error> {
ensure!(
state.len() == 2,
InternalSnafu {
reason: "Bool Accumulator state should have 2 values",
}
);
let mut iter = state.into_iter();
Self::try_from_iter(&mut iter)
}
}
impl Accumulator for Bool {
fn into_state(self) -> Vec<Value> {
vec![self.trues.into(), self.falses.into()]
}
fn update(
&mut self,
aggr_fn: &AggregateFunc,
value: Value,
diff: Diff,
) -> Result<(), EvalError> {
ensure!(
matches!(
aggr_fn,
AggregateFunc::Any
| AggregateFunc::All
| AggregateFunc::MaxBool
| AggregateFunc::MinBool
),
InternalSnafu {
reason: format!(
"Bool Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
);
match value {
Value::Boolean(true) => self.trues += diff,
Value::Boolean(false) => self.falses += diff,
Value::Null => (), x => {
return Err(TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: x.data_type(),
}
.build());
}
};
Ok(())
}
fn eval(&self, aggr_fn: &AggregateFunc) -> Result<Value, EvalError> {
match aggr_fn {
AggregateFunc::Any => Ok(Value::from(self.trues > 0)),
AggregateFunc::All => Ok(Value::from(self.falses == 0)),
AggregateFunc::MaxBool => Ok(Value::from(self.trues > 0)),
AggregateFunc::MinBool => Ok(Value::from(self.falses == 0)),
_ => Err(InternalSnafu {
reason: format!(
"Bool Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
.build()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SimpleNumber {
accum: i128,
non_nulls: Diff,
}
impl SimpleNumber {
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
Ok(Self {
accum: Decimal128::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?
.val(),
non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
})
}
}
impl TryFrom<Vec<Value>> for SimpleNumber {
type Error = EvalError;
fn try_from(state: Vec<Value>) -> Result<Self, Self::Error> {
ensure!(
state.len() == 2,
InternalSnafu {
reason: "Number Accumulator state should have 2 values",
}
);
let mut iter = state.into_iter();
Self::try_from_iter(&mut iter)
}
}
impl Accumulator for SimpleNumber {
fn into_state(self) -> Vec<Value> {
vec![
Value::Decimal128(Decimal128::new(self.accum, 38, 0)),
self.non_nulls.into(),
]
}
fn update(
&mut self,
aggr_fn: &AggregateFunc,
value: Value,
diff: Diff,
) -> Result<(), EvalError> {
ensure!(
matches!(
aggr_fn,
AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64
),
InternalSnafu {
reason: format!(
"SimpleNumber Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
);
let v = match (aggr_fn, value) {
(AggregateFunc::SumInt16, Value::Int16(x)) => i128::from(x),
(AggregateFunc::SumInt32, Value::Int32(x)) => i128::from(x),
(AggregateFunc::SumInt64, Value::Int64(x)) => i128::from(x),
(AggregateFunc::SumUInt16, Value::UInt16(x)) => i128::from(x),
(AggregateFunc::SumUInt32, Value::UInt32(x)) => i128::from(x),
(AggregateFunc::SumUInt64, Value::UInt64(x)) => i128::from(x),
(_f, Value::Null) => return Ok(()), (f, v) => {
let expected_datatype = f.signature().input;
return Err(TypeMismatchSnafu {
expected: expected_datatype[0].clone(),
actual: v.data_type(),
}
.build())?;
}
};
self.accum += v * i128::from(diff);
self.non_nulls += diff;
Ok(())
}
fn eval(&self, aggr_fn: &AggregateFunc) -> Result<Value, EvalError> {
match aggr_fn {
AggregateFunc::SumInt16 | AggregateFunc::SumInt32 | AggregateFunc::SumInt64 => {
i64::try_from(self.accum)
.map_err(|_e| OverflowSnafu {}.build())
.map(Value::from)
}
AggregateFunc::SumUInt16 | AggregateFunc::SumUInt32 | AggregateFunc::SumUInt64 => {
u64::try_from(self.accum)
.map_err(|_e| OverflowSnafu {}.build())
.map(Value::from)
}
_ => Err(InternalSnafu {
reason: format!(
"SimpleNumber Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
.build()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Float {
accum: OrderedF64,
pos_infs: Diff,
neg_infs: Diff,
nans: Diff,
non_nulls: Diff,
}
impl Float {
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
let mut ret = Self {
accum: OrderedF64::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
pos_infs: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
neg_infs: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
nans: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
};
if ret.non_nulls == 0 {
ret.accum = OrderedFloat::from(0.0);
}
Ok(ret)
}
}
impl TryFrom<Vec<Value>> for Float {
type Error = EvalError;
fn try_from(state: Vec<Value>) -> Result<Self, Self::Error> {
ensure!(
state.len() == 5,
InternalSnafu {
reason: "Float Accumulator state should have 5 values",
}
);
let mut iter = state.into_iter();
let mut ret = Self {
accum: OrderedF64::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
pos_infs: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
neg_infs: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
nans: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
non_nulls: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
};
if ret.non_nulls == 0 {
ret.accum = OrderedFloat::from(0.0);
}
Ok(ret)
}
}
impl Accumulator for Float {
fn into_state(self) -> Vec<Value> {
vec![
self.accum.into(),
self.pos_infs.into(),
self.neg_infs.into(),
self.nans.into(),
self.non_nulls.into(),
]
}
fn update(
&mut self,
aggr_fn: &AggregateFunc,
value: Value,
diff: Diff,
) -> Result<(), EvalError> {
ensure!(
matches!(
aggr_fn,
AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64
),
InternalSnafu {
reason: format!(
"Float Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
);
let x = match (aggr_fn, value) {
(AggregateFunc::SumFloat32, Value::Float32(x)) => OrderedF64::from(*x as f64),
(AggregateFunc::SumFloat64, Value::Float64(x)) => OrderedF64::from(x),
(_f, Value::Null) => return Ok(()), (f, v) => {
let expected_datatype = f.signature().input;
return Err(TypeMismatchSnafu {
expected: expected_datatype[0].clone(),
actual: v.data_type(),
}
.build())?;
}
};
if x.is_nan() {
self.nans += diff;
} else if x.is_infinite() {
if x.is_sign_positive() {
self.pos_infs += diff;
} else {
self.neg_infs += diff;
}
} else {
self.accum += *(x * OrderedF64::from(diff as f64));
}
self.non_nulls += diff;
Ok(())
}
fn eval(&self, aggr_fn: &AggregateFunc) -> Result<Value, EvalError> {
match aggr_fn {
AggregateFunc::SumFloat32 => Ok(Value::Float32(OrderedF32::from(self.accum.0 as f32))),
AggregateFunc::SumFloat64 => Ok(Value::Float64(self.accum)),
_ => Err(InternalSnafu {
reason: format!(
"Float Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
.build()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OrdValue {
val: Option<Value>,
non_nulls: Diff,
}
impl OrdValue {
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
Ok(Self {
val: {
let v = iter.next().ok_or_else(fail_accum::<Self>)?;
if v == Value::Null {
None
} else {
Some(v)
}
},
non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
})
}
}
impl TryFrom<Vec<Value>> for OrdValue {
type Error = EvalError;
fn try_from(state: Vec<Value>) -> Result<Self, Self::Error> {
ensure!(
state.len() == 2,
InternalSnafu {
reason: "OrdValue Accumulator state should have 2 values",
}
);
let mut iter = state.into_iter();
Ok(Self {
val: {
let v = iter.next().unwrap();
if v == Value::Null {
None
} else {
Some(v)
}
},
non_nulls: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
})
}
}
impl Accumulator for OrdValue {
fn into_state(self) -> Vec<Value> {
vec![self.val.unwrap_or(Value::Null), self.non_nulls.into()]
}
fn update(
&mut self,
aggr_fn: &AggregateFunc,
value: Value,
diff: Diff,
) -> Result<(), EvalError> {
ensure!(
aggr_fn.is_max() || aggr_fn.is_min() || matches!(aggr_fn, AggregateFunc::Count),
InternalSnafu {
reason: format!(
"OrdValue Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
);
if diff <= 0 && (aggr_fn.is_max() || aggr_fn.is_min()) {
return Err(InternalSnafu {
reason: "OrdValue Accumulator does not support non-monotonic input for min/max aggregation".to_string(),
}.build());
}
let check_type_aggr_fn_and_arg_value =
ty_eq_without_precision(value.data_type(), aggr_fn.signature().input[0].clone())
|| matches!(aggr_fn, AggregateFunc::Count)
|| value.is_null();
let check_type_aggr_fn_and_self_val = self
.val
.as_ref()
.map(|zelf| {
ty_eq_without_precision(zelf.data_type(), aggr_fn.signature().input[0].clone())
})
.unwrap_or(true)
|| matches!(aggr_fn, AggregateFunc::Count);
if !check_type_aggr_fn_and_arg_value {
return Err(TypeMismatchSnafu {
expected: aggr_fn.signature().input[0].clone(),
actual: value.data_type(),
}
.build());
} else if !check_type_aggr_fn_and_self_val {
return Err(TypeMismatchSnafu {
expected: aggr_fn.signature().input[0].clone(),
actual: self
.val
.as_ref()
.map(|v| v.data_type())
.unwrap_or(ConcreteDataType::null_datatype()),
}
.build());
}
let is_null = value.is_null();
if is_null {
return Ok(());
}
if !is_null {
self.non_nulls += diff;
match aggr_fn.signature().generic_fn {
GenericFn::Max => {
self.val = self
.val
.clone()
.map(|v| v.max(value.clone()))
.or_else(|| Some(value))
}
GenericFn::Min => {
self.val = self
.val
.clone()
.map(|v| v.min(value.clone()))
.or_else(|| Some(value))
}
GenericFn::Count => (),
_ => unreachable!("already checked by ensure!"),
}
};
Ok(())
}
fn eval(&self, aggr_fn: &AggregateFunc) -> Result<Value, EvalError> {
if aggr_fn.is_max() || aggr_fn.is_min() {
Ok(self.val.clone().unwrap_or(Value::Null))
} else if matches!(aggr_fn, AggregateFunc::Count) {
Ok(self.non_nulls.into())
} else {
Err(InternalSnafu {
reason: format!(
"OrdValue Accumulator does not support this aggregation function: {:?}",
aggr_fn
),
}
.build())
}
}
}
#[enum_dispatch(Accumulator)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Accum {
Bool(Bool),
SimpleNumber(SimpleNumber),
Float(Float),
OrdValue(OrdValue),
}
impl Accum {
pub fn new_accum(aggr_fn: &AggregateFunc) -> Result<Self, EvalError> {
Ok(match aggr_fn {
AggregateFunc::Any
| AggregateFunc::All
| AggregateFunc::MaxBool
| AggregateFunc::MinBool => Self::from(Bool {
trues: 0,
falses: 0,
}),
AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64 => Self::from(SimpleNumber {
accum: 0,
non_nulls: 0,
}),
AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Self::from(Float {
accum: OrderedF64::from(0.0),
pos_infs: 0,
neg_infs: 0,
nans: 0,
non_nulls: 0,
}),
f if f.is_max() || f.is_min() || matches!(f, AggregateFunc::Count) => {
Self::from(OrdValue {
val: None,
non_nulls: 0,
})
}
f => {
return Err(InternalSnafu {
reason: format!(
"Accumulator does not support this aggregation function: {:?}",
f
),
}
.build());
}
})
}
pub fn try_from_iter(
aggr_fn: &AggregateFunc,
iter: &mut impl Iterator<Item = Value>,
) -> Result<Self, EvalError> {
match aggr_fn {
AggregateFunc::Any
| AggregateFunc::All
| AggregateFunc::MaxBool
| AggregateFunc::MinBool => Ok(Self::from(Bool::try_from_iter(iter)?)),
AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64 => Ok(Self::from(SimpleNumber::try_from_iter(iter)?)),
AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
Ok(Self::from(Float::try_from_iter(iter)?))
}
f if f.is_max() || f.is_min() || matches!(f, AggregateFunc::Count) => {
Ok(Self::from(OrdValue::try_from_iter(iter)?))
}
f => Err(InternalSnafu {
reason: format!(
"Accumulator does not support this aggregation function: {:?}",
f
),
}
.build()),
}
}
pub fn try_into_accum(aggr_fn: &AggregateFunc, state: Vec<Value>) -> Result<Self, EvalError> {
match aggr_fn {
AggregateFunc::Any
| AggregateFunc::All
| AggregateFunc::MaxBool
| AggregateFunc::MinBool => Ok(Self::from(Bool::try_from(state)?)),
AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64 => Ok(Self::from(SimpleNumber::try_from(state)?)),
AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
Ok(Self::from(Float::try_from(state)?))
}
f if f.is_max() || f.is_min() || matches!(f, AggregateFunc::Count) => {
Ok(Self::from(OrdValue::try_from(state)?))
}
f => Err(InternalSnafu {
reason: format!(
"Accumulator does not support this aggregation function: {:?}",
f
),
}
.build()),
}
}
}
fn fail_accum<T>() -> EvalError {
InternalSnafu {
reason: format!(
"list of values exhausted before a accum of type {} can be build from it",
type_name::<T>()
),
}
.build()
}
fn err_try_from_val<T: Display>(reason: T) -> EvalError {
TryFromValueSnafu {
msg: reason.to_string(),
}
.build()
}
fn ty_eq_without_precision(left: ConcreteDataType, right: ConcreteDataType) -> bool {
left == right
|| matches!(left, ConcreteDataType::Timestamp(..))
&& matches!(right, ConcreteDataType::Timestamp(..))
|| matches!(left, ConcreteDataType::Time(..)) && matches!(right, ConcreteDataType::Time(..))
|| matches!(left, ConcreteDataType::Duration(..))
&& matches!(right, ConcreteDataType::Duration(..))
|| matches!(left, ConcreteDataType::Interval(..))
&& matches!(right, ConcreteDataType::Interval(..))
}
#[allow(clippy::too_many_lines)]
#[cfg(test)]
mod test {
use common_time::Timestamp;
use super::*;
#[test]
fn test_accum() {
let testcases = vec![
(
AggregateFunc::SumInt32,
vec![(Value::Int32(1), 1), (Value::Null, 1)],
(
Value::Int64(1),
vec![Value::Decimal128(Decimal128::new(1, 38, 0)), 1i64.into()],
),
),
(
AggregateFunc::SumFloat32,
vec![(Value::Float32(OrderedF32::from(1.0)), 1), (Value::Null, 1)],
(
Value::Float32(OrderedF32::from(1.0)),
vec![
Value::Float64(OrderedF64::from(1.0)),
0i64.into(),
0i64.into(),
0i64.into(),
1i64.into(),
],
),
),
(
AggregateFunc::MaxInt32,
vec![(Value::Int32(1), 1), (Value::Int32(2), 1), (Value::Null, 1)],
(Value::Int32(2), vec![Value::Int32(2), 2i64.into()]),
),
(
AggregateFunc::MinInt32,
vec![(Value::Int32(2), 1), (Value::Int32(1), 1), (Value::Null, 1)],
(Value::Int32(1), vec![Value::Int32(1), 2i64.into()]),
),
(
AggregateFunc::MaxFloat32,
vec![
(Value::Float32(OrderedF32::from(1.0)), 1),
(Value::Float32(OrderedF32::from(2.0)), 1),
(Value::Null, 1),
],
(
Value::Float32(OrderedF32::from(2.0)),
vec![Value::Float32(OrderedF32::from(2.0)), 2i64.into()],
),
),
(
AggregateFunc::MaxDateTime,
vec![
(Value::Timestamp(Timestamp::from(0)), 1),
(Value::Timestamp(Timestamp::from(1)), 1),
(Value::Null, 1),
],
(
Value::Timestamp(Timestamp::from(1)),
vec![Value::Timestamp(Timestamp::from(1)), 2i64.into()],
),
),
(
AggregateFunc::Count,
vec![
(Value::Int32(1), 1),
(Value::Int32(2), 1),
(Value::Null, 1),
(Value::Null, 1),
],
(2i64.into(), vec![Value::Null, 2i64.into()]),
),
(
AggregateFunc::Any,
vec![
(Value::Boolean(false), 1),
(Value::Boolean(false), 1),
(Value::Boolean(true), 1),
(Value::Null, 1),
],
(
Value::Boolean(true),
vec![Value::from(1i64), Value::from(2i64)],
),
),
(
AggregateFunc::All,
vec![
(Value::Boolean(false), 1),
(Value::Boolean(false), 1),
(Value::Boolean(true), 1),
(Value::Null, 1),
],
(
Value::Boolean(false),
vec![Value::from(1i64), Value::from(2i64)],
),
),
(
AggregateFunc::MaxBool,
vec![
(Value::Boolean(false), 1),
(Value::Boolean(false), 1),
(Value::Boolean(true), 1),
(Value::Null, 1),
],
(
Value::Boolean(true),
vec![Value::from(1i64), Value::from(2i64)],
),
),
(
AggregateFunc::MinBool,
vec![
(Value::Boolean(false), 1),
(Value::Boolean(false), 1),
(Value::Boolean(true), 1),
(Value::Null, 1),
],
(
Value::Boolean(false),
vec![Value::from(1i64), Value::from(2i64)],
),
),
];
for (aggr_fn, input, (eval_res, state)) in testcases {
let create_and_insert = || -> Result<Accum, EvalError> {
let mut acc = Accum::new_accum(&aggr_fn)?;
acc.update_batch(&aggr_fn, input.clone())?;
let row = acc.into_state();
let acc = Accum::try_into_accum(&aggr_fn, row.clone())?;
let alter_acc = Accum::try_from_iter(&aggr_fn, &mut row.into_iter())?;
assert_eq!(acc, alter_acc);
Ok(acc)
};
let acc = match create_and_insert() {
Ok(acc) => acc,
Err(err) => panic!(
"Failed to create accum for {:?} with input {:?} with error: {:?}",
aggr_fn, input, err
),
};
if acc.eval(&aggr_fn).unwrap() != eval_res {
panic!(
"Failed to eval accum for {:?} with input {:?}, expect {:?}, got {:?}",
aggr_fn,
input,
eval_res,
acc.eval(&aggr_fn).unwrap()
);
}
let actual_state = acc.into_state();
if actual_state != state {
panic!(
"Failed to cast into state from accum for {:?} with input {:?}, expect state {:?}, got state {:?}",
aggr_fn,
input,
state,
actual_state
);
}
}
}
#[test]
fn test_fail_path_accum() {
{
let bool_accum = Bool::try_from(vec![Value::Null]);
assert!(matches!(bool_accum, Err(EvalError::Internal { .. })));
}
{
let mut bool_accum = Bool::try_from(vec![1i64.into(), 1i64.into()]).unwrap();
let bool_accum_serde = serde_json::to_string(&bool_accum).unwrap();
let bool_accum_de = serde_json::from_str::<Bool>(&bool_accum_serde).unwrap();
assert_eq!(bool_accum, bool_accum_de);
assert!(matches!(
bool_accum.update(&AggregateFunc::MaxDate, 1.into(), 1),
Err(EvalError::Internal { .. })
));
assert!(matches!(
bool_accum.update(&AggregateFunc::Any, 1.into(), 1),
Err(EvalError::TypeMismatch { .. })
));
assert!(matches!(
bool_accum.eval(&AggregateFunc::MaxDate),
Err(EvalError::Internal { .. })
));
}
{
let ret = SimpleNumber::try_from(vec![Value::Null]);
assert!(matches!(ret, Err(EvalError::Internal { .. })));
let mut accum =
SimpleNumber::try_from(vec![Decimal128::new(0, 38, 0).into(), 0i64.into()])
.unwrap();
assert!(matches!(
accum.update(&AggregateFunc::All, 0.into(), 1),
Err(EvalError::Internal { .. })
));
assert!(matches!(
accum.update(&AggregateFunc::SumInt64, 0i32.into(), 1),
Err(EvalError::TypeMismatch { .. })
));
assert!(matches!(
accum.eval(&AggregateFunc::All),
Err(EvalError::Internal { .. })
));
accum
.update(&AggregateFunc::SumInt64, 1i64.into(), 1)
.unwrap();
accum
.update(&AggregateFunc::SumInt64, i64::MAX.into(), 1)
.unwrap();
assert!(matches!(
accum.eval(&AggregateFunc::SumInt64),
Err(EvalError::Overflow { .. })
));
}
{
let ret = Float::try_from(vec![2f64.into(), 0i64.into(), 0i64.into(), 0i64.into()]);
assert!(matches!(ret, Err(EvalError::Internal { .. })));
let mut accum = Float::try_from(vec![
2f64.into(),
0i64.into(),
0i64.into(),
0i64.into(),
1i64.into(),
])
.unwrap();
accum
.update(&AggregateFunc::SumFloat64, 2f64.into(), -1)
.unwrap();
assert!(matches!(
accum.update(&AggregateFunc::All, 0.into(), 1),
Err(EvalError::Internal { .. })
));
assert!(matches!(
accum.update(&AggregateFunc::SumFloat64, 0.0f32.into(), 1),
Err(EvalError::TypeMismatch { .. })
));
assert_eq!(
accum.eval(&AggregateFunc::SumFloat64).unwrap(),
0.0f64.into()
);
assert!(matches!(
accum.eval(&AggregateFunc::All),
Err(EvalError::Internal { .. })
));
accum
.update(&AggregateFunc::SumFloat64, f64::INFINITY.into(), 1)
.unwrap();
accum
.update(&AggregateFunc::SumFloat64, (-f64::INFINITY).into(), 1)
.unwrap();
accum
.update(&AggregateFunc::SumFloat64, f64::NAN.into(), 1)
.unwrap();
}
{
let ret = OrdValue::try_from(vec![Value::Null]);
assert!(matches!(ret, Err(EvalError::Internal { .. })));
let mut accum = OrdValue::try_from(vec![Value::Null, 0i64.into()]).unwrap();
assert!(matches!(
accum.update(&AggregateFunc::All, 0.into(), 1),
Err(EvalError::Internal { .. })
));
accum
.update(&AggregateFunc::MaxInt16, 1i16.into(), 1)
.unwrap();
assert!(matches!(
accum.update(&AggregateFunc::MaxInt16, 0i32.into(), 1),
Err(EvalError::TypeMismatch { .. })
));
assert!(matches!(
accum.update(&AggregateFunc::MaxInt16, 0i16.into(), -1),
Err(EvalError::Internal { .. })
));
accum
.update(&AggregateFunc::MaxInt16, Value::Null, 1)
.unwrap();
}
{
let mut accum = OrdValue::try_from(vec![Value::Null, 0i64.into()]).unwrap();
assert!(matches!(
accum.update(&AggregateFunc::MaxInt64, 0u64.into(), 1),
Err(EvalError::TypeMismatch { .. })
));
}
}
}