flow/expr/
func.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the definition of functions that can be used in expressions.
16
17use std::collections::HashMap;
18use std::sync::{Arc, OnceLock};
19use std::time::Duration;
20
21use arrow::array::{ArrayRef, BooleanArray};
22use common_error::ext::BoxedError;
23use common_time::timestamp::TimeUnit;
24use common_time::Timestamp;
25use datafusion_expr::Operator;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::prelude::DataType;
28use datatypes::types::cast;
29use datatypes::value::Value;
30use datatypes::vectors::{BooleanVector, Helper, TimestampMillisecondVector, VectorRef};
31use serde::{Deserialize, Serialize};
32use smallvec::smallvec;
33use snafu::{ensure, OptionExt, ResultExt};
34use strum::{EnumIter, IntoEnumIterator};
35use substrait::df_logical_plan::consumer::name_to_op;
36
37use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu, UnexpectedSnafu};
38use crate::expr::error::{
39    ArrowSnafu, CastValueSnafu, DataTypeSnafu, DivisionByZeroSnafu, EvalError, OverflowSnafu,
40    TryFromValueSnafu, TypeMismatchSnafu,
41};
42use crate::expr::signature::{GenericFn, Signature};
43use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr, TUMBLE_END, TUMBLE_START};
44use crate::repr::{self, value_to_internal_ts};
45
/// UnmaterializableFunc is a function that can't be evaluated independently
/// and requires special handling.
// NOTE: variant order is significant — it determines the derived `Ord`/`PartialOrd`.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Hash)]
pub enum UnmaterializableFunc {
    /// The current time; its signature outputs a millisecond timestamp.
    Now,
    /// The current schema; its signature outputs a string.
    CurrentSchema,
    /// A tumbling window over the timestamp produced by `ts`.
    TumbleWindow {
        /// Expression producing the timestamp to place into a window.
        ts: Box<TypedExpr>,
        /// Length of each window.
        window_size: Duration,
        /// Optional window alignment origin. NOTE(review): presumably `None` aligns windows to
        /// timestamp 0, as the `UnaryFunc` tumble functions do — confirm.
        start_time: Option<Timestamp>,
    },
}
58
59impl UnmaterializableFunc {
60    /// Return the signature of the function
61    pub fn signature(&self) -> Signature {
62        match self {
63            Self::Now => Signature {
64                input: smallvec![],
65                // TODO(yingwen): Maybe return timestamp.
66                output: ConcreteDataType::timestamp_millisecond_datatype(),
67                generic_fn: GenericFn::Now,
68            },
69            Self::CurrentSchema => Signature {
70                input: smallvec![],
71                output: ConcreteDataType::string_datatype(),
72                generic_fn: GenericFn::CurrentSchema,
73            },
74            Self::TumbleWindow { .. } => Signature {
75                input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
76                output: ConcreteDataType::timestamp_millisecond_datatype(),
77                generic_fn: GenericFn::TumbleWindow,
78            },
79        }
80    }
81
82    pub fn is_valid_func_name(name: &str) -> bool {
83        matches!(
84            name.to_lowercase().as_str(),
85            "now" | "current_schema" | "tumble"
86        )
87    }
88
89    /// Create a UnmaterializableFunc from a string of the function name
90    pub fn from_str_args(name: &str, _args: Vec<TypedExpr>) -> Result<Self, Error> {
91        match name.to_lowercase().as_str() {
92            "now" => Ok(Self::Now),
93            "current_schema" => Ok(Self::CurrentSchema),
94            _ => InvalidQuerySnafu {
95                reason: format!("Unknown unmaterializable function: {}", name),
96            }
97            .fail(),
98        }
99    }
100}
101
/// UnaryFunc is a function that takes one argument. Also notice this enum doesn't contain function arguments,
/// because the arguments are stored in the expression. (except `cast` function, which requires a type argument)
// NOTE: variant order is significant — it determines the derived `Ord`/`PartialOrd`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum UnaryFunc {
    /// Boolean negation.
    Not,
    /// `true` when the argument is null.
    IsNull,
    /// The boolean argument itself.
    IsTrue,
    /// The negation of the boolean argument.
    IsFalse,
    /// Adds one millisecond to a timestamp.
    StepTimestamp,
    /// Casts the argument to the given type.
    Cast(ConcreteDataType),
    /// Start (inclusive) of the tumbling window containing a millisecond timestamp.
    TumbleWindowFloor {
        /// Length of each window; must be non-zero, or window computation divides by zero.
        window_size: Duration,
        /// Window alignment origin; `None` aligns windows to timestamp 0.
        start_time: Option<Timestamp>,
    },
    /// End (exclusive) of the tumbling window: the window start plus `window_size`.
    TumbleWindowCeiling {
        /// Length of each window; must be non-zero, or window computation divides by zero.
        window_size: Duration,
        /// Window alignment origin; `None` aligns windows to timestamp 0.
        start_time: Option<Timestamp>,
    },
}
121
122impl UnaryFunc {
123    /// Return the signature of the function
124    pub fn signature(&self) -> Signature {
125        match self {
126            Self::IsNull => Signature {
127                input: smallvec![ConcreteDataType::null_datatype()],
128                output: ConcreteDataType::boolean_datatype(),
129                generic_fn: GenericFn::IsNull,
130            },
131            Self::Not | Self::IsTrue | Self::IsFalse => Signature {
132                input: smallvec![ConcreteDataType::boolean_datatype()],
133                output: ConcreteDataType::boolean_datatype(),
134                generic_fn: match self {
135                    Self::Not => GenericFn::Not,
136                    Self::IsTrue => GenericFn::IsTrue,
137                    Self::IsFalse => GenericFn::IsFalse,
138                    _ => unreachable!(),
139                },
140            },
141            Self::StepTimestamp => Signature {
142                input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
143                output: ConcreteDataType::timestamp_millisecond_datatype(),
144                generic_fn: GenericFn::StepTimestamp,
145            },
146            Self::Cast(to) => Signature {
147                input: smallvec![ConcreteDataType::null_datatype()],
148                output: to.clone(),
149                generic_fn: GenericFn::Cast,
150            },
151            Self::TumbleWindowFloor { .. } => Signature {
152                input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
153                output: ConcreteDataType::timestamp_millisecond_datatype(),
154                generic_fn: GenericFn::TumbleWindow,
155            },
156            Self::TumbleWindowCeiling { .. } => Signature {
157                input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
158                output: ConcreteDataType::timestamp_millisecond_datatype(),
159                generic_fn: GenericFn::TumbleWindow,
160            },
161        }
162    }
163
164    pub fn is_valid_func_name(name: &str) -> bool {
165        matches!(
166            name.to_lowercase().as_str(),
167            "not" | "is_null" | "is_true" | "is_false" | "step_timestamp" | "cast"
168        )
169    }
170
171    /// Create a UnaryFunc from a string of the function name and given argument type(optional)
172    pub fn from_str_and_type(
173        name: &str,
174        arg_type: Option<ConcreteDataType>,
175    ) -> Result<Self, Error> {
176        match name {
177            "not" => Ok(Self::Not),
178            "is_null" => Ok(Self::IsNull),
179            "is_true" => Ok(Self::IsTrue),
180            "is_false" => Ok(Self::IsFalse),
181            "step_timestamp" => Ok(Self::StepTimestamp),
182            "cast" => {
183                let arg_type = arg_type.with_context(|| InvalidQuerySnafu {
184                    reason: "cast function requires a type argument".to_string(),
185                })?;
186                Ok(UnaryFunc::Cast(arg_type))
187            }
188            _ => InvalidQuerySnafu {
189                reason: format!("Unknown unary function: {}", name),
190            }
191            .fail(),
192        }
193    }
194
195    pub fn eval_batch(&self, batch: &Batch, expr: &ScalarExpr) -> Result<VectorRef, EvalError> {
196        let arg_col = expr.eval_batch(batch)?;
197        match self {
198            Self::Not => {
199                let arrow_array = arg_col.to_arrow_array();
200                let bool_array = arrow_array
201                    .as_any()
202                    .downcast_ref::<BooleanArray>()
203                    .context({
204                        TypeMismatchSnafu {
205                            expected: ConcreteDataType::boolean_datatype(),
206                            actual: arg_col.data_type(),
207                        }
208                    })?;
209                let ret = arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
210                let ret = BooleanVector::from(ret);
211                Ok(Arc::new(ret))
212            }
213            Self::IsNull => {
214                let arrow_array = arg_col.to_arrow_array();
215                let ret = arrow::compute::is_null(&arrow_array)
216                    .context(ArrowSnafu { context: "is_null" })?;
217                let ret = BooleanVector::from(ret);
218                Ok(Arc::new(ret))
219            }
220            Self::IsTrue | Self::IsFalse => {
221                let arrow_array = arg_col.to_arrow_array();
222                let bool_array = arrow_array
223                    .as_any()
224                    .downcast_ref::<BooleanArray>()
225                    .context({
226                        TypeMismatchSnafu {
227                            expected: ConcreteDataType::boolean_datatype(),
228                            actual: arg_col.data_type(),
229                        }
230                    })?;
231
232                if matches!(self, Self::IsTrue) {
233                    Ok(Arc::new(BooleanVector::from(bool_array.clone())))
234                } else {
235                    let ret =
236                        arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
237                    Ok(Arc::new(BooleanVector::from(ret)))
238                }
239            }
240            Self::StepTimestamp => {
241                let timestamp_array = get_timestamp_array(&arg_col)?;
242                let timestamp_array_ref = timestamp_array
243                    .as_any()
244                    .downcast_ref::<arrow::array::TimestampMillisecondArray>()
245                    .context({
246                        TypeMismatchSnafu {
247                            expected: ConcreteDataType::boolean_datatype(),
248                            actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
249                        }
250                    })?;
251
252                let ret = arrow::compute::unary(timestamp_array_ref, |arr| arr + 1);
253                let ret = TimestampMillisecondVector::from(ret);
254                Ok(Arc::new(ret))
255            }
256            Self::Cast(to) => {
257                let arrow_array = arg_col.to_arrow_array();
258                let ret = arrow::compute::cast(&arrow_array, &to.as_arrow_type())
259                    .context(ArrowSnafu { context: "cast" })?;
260                let vector = Helper::try_into_vector(ret).context(DataTypeSnafu {
261                    msg: "Fail to convert to Vector",
262                })?;
263                Ok(vector)
264            }
265            Self::TumbleWindowFloor {
266                window_size,
267                start_time,
268            } => {
269                let timestamp_array = get_timestamp_array(&arg_col)?;
270                let date_array_ref = timestamp_array
271                    .as_any()
272                    .downcast_ref::<arrow::array::TimestampMillisecondArray>()
273                    .context({
274                        TypeMismatchSnafu {
275                            expected: ConcreteDataType::boolean_datatype(),
276                            actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
277                        }
278                    })?;
279
280                let start_time = start_time.map(|t| t.value());
281                let window_size = window_size.as_millis() as repr::Duration;
282
283                let ret = arrow::compute::unary(date_array_ref, |ts| {
284                    get_window_start(ts, window_size, start_time)
285                });
286
287                let ret = TimestampMillisecondVector::from(ret);
288                Ok(Arc::new(ret))
289            }
290            Self::TumbleWindowCeiling {
291                window_size,
292                start_time,
293            } => {
294                let timestamp_array = get_timestamp_array(&arg_col)?;
295                let date_array_ref = timestamp_array
296                    .as_any()
297                    .downcast_ref::<arrow::array::TimestampMillisecondArray>()
298                    .context({
299                        TypeMismatchSnafu {
300                            expected: ConcreteDataType::boolean_datatype(),
301                            actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
302                        }
303                    })?;
304
305                let start_time = start_time.map(|t| t.value());
306                let window_size = window_size.as_millis() as repr::Duration;
307
308                let ret = arrow::compute::unary(date_array_ref, |ts| {
309                    get_window_start(ts, window_size, start_time) + window_size
310                });
311
312                let ret = TimestampMillisecondVector::from(ret);
313                Ok(Arc::new(ret))
314            }
315        }
316    }
317
318    pub fn from_tumble_func(name: &str, args: &[TypedExpr]) -> Result<(Self, TypedExpr), Error> {
319        match name.to_lowercase().as_str() {
320            TUMBLE_START | TUMBLE_END => {
321                let ts = args.first().context(InvalidQuerySnafu {
322                    reason: "Tumble window function requires a timestamp argument",
323                })?;
324                let window_size = {
325                    let window_size_untyped = args
326                        .get(1)
327                        .and_then(|expr| expr.expr.as_literal())
328                        .context(InvalidQuerySnafu {
329                        reason: "Tumble window function requires a window size argument",
330                    })?;
331                    if let Some(window_size) = window_size_untyped.as_string() {
332                        // cast as interval
333                        let interval = cast(
334                            Value::from(window_size),
335                            &ConcreteDataType::interval_day_time_datatype(),
336                        )
337                        .map_err(BoxedError::new)
338                        .context(ExternalSnafu)?
339                        .as_interval_day_time()
340                        .context(UnexpectedSnafu {
341                            reason: "Expect window size arg to be interval after successful cast"
342                                .to_string(),
343                        })?;
344                        Duration::from_millis(interval.as_millis() as u64)
345                    } else if let Some(interval) = window_size_untyped.as_interval_day_time() {
346                        Duration::from_millis(interval.as_millis() as u64)
347                    } else {
348                        InvalidQuerySnafu {
349                                reason: format!(
350                                    "Tumble window function requires window size argument to be either a interval or a string describe a interval, found {:?}",
351                                    window_size_untyped
352                                )
353                            }.fail()?
354                    }
355                };
356
357                // start time argument is optional
358                let start_time = match args.get(2) {
359                    Some(start_time) => {
360                        if let Some(value) = start_time.expr.as_literal() {
361                            // cast as timestamp
362                            let ret = cast(
363                                value,
364                                &ConcreteDataType::timestamp_millisecond_datatype(),
365                            )
366                            .map_err(BoxedError::new)
367                            .context(ExternalSnafu)?
368                            .as_timestamp()
369                            .context(UnexpectedSnafu {
370                                reason:
371                                    "Expect start time arg to be timestamp after successful cast"
372                                        .to_string(),
373                            })?;
374                            Some(ret)
375                        } else {
376                            UnexpectedSnafu {
377                                reason: "Expect start time arg to be literal",
378                            }
379                            .fail()?
380                        }
381                    }
382                    None => None,
383                };
384
385                if name == TUMBLE_START {
386                    Ok((
387                        Self::TumbleWindowFloor {
388                            window_size,
389                            start_time,
390                        },
391                        ts.clone(),
392                    ))
393                } else if name == TUMBLE_END {
394                    Ok((
395                        Self::TumbleWindowCeiling {
396                            window_size,
397                            start_time,
398                        },
399                        ts.clone(),
400                    ))
401                } else {
402                    unreachable!()
403                }
404            }
405            _ => crate::error::InternalSnafu {
406                reason: format!("Unknown tumble kind function: {}", name),
407            }
408            .fail()?,
409        }
410    }
411
412    /// Evaluate the function with given values and expression
413    ///
414    /// # Arguments
415    ///
416    /// - `values`: The values to be used in the evaluation
417    ///
418    /// - `expr`: The expression to be evaluated and use as argument, will extract the value from the `values` and evaluate the expression
419    pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result<Value, EvalError> {
420        let arg = expr.eval(values)?;
421        match self {
422            Self::Not => {
423                let bool = if let Value::Boolean(bool) = arg {
424                    Ok(bool)
425                } else {
426                    TypeMismatchSnafu {
427                        expected: ConcreteDataType::boolean_datatype(),
428                        actual: arg.data_type(),
429                    }
430                    .fail()?
431                }?;
432                Ok(Value::from(!bool))
433            }
434            Self::IsNull => Ok(Value::from(arg.is_null())),
435            Self::IsTrue | Self::IsFalse => {
436                let bool = if let Value::Boolean(bool) = arg {
437                    Ok(bool)
438                } else {
439                    TypeMismatchSnafu {
440                        expected: ConcreteDataType::boolean_datatype(),
441                        actual: arg.data_type(),
442                    }
443                    .fail()?
444                }?;
445                if matches!(self, Self::IsTrue) {
446                    Ok(Value::from(bool))
447                } else {
448                    Ok(Value::from(!bool))
449                }
450            }
451            Self::StepTimestamp => {
452                let ty = arg.data_type();
453                if let Value::Timestamp(timestamp) = arg {
454                    let timestamp = Timestamp::new_millisecond(timestamp.value() + 1);
455                    Ok(Value::from(timestamp))
456                } else if let Ok(v) = value_to_internal_ts(arg) {
457                    let timestamp = Timestamp::new_millisecond(v + 1);
458                    Ok(Value::from(timestamp))
459                } else {
460                    TypeMismatchSnafu {
461                        expected: ConcreteDataType::timestamp_millisecond_datatype(),
462                        actual: ty,
463                    }
464                    .fail()?
465                }
466            }
467            Self::Cast(to) => {
468                let arg_ty = arg.data_type();
469                cast(arg, to).context({
470                    CastValueSnafu {
471                        from: arg_ty,
472                        to: to.clone(),
473                    }
474                })
475            }
476            Self::TumbleWindowFloor {
477                window_size,
478                start_time,
479            } => {
480                let ts = get_ts_as_millisecond(arg)?;
481                let start_time = start_time.map(|t| t.value());
482                let window_size = window_size.as_millis() as repr::Duration;
483                let window_start = get_window_start(ts, window_size, start_time);
484
485                let ret = Timestamp::new_millisecond(window_start);
486                Ok(Value::from(ret))
487            }
488            Self::TumbleWindowCeiling {
489                window_size,
490                start_time,
491            } => {
492                let ts = get_ts_as_millisecond(arg)?;
493                let start_time = start_time.map(|t| t.value());
494                let window_size = window_size.as_millis() as repr::Duration;
495                let window_start = get_window_start(ts, window_size, start_time);
496
497                let window_end = window_start + window_size;
498                let ret = Timestamp::new_millisecond(window_end);
499                Ok(Value::from(ret))
500            }
501        }
502    }
503}
504
505fn get_timestamp_array(vector: &VectorRef) -> Result<arrow::array::ArrayRef, EvalError> {
506    let arrow_array = vector.to_arrow_array();
507    let timestamp_array = if *arrow_array.data_type()
508        == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
509    {
510        arrow_array
511    } else {
512        arrow::compute::cast(
513            &arrow_array,
514            &ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type(),
515        )
516        .context(ArrowSnafu {
517            context: "Trying to cast to timestamp in StepTimestamp",
518        })?
519    };
520    Ok(timestamp_array)
521}
522
523fn get_window_start(
524    ts: repr::Timestamp,
525    window_size: repr::Duration,
526    start_time: Option<repr::Timestamp>,
527) -> repr::Timestamp {
528    let start_time = start_time.unwrap_or(0);
529    // left close right open
530    if ts >= start_time {
531        start_time + (ts - start_time) / window_size * window_size
532    } else {
533        start_time + (ts - start_time) / window_size * window_size
534            - if ((start_time - ts) % window_size) != 0 {
535                window_size
536            } else {
537                0
538            }
539    }
540}
541
/// Checks `get_window_start` alignment. Windows are left-closed/right-open, negative offsets
/// round down (not toward zero), and an explicit `start_time` shifts the window grid.
#[test]
fn test_get_window_start() {
    // Default origin (0).
    assert_eq!(get_window_start(1, 3, None), 0);
    assert_eq!(get_window_start(3, 3, None), 3);
    assert_eq!(get_window_start(0, 3, None), 0);

    // Negative timestamps are floored, not truncated toward zero.
    assert_eq!(get_window_start(-1, 3, None), -3);
    assert_eq!(get_window_start(-3, 3, None), -3);
    assert_eq!(get_window_start(-4, 3, None), -6);

    // Origin 2, size 5 gives windows ..., [-8, -3), [-3, 2), [2, 7), [7, 12), ...
    assert_eq!(get_window_start(2, 5, Some(2)), 2);
    assert_eq!(get_window_start(6, 5, Some(2)), 2);
    assert_eq!(get_window_start(7, 5, Some(2)), 7);
    assert_eq!(get_window_start(1, 5, Some(2)), -3);
    assert_eq!(get_window_start(-3, 5, Some(2)), -3);
    assert_eq!(get_window_start(-4, 5, Some(2)), -8);
}
551
552fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
553    let ts = if let Some(ts) = arg.as_timestamp() {
554        ts.convert_to(TimeUnit::Millisecond)
555            .context(OverflowSnafu)?
556            .value()
557    } else {
558        InvalidArgumentSnafu {
559            reason: "Expect input to be timestamp or datetime type",
560        }
561        .fail()?
562    };
563    Ok(ts)
564}
565
/// BinaryFunc is a function that takes two arguments.
/// Also notice this enum doesn't contain function arguments, since the arguments are stored in the expression.
///
/// Comparison variants (`Eq` .. `Gte`) accept operands of any type and return a boolean.
/// Arithmetic variants are specialized per operand type and named `<Op><Type>`. Use
/// [`BinaryFunc::specialization`] to pick one from a [`GenericFn`] and an input type.
/// There are no floating-point `Mod` variants.
///
/// TODO(discord9): support more binary functions for more types
// NOTE: variant order is significant — it determines the derived `Ord` and the `EnumIter` order.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash, EnumIter,
)]
pub enum BinaryFunc {
    // Comparisons.
    Eq,
    NotEq,
    Lt,
    Lte,
    Gt,
    Gte,
    // Addition, per operand type.
    AddInt16,
    AddInt32,
    AddInt64,
    AddUInt16,
    AddUInt32,
    AddUInt64,
    AddFloat32,
    AddFloat64,
    // Subtraction, per operand type.
    SubInt16,
    SubInt32,
    SubInt64,
    SubUInt16,
    SubUInt32,
    SubUInt64,
    SubFloat32,
    SubFloat64,
    // Multiplication, per operand type.
    MulInt16,
    MulInt32,
    MulInt64,
    MulUInt16,
    MulUInt32,
    MulUInt64,
    MulFloat32,
    MulFloat64,
    // Division, per operand type.
    DivInt16,
    DivInt32,
    DivInt64,
    DivUInt16,
    DivUInt32,
    DivUInt64,
    DivFloat32,
    DivFloat64,
    // Remainder, integer operand types only.
    ModInt16,
    ModInt32,
    ModInt64,
    ModUInt16,
    ModUInt32,
    ModUInt64,
}
619
/// Generate a binary function signature based on the function and its input types.
///
/// The caller can provide custom signatures for some functions as regular match arms. The
/// rest are generated from a list of `Variant=>(datatype_constructor,GenericFnVariant)`
/// entries like this:
/// ```ignore
/// AddInt16=>(int16_datatype,Add),
/// ```
/// which expands to:
/// ```ignore
/// Self::AddInt16 => Signature {
///     input: smallvec![
///         ConcreteDataType::int16_datatype(),
///         ConcreteDataType::int16_datatype(),
///     ],
///     output: ConcreteDataType::int16_datatype(),
///     generic_fn: GenericFn::Add,
/// },
/// ```
///
/// Generated arms always use the same type for both inputs and the output.
/// The custom arms must not end with a trailing comma, because the macro inserts one after
/// them. Generated arms refer to `Self::Variant`, so the macro must be invoked inside an `impl`
/// of the enum being matched.
macro_rules! generate_binary_signature {
    ($value:ident, { $($user_arm:tt)* },
    [ $(
        $auto_arm:ident=>($con_type:ident,$generic:ident)
        ),*
    ]) => {
        match $value {
            $($user_arm)*,
            $(
                Self::$auto_arm => Signature {
                    input: smallvec![
                        ConcreteDataType::$con_type(),
                        ConcreteDataType::$con_type(),
                    ],
                    output: ConcreteDataType::$con_type(),
                    generic_fn: GenericFn::$generic,
                },
            )*
        }
    };
}
658
/// Lookup table from `(generic function, first input type)` to the concrete [`BinaryFunc`]
/// implementing it. It is built once, from every variant's signature, on the first call to
/// `BinaryFunc::specialization`.
static SPECIALIZATION: OnceLock<HashMap<(GenericFn, ConcreteDataType), BinaryFunc>> =
    OnceLock::new();
661
662impl BinaryFunc {
    /// Use null type to ref to any type
    ///
    /// Returns the [`Signature`] of this binary function:
    /// - comparisons (`Eq` .. `Gte`) take two null-typed (any-typed) inputs and return a boolean;
    /// - each arithmetic variant takes two operands of its named type and returns that same
    ///   type, as listed in the table below.
    pub fn signature(&self) -> Signature {
        generate_binary_signature!(self, {
                // Comparisons accept operands of any type and always produce a boolean.
                Self::Eq | Self::NotEq | Self::Lt | Self::Lte | Self::Gt | Self::Gte => Signature {
                    input: smallvec![
                        ConcreteDataType::null_datatype(),
                        ConcreteDataType::null_datatype()
                    ],
                    output: ConcreteDataType::boolean_datatype(),
                    generic_fn: match self {
                        Self::Eq => GenericFn::Eq,
                        Self::NotEq => GenericFn::NotEq,
                        Self::Lt => GenericFn::Lt,
                        Self::Lte => GenericFn::Lte,
                        Self::Gt => GenericFn::Gt,
                        Self::Gte => GenericFn::Gte,
                        _ => unreachable!(),
                    },
                }
            },
            // Generated arms: `Variant=>(ConcreteDataType constructor, GenericFn variant)`.
            [
                AddInt16=>(int16_datatype,Add),
                AddInt32=>(int32_datatype,Add),
                AddInt64=>(int64_datatype,Add),
                AddUInt16=>(uint16_datatype,Add),
                AddUInt32=>(uint32_datatype,Add),
                AddUInt64=>(uint64_datatype,Add),
                AddFloat32=>(float32_datatype,Add),
                AddFloat64=>(float64_datatype,Add),
                SubInt16=>(int16_datatype,Sub),
                SubInt32=>(int32_datatype,Sub),
                SubInt64=>(int64_datatype,Sub),
                SubUInt16=>(uint16_datatype,Sub),
                SubUInt32=>(uint32_datatype,Sub),
                SubUInt64=>(uint64_datatype,Sub),
                SubFloat32=>(float32_datatype,Sub),
                SubFloat64=>(float64_datatype,Sub),
                MulInt16=>(int16_datatype,Mul),
                MulInt32=>(int32_datatype,Mul),
                MulInt64=>(int64_datatype,Mul),
                MulUInt16=>(uint16_datatype,Mul),
                MulUInt32=>(uint32_datatype,Mul),
                MulUInt64=>(uint64_datatype,Mul),
                MulFloat32=>(float32_datatype,Mul),
                MulFloat64=>(float64_datatype,Mul),
                DivInt16=>(int16_datatype,Div),
                DivInt32=>(int32_datatype,Div),
                DivInt64=>(int64_datatype,Div),
                DivUInt16=>(uint16_datatype,Div),
                DivUInt32=>(uint32_datatype,Div),
                DivUInt64=>(uint64_datatype,Div),
                DivFloat32=>(float32_datatype,Div),
                DivFloat64=>(float64_datatype,Div),
                ModInt16=>(int16_datatype,Mod),
                ModInt32=>(int32_datatype,Mod),
                ModInt64=>(int64_datatype,Mod),
                ModUInt16=>(uint16_datatype,Mod),
                ModUInt32=>(uint32_datatype,Mod),
                ModUInt64=>(uint64_datatype,Mod)
            ]
        )
    }
725
726    pub fn add(input_type: ConcreteDataType) -> Result<Self, Error> {
727        Self::specialization(GenericFn::Add, input_type)
728    }
729
730    pub fn sub(input_type: ConcreteDataType) -> Result<Self, Error> {
731        Self::specialization(GenericFn::Sub, input_type)
732    }
733
734    pub fn mul(input_type: ConcreteDataType) -> Result<Self, Error> {
735        Self::specialization(GenericFn::Mul, input_type)
736    }
737
738    pub fn div(input_type: ConcreteDataType) -> Result<Self, Error> {
739        Self::specialization(GenericFn::Div, input_type)
740    }
741
742    /// Get the specialization of the binary function based on the generic function and the input type
743    pub fn specialization(generic: GenericFn, input_type: ConcreteDataType) -> Result<Self, Error> {
744        let rule = SPECIALIZATION.get_or_init(|| {
745            let mut spec = HashMap::new();
746            for func in BinaryFunc::iter() {
747                let sig = func.signature();
748                spec.insert((sig.generic_fn, sig.input[0].clone()), func);
749            }
750            spec
751        });
752        rule.get(&(generic, input_type.clone()))
753            .cloned()
754            .with_context(|| InvalidQuerySnafu {
755                reason: format!(
756                    "No specialization found for binary function {:?} with input type {:?}",
757                    generic, input_type
758                ),
759            })
760    }
761
762    /// try it's best to infer types from the input types and expressions
763    ///
764    /// if it can't found out types, will return None
765    pub(crate) fn infer_type_from(
766        generic: GenericFn,
767        arg_exprs: &[ScalarExpr],
768        arg_types: &[Option<ConcreteDataType>],
769    ) -> Result<ConcreteDataType, Error> {
770        let ret = match (arg_types[0].as_ref(), arg_types[1].as_ref()) {
771            (Some(t1), Some(t2)) => {
772                ensure!(
773                    t1 == t2,
774                    InvalidQuerySnafu {
775                        reason: format!(
776                            "Binary function {:?} requires both arguments to have the same type, left={:?}, right={:?}",
777                            generic, t1, t2
778                        ),
779                    }
780                );
781                t1.clone()
782            }
783            (Some(t), None) | (None, Some(t)) => t.clone(),
784            _ => arg_exprs[0]
785                .as_literal()
786                .map(|lit| lit.data_type())
787                .or_else(|| arg_exprs[1].as_literal().map(|lit| lit.data_type()))
788                .with_context(|| InvalidQuerySnafu {
789                    reason: format!(
790                        "Binary function {:?} requires at least one argument with known type",
791                        generic
792                    ),
793                })?,
794        };
795        Ok(ret)
796    }
797
798    pub fn is_valid_func_name(name: &str) -> bool {
799        matches!(
800            name.to_lowercase().as_str(),
801            "eq" | "equal"
802                | "not_eq"
803                | "not_equal"
804                | "lt"
805                | "lte"
806                | "gt"
807                | "gte"
808                | "add"
809                | "sub"
810                | "subtract"
811                | "mul"
812                | "multiply"
813                | "div"
814                | "divide"
815                | "mod"
816        )
817    }
818
819    /// choose the appropriate specialization based on the input types
820    /// return a specialization of the binary function and it's actual input and output type(so no null type present)
821    ///
822    /// will try it best to extract from `arg_types` and `arg_exprs` to get the input types
823    /// if `arg_types` is not enough, it will try to extract from `arg_exprs` if `arg_exprs` is literal with known type
824    pub fn from_str_expr_and_type(
825        name: &str,
826        arg_exprs: &[ScalarExpr],
827        arg_types: &[Option<ConcreteDataType>],
828    ) -> Result<(Self, Signature), Error> {
829        // this `name_to_op` if error simply return a similar message of `unsupported function xxx` so
830        let op = name_to_op(name).with_context(|| InvalidQuerySnafu {
831            reason: format!("Unsupported binary function: {}", name),
832        })?;
833
834        // get first arg type and make sure if both is some, they are the same
835        let generic_fn = {
836            match op {
837                Operator::Eq => GenericFn::Eq,
838                Operator::NotEq => GenericFn::NotEq,
839                Operator::Lt => GenericFn::Lt,
840                Operator::LtEq => GenericFn::Lte,
841                Operator::Gt => GenericFn::Gt,
842                Operator::GtEq => GenericFn::Gte,
843                Operator::Plus => GenericFn::Add,
844                Operator::Minus => GenericFn::Sub,
845                Operator::Multiply => GenericFn::Mul,
846                Operator::Divide => GenericFn::Div,
847                Operator::Modulo => GenericFn::Mod,
848                _ => {
849                    return InvalidQuerySnafu {
850                        reason: format!("Unsupported binary function: {}", name),
851                    }
852                    .fail();
853                }
854            }
855        };
856        let need_type = matches!(
857            generic_fn,
858            GenericFn::Add | GenericFn::Sub | GenericFn::Mul | GenericFn::Div | GenericFn::Mod
859        );
860
861        ensure!(
862            arg_exprs.len() == 2 && arg_types.len() == 2,
863            PlanSnafu {
864                reason: "Binary function requires exactly 2 arguments".to_string()
865            }
866        );
867
868        let arg_type = Self::infer_type_from(generic_fn, arg_exprs, arg_types)?;
869
870        // if type is not needed, we can erase input type to null to find correct functions for
871        // functions that do not need type
872        let query_input_type = if need_type {
873            arg_type.clone()
874        } else {
875            ConcreteDataType::null_datatype()
876        };
877
878        let spec_fn = Self::specialization(generic_fn, query_input_type)?;
879
880        let signature = Signature {
881            input: smallvec![arg_type.clone(), arg_type],
882            output: spec_fn.signature().output,
883            generic_fn,
884        };
885
886        Ok((spec_fn, signature))
887    }
888
889    pub fn eval_batch(
890        &self,
891        batch: &Batch,
892        expr1: &ScalarExpr,
893        expr2: &ScalarExpr,
894    ) -> Result<VectorRef, EvalError> {
895        let left = expr1.eval_batch(batch)?;
896        let left = left.to_arrow_array();
897        let right = expr2.eval_batch(batch)?;
898        let right = right.to_arrow_array();
899
900        let arrow_array: ArrayRef = match self {
901            Self::Eq => Arc::new(
902                arrow::compute::kernels::cmp::eq(&left, &right)
903                    .context(ArrowSnafu { context: "eq" })?,
904            ),
905            Self::NotEq => Arc::new(
906                arrow::compute::kernels::cmp::neq(&left, &right)
907                    .context(ArrowSnafu { context: "neq" })?,
908            ),
909            Self::Lt => Arc::new(
910                arrow::compute::kernels::cmp::lt(&left, &right)
911                    .context(ArrowSnafu { context: "lt" })?,
912            ),
913            Self::Lte => Arc::new(
914                arrow::compute::kernels::cmp::lt_eq(&left, &right)
915                    .context(ArrowSnafu { context: "lte" })?,
916            ),
917            Self::Gt => Arc::new(
918                arrow::compute::kernels::cmp::gt(&left, &right)
919                    .context(ArrowSnafu { context: "gt" })?,
920            ),
921            Self::Gte => Arc::new(
922                arrow::compute::kernels::cmp::gt_eq(&left, &right)
923                    .context(ArrowSnafu { context: "gte" })?,
924            ),
925
926            Self::AddInt16
927            | Self::AddInt32
928            | Self::AddInt64
929            | Self::AddUInt16
930            | Self::AddUInt32
931            | Self::AddUInt64
932            | Self::AddFloat32
933            | Self::AddFloat64 => arrow::compute::kernels::numeric::add(&left, &right)
934                .context(ArrowSnafu { context: "add" })?,
935
936            Self::SubInt16
937            | Self::SubInt32
938            | Self::SubInt64
939            | Self::SubUInt16
940            | Self::SubUInt32
941            | Self::SubUInt64
942            | Self::SubFloat32
943            | Self::SubFloat64 => arrow::compute::kernels::numeric::sub(&left, &right)
944                .context(ArrowSnafu { context: "sub" })?,
945
946            Self::MulInt16
947            | Self::MulInt32
948            | Self::MulInt64
949            | Self::MulUInt16
950            | Self::MulUInt32
951            | Self::MulUInt64
952            | Self::MulFloat32
953            | Self::MulFloat64 => arrow::compute::kernels::numeric::mul(&left, &right)
954                .context(ArrowSnafu { context: "mul" })?,
955
956            Self::DivInt16
957            | Self::DivInt32
958            | Self::DivInt64
959            | Self::DivUInt16
960            | Self::DivUInt32
961            | Self::DivUInt64
962            | Self::DivFloat32
963            | Self::DivFloat64 => arrow::compute::kernels::numeric::div(&left, &right)
964                .context(ArrowSnafu { context: "div" })?,
965
966            Self::ModInt16
967            | Self::ModInt32
968            | Self::ModInt64
969            | Self::ModUInt16
970            | Self::ModUInt32
971            | Self::ModUInt64 => arrow::compute::kernels::numeric::rem(&left, &right)
972                .context(ArrowSnafu { context: "rem" })?,
973        };
974
975        let vector = Helper::try_into_vector(arrow_array).context(DataTypeSnafu {
976            msg: "Fail to convert to Vector",
977        })?;
978        Ok(vector)
979    }
980
981    /// Evaluate the function with given values and expression
982    ///
983    /// # Arguments
984    ///
985    /// - `values`: The values to be used in the evaluation
986    ///
987    /// - `expr1`: The first arg to be evaluated, will extract the value from the `values` and evaluate the expression
988    ///
989    /// - `expr2`: The second arg to be evaluated
990    pub fn eval(
991        &self,
992        values: &[Value],
993        expr1: &ScalarExpr,
994        expr2: &ScalarExpr,
995    ) -> Result<Value, EvalError> {
996        let left = expr1.eval(values)?;
997        let right = expr2.eval(values)?;
998        match self {
999            Self::Eq => Ok(Value::from(left == right)),
1000            Self::NotEq => Ok(Value::from(left != right)),
1001            Self::Lt => Ok(Value::from(left < right)),
1002            Self::Lte => Ok(Value::from(left <= right)),
1003            Self::Gt => Ok(Value::from(left > right)),
1004            Self::Gte => Ok(Value::from(left >= right)),
1005
1006            Self::AddInt16 => Ok(add::<i16>(left, right)?),
1007            Self::AddInt32 => Ok(add::<i32>(left, right)?),
1008            Self::AddInt64 => Ok(add::<i64>(left, right)?),
1009            Self::AddUInt16 => Ok(add::<u16>(left, right)?),
1010            Self::AddUInt32 => Ok(add::<u32>(left, right)?),
1011            Self::AddUInt64 => Ok(add::<u64>(left, right)?),
1012            Self::AddFloat32 => Ok(add::<f32>(left, right)?),
1013            Self::AddFloat64 => Ok(add::<f64>(left, right)?),
1014
1015            Self::SubInt16 => Ok(sub::<i16>(left, right)?),
1016            Self::SubInt32 => Ok(sub::<i32>(left, right)?),
1017            Self::SubInt64 => Ok(sub::<i64>(left, right)?),
1018            Self::SubUInt16 => Ok(sub::<u16>(left, right)?),
1019            Self::SubUInt32 => Ok(sub::<u32>(left, right)?),
1020            Self::SubUInt64 => Ok(sub::<u64>(left, right)?),
1021            Self::SubFloat32 => Ok(sub::<f32>(left, right)?),
1022            Self::SubFloat64 => Ok(sub::<f64>(left, right)?),
1023
1024            Self::MulInt16 => Ok(mul::<i16>(left, right)?),
1025            Self::MulInt32 => Ok(mul::<i32>(left, right)?),
1026            Self::MulInt64 => Ok(mul::<i64>(left, right)?),
1027            Self::MulUInt16 => Ok(mul::<u16>(left, right)?),
1028            Self::MulUInt32 => Ok(mul::<u32>(left, right)?),
1029            Self::MulUInt64 => Ok(mul::<u64>(left, right)?),
1030            Self::MulFloat32 => Ok(mul::<f32>(left, right)?),
1031            Self::MulFloat64 => Ok(mul::<f64>(left, right)?),
1032
1033            Self::DivInt16 => Ok(div::<i16>(left, right)?),
1034            Self::DivInt32 => Ok(div::<i32>(left, right)?),
1035            Self::DivInt64 => Ok(div::<i64>(left, right)?),
1036            Self::DivUInt16 => Ok(div::<u16>(left, right)?),
1037            Self::DivUInt32 => Ok(div::<u32>(left, right)?),
1038            Self::DivUInt64 => Ok(div::<u64>(left, right)?),
1039            Self::DivFloat32 => Ok(div::<f32>(left, right)?),
1040            Self::DivFloat64 => Ok(div::<f64>(left, right)?),
1041
1042            Self::ModInt16 => Ok(rem::<i16>(left, right)?),
1043            Self::ModInt32 => Ok(rem::<i32>(left, right)?),
1044            Self::ModInt64 => Ok(rem::<i64>(left, right)?),
1045            Self::ModUInt16 => Ok(rem::<u16>(left, right)?),
1046            Self::ModUInt32 => Ok(rem::<u32>(left, right)?),
1047            Self::ModUInt64 => Ok(rem::<u64>(left, right)?),
1048        }
1049    }
1050
1051    /// Reverse the comparison operator, i.e. `a < b` becomes `b > a`,
1052    /// equal and not equal are unchanged.
1053    pub fn reverse_compare(&self) -> Result<Self, Error> {
1054        let ret = match &self {
1055            BinaryFunc::Eq => BinaryFunc::Eq,
1056            BinaryFunc::NotEq => BinaryFunc::NotEq,
1057            BinaryFunc::Lt => BinaryFunc::Gt,
1058            BinaryFunc::Lte => BinaryFunc::Gte,
1059            BinaryFunc::Gt => BinaryFunc::Lt,
1060            BinaryFunc::Gte => BinaryFunc::Lte,
1061            _ => {
1062                return InvalidQuerySnafu {
1063                    reason: format!("Expect a comparison operator, found {:?}", self),
1064                }
1065                .fail();
1066            }
1067        };
1068        Ok(ret)
1069    }
1070}
1071
1072/// VariadicFunc is a function that takes a variable number of arguments.
1073#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
1074pub enum VariadicFunc {
1075    And,
1076    Or,
1077}
1078
1079impl VariadicFunc {
1080    /// Return the signature of the function
1081    pub fn signature(&self) -> Signature {
1082        Signature {
1083            input: smallvec![ConcreteDataType::boolean_datatype()],
1084            output: ConcreteDataType::boolean_datatype(),
1085            generic_fn: match self {
1086                Self::And => GenericFn::And,
1087                Self::Or => GenericFn::Or,
1088            },
1089        }
1090    }
1091
1092    pub fn is_valid_func_name(name: &str) -> bool {
1093        matches!(name.to_lowercase().as_str(), "and" | "or")
1094    }
1095
1096    /// Create a VariadicFunc from a string of the function name and given argument types(optional)
1097    pub fn from_str_and_types(
1098        name: &str,
1099        arg_types: &[Option<ConcreteDataType>],
1100    ) -> Result<Self, Error> {
1101        // TODO(discord9): future variadic funcs to be added might need to check arg_types
1102        let _ = arg_types;
1103        match name {
1104            "and" => Ok(Self::And),
1105            "or" => Ok(Self::Or),
1106            _ => InvalidQuerySnafu {
1107                reason: format!("Unknown variadic function: {}", name),
1108            }
1109            .fail(),
1110        }
1111    }
1112
1113    pub fn eval_batch(&self, batch: &Batch, exprs: &[ScalarExpr]) -> Result<VectorRef, EvalError> {
1114        ensure!(
1115            !exprs.is_empty(),
1116            InvalidArgumentSnafu {
1117                reason: format!("Variadic function {:?} requires at least 1 arguments", self)
1118            }
1119        );
1120        let args = exprs
1121            .iter()
1122            .map(|expr| expr.eval_batch(batch).map(|v| v.to_arrow_array()))
1123            .collect::<Result<Vec<_>, _>>()?;
1124        let mut iter = args.into_iter();
1125
1126        let first = iter.next().unwrap();
1127        let mut left = first
1128            .as_any()
1129            .downcast_ref::<BooleanArray>()
1130            .context({
1131                TypeMismatchSnafu {
1132                    expected: ConcreteDataType::boolean_datatype(),
1133                    actual: ConcreteDataType::from_arrow_type(first.data_type()),
1134                }
1135            })?
1136            .clone();
1137
1138        for right in iter {
1139            let right = right.as_any().downcast_ref::<BooleanArray>().context({
1140                TypeMismatchSnafu {
1141                    expected: ConcreteDataType::boolean_datatype(),
1142                    actual: ConcreteDataType::from_arrow_type(right.data_type()),
1143                }
1144            })?;
1145            left = match self {
1146                Self::And => {
1147                    arrow::compute::and(&left, right).context(ArrowSnafu { context: "and" })?
1148                }
1149                Self::Or => {
1150                    arrow::compute::or(&left, right).context(ArrowSnafu { context: "or" })?
1151                }
1152            }
1153        }
1154
1155        Ok(Arc::new(BooleanVector::from(left)))
1156    }
1157
1158    /// Evaluate the function with given values and expressions
1159    pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
1160        match self {
1161            VariadicFunc::And => and(values, exprs),
1162            VariadicFunc::Or => or(values, exprs),
1163        }
1164    }
1165}
1166
1167fn and(values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
1168    // If any is false, then return false. Else, if any is null, then return null. Else, return true.
1169    let mut null = false;
1170    for expr in exprs {
1171        match expr.eval(values) {
1172            Ok(Value::Boolean(true)) => {}
1173            Ok(Value::Boolean(false)) => return Ok(Value::Boolean(false)), // short-circuit
1174            Ok(Value::Null) => null = true,
1175            Err(this_err) => {
1176                return Err(this_err);
1177            } // retain first error encountered
1178            Ok(x) => InvalidArgumentSnafu {
1179                reason: format!(
1180                    "`and()` only support boolean type, found value {:?} of type {:?}",
1181                    x,
1182                    x.data_type()
1183                ),
1184            }
1185            .fail()?,
1186        }
1187    }
1188    match null {
1189        true => Ok(Value::Null),
1190        false => Ok(Value::Boolean(true)),
1191    }
1192}
1193
1194fn or(values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
1195    // If any is false, then return false. Else, if any is null, then return null. Else, return true.
1196    let mut null = false;
1197    for expr in exprs {
1198        match expr.eval(values) {
1199            Ok(Value::Boolean(true)) => return Ok(Value::Boolean(true)), // short-circuit
1200            Ok(Value::Boolean(false)) => {}
1201            Ok(Value::Null) => null = true,
1202            Err(this_err) => {
1203                return Err(this_err);
1204            } // retain first error encountered
1205            Ok(x) => InvalidArgumentSnafu {
1206                reason: format!(
1207                    "`or()` only support boolean type, found value {:?} of type {:?}",
1208                    x,
1209                    x.data_type()
1210                ),
1211            }
1212            .fail()?,
1213        }
1214    }
1215    match null {
1216        true => Ok(Value::Null),
1217        false => Ok(Value::Boolean(false)),
1218    }
1219}
1220
1221fn add<T>(left: Value, right: Value) -> Result<Value, EvalError>
1222where
1223    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1224    Value: From<T>,
1225{
1226    let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1227    let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1228    Ok(Value::from(left + right))
1229}
1230
1231fn sub<T>(left: Value, right: Value) -> Result<Value, EvalError>
1232where
1233    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1234    Value: From<T>,
1235{
1236    let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1237    let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1238    Ok(Value::from(left - right))
1239}
1240
1241fn mul<T>(left: Value, right: Value) -> Result<Value, EvalError>
1242where
1243    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1244    Value: From<T>,
1245{
1246    let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1247    let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1248    Ok(Value::from(left * right))
1249}
1250
1251fn div<T>(left: Value, right: Value) -> Result<Value, EvalError>
1252where
1253    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1254    <T as TryFrom<Value>>::Error: std::fmt::Debug,
1255    Value: From<T>,
1256{
1257    let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1258    let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1259    if right.is_zero() {
1260        return Err(DivisionByZeroSnafu {}.build());
1261    }
1262    Ok(Value::from(left / right))
1263}
1264
1265fn rem<T>(left: Value, right: Value) -> Result<Value, EvalError>
1266where
1267    T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1268    <T as TryFrom<Value>>::Error: std::fmt::Debug,
1269    Value: From<T>,
1270{
1271    let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1272    let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1273    Ok(Value::from(left % right))
1274}
1275
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datatypes::vectors::Vector;
    use pretty_assertions::assert_eq;

    use super::*;

    #[test]
    fn test_tumble_batch() {
        let timestamp_vector = TimestampMillisecondVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]);
        let tumble_start = UnaryFunc::TumbleWindowFloor {
            window_size: Duration::from_millis(10),
            start_time: None,
        };
        let tumble_end = UnaryFunc::TumbleWindowCeiling {
            window_size: Duration::from_millis(10),
            start_time: None,
        };

        let len = timestamp_vector.len();
        let batch = Batch::try_new(vec![Arc::new(timestamp_vector)], len).unwrap();
        let arg = ScalarExpr::Column(0);

        let start = tumble_start.eval_batch(&batch, &arg).unwrap();
        let end = tumble_end.eval_batch(&batch, &arg).unwrap();
        assert_eq!(
            start.to_arrow_array().as_ref(),
            TimestampMillisecondVector::from_vec(vec![0, 0, 10, 10, 10, 20, 20])
                .to_arrow_array()
                .as_ref()
        );

        assert_eq!(
            end.to_arrow_array().as_ref(),
            TimestampMillisecondVector::from_vec(vec![10, 10, 20, 20, 20, 30, 30])
                .to_arrow_array()
                .as_ref()
        );

        let ts_ms_vector = TimestampMillisecondVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]);
        let batch = Batch::try_new(vec![Arc::new(ts_ms_vector)], len).unwrap();

        let start = tumble_start.eval_batch(&batch, &arg).unwrap();
        let end = tumble_end.eval_batch(&batch, &arg).unwrap();

        assert_eq!(
            start.to_arrow_array().as_ref(),
            TimestampMillisecondVector::from_vec(vec![0, 0, 10, 10, 10, 20, 20])
                .to_arrow_array()
                .as_ref()
        );

        assert_eq!(
            end.to_arrow_array().as_ref(),
            TimestampMillisecondVector::from_vec(vec![10, 10, 20, 20, 20, 30, 30])
                .to_arrow_array()
                .as_ref()
        );
    }

    #[test]
    fn test_num_ops() {
        let left = Value::from(10);
        let right = Value::from(3);
        let res = add::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(13));
        let res = sub::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(7));
        let res = mul::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(30));
        let res = div::<i32>(left.clone(), right.clone()).unwrap();
        assert_eq!(res, Value::from(3));
        let res = rem::<i32>(left, right).unwrap();
        assert_eq!(res, Value::from(1));

        let values = vec![Value::from(true), Value::from(false)];
        let exprs = vec![ScalarExpr::Column(0), ScalarExpr::Column(1)];
        let res = and(&values, &exprs).unwrap();
        assert_eq!(res, Value::from(false));
        let res = or(&values, &exprs).unwrap();
        assert_eq!(res, Value::from(true));
    }

    /// Test that binary function specialization works, whether the operand type comes
    /// directly from the argument types or from a literal expression.
    #[test]
    fn test_binary_func_spec() {
        // both argument types known
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[
                    Some(ConcreteDataType::int32_datatype()),
                    Some(ConcreteDataType::int32_datatype())
                ]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );

        // only the left argument type known
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[Some(ConcreteDataType::int32_datatype()), None]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );

        // only the right argument type known
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[None, Some(ConcreteDataType::int32_datatype())]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );

        // no argument type known: fall back to the literal on the right
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[
                    ScalarExpr::Column(0),
                    ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype())
                ],
                &[None, None]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );

        // no argument type known: fall back to the literal on the left
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[
                    ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()),
                    ScalarExpr::Column(0)
                ],
                &[None, None]
            )
            .unwrap(),
            (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
        );

        // this testcase make sure the specialization can find actual type from expression and fill in signature
        assert_eq!(
            BinaryFunc::from_str_expr_and_type(
                "equal",
                &[
                    ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()),
                    ScalarExpr::Column(0)
                ],
                &[None, None]
            )
            .unwrap(),
            (
                BinaryFunc::Eq,
                Signature {
                    input: smallvec![
                        ConcreteDataType::int32_datatype(),
                        ConcreteDataType::int32_datatype()
                    ],
                    output: ConcreteDataType::boolean_datatype(),
                    generic_fn: GenericFn::Eq
                }
            )
        );

        // neither a declared type nor a literal is available: must fail
        assert!(matches!(
            BinaryFunc::from_str_expr_and_type(
                "add",
                &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
                &[None, None]
            ),
            Err(Error::InvalidQuery { .. })
        ));
    }

    #[test]
    fn test_cast_int() {
        let interval = cast(
            Value::from("1 second"),
            &ConcreteDataType::interval_day_time_datatype(),
        )
        .unwrap();
        assert_eq!(
            interval,
            Value::from(common_time::IntervalDayTime::new(0, 1000))
        );
    }
}