1use std::collections::HashMap;
18use std::sync::{Arc, OnceLock};
19use std::time::Duration;
20
21use arrow::array::{ArrayRef, BooleanArray};
22use common_error::ext::BoxedError;
23use common_time::timestamp::TimeUnit;
24use common_time::Timestamp;
25use datafusion_expr::Operator;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::prelude::DataType;
28use datatypes::types::cast;
29use datatypes::value::Value;
30use datatypes::vectors::{BooleanVector, Helper, TimestampMillisecondVector, VectorRef};
31use serde::{Deserialize, Serialize};
32use smallvec::smallvec;
33use snafu::{ensure, OptionExt, ResultExt};
34use strum::{EnumIter, IntoEnumIterator};
35use substrait::df_logical_plan::consumer::name_to_op;
36
37use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu, UnexpectedSnafu};
38use crate::expr::error::{
39 ArrowSnafu, CastValueSnafu, DataTypeSnafu, DivisionByZeroSnafu, EvalError, OverflowSnafu,
40 TryFromValueSnafu, TypeMismatchSnafu,
41};
42use crate::expr::signature::{GenericFn, Signature};
43use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr, TUMBLE_END, TUMBLE_START};
44use crate::repr::{self, value_to_internal_ts};
45
46#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Hash)]
49pub enum UnmaterializableFunc {
50 Now,
51 CurrentSchema,
52 TumbleWindow {
53 ts: Box<TypedExpr>,
54 window_size: Duration,
55 start_time: Option<Timestamp>,
56 },
57}
58
59impl UnmaterializableFunc {
60 pub fn signature(&self) -> Signature {
62 match self {
63 Self::Now => Signature {
64 input: smallvec![],
65 output: ConcreteDataType::timestamp_millisecond_datatype(),
67 generic_fn: GenericFn::Now,
68 },
69 Self::CurrentSchema => Signature {
70 input: smallvec![],
71 output: ConcreteDataType::string_datatype(),
72 generic_fn: GenericFn::CurrentSchema,
73 },
74 Self::TumbleWindow { .. } => Signature {
75 input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
76 output: ConcreteDataType::timestamp_millisecond_datatype(),
77 generic_fn: GenericFn::TumbleWindow,
78 },
79 }
80 }
81
82 pub fn is_valid_func_name(name: &str) -> bool {
83 matches!(
84 name.to_lowercase().as_str(),
85 "now" | "current_schema" | "tumble"
86 )
87 }
88
89 pub fn from_str_args(name: &str, _args: Vec<TypedExpr>) -> Result<Self, Error> {
91 match name.to_lowercase().as_str() {
92 "now" => Ok(Self::Now),
93 "current_schema" => Ok(Self::CurrentSchema),
94 _ => InvalidQuerySnafu {
95 reason: format!("Unknown unmaterializable function: {}", name),
96 }
97 .fail(),
98 }
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
105pub enum UnaryFunc {
106 Not,
107 IsNull,
108 IsTrue,
109 IsFalse,
110 StepTimestamp,
111 Cast(ConcreteDataType),
112 TumbleWindowFloor {
113 window_size: Duration,
114 start_time: Option<Timestamp>,
115 },
116 TumbleWindowCeiling {
117 window_size: Duration,
118 start_time: Option<Timestamp>,
119 },
120}
121
122impl UnaryFunc {
123 pub fn signature(&self) -> Signature {
125 match self {
126 Self::IsNull => Signature {
127 input: smallvec![ConcreteDataType::null_datatype()],
128 output: ConcreteDataType::boolean_datatype(),
129 generic_fn: GenericFn::IsNull,
130 },
131 Self::Not | Self::IsTrue | Self::IsFalse => Signature {
132 input: smallvec![ConcreteDataType::boolean_datatype()],
133 output: ConcreteDataType::boolean_datatype(),
134 generic_fn: match self {
135 Self::Not => GenericFn::Not,
136 Self::IsTrue => GenericFn::IsTrue,
137 Self::IsFalse => GenericFn::IsFalse,
138 _ => unreachable!(),
139 },
140 },
141 Self::StepTimestamp => Signature {
142 input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
143 output: ConcreteDataType::timestamp_millisecond_datatype(),
144 generic_fn: GenericFn::StepTimestamp,
145 },
146 Self::Cast(to) => Signature {
147 input: smallvec![ConcreteDataType::null_datatype()],
148 output: to.clone(),
149 generic_fn: GenericFn::Cast,
150 },
151 Self::TumbleWindowFloor { .. } => Signature {
152 input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
153 output: ConcreteDataType::timestamp_millisecond_datatype(),
154 generic_fn: GenericFn::TumbleWindow,
155 },
156 Self::TumbleWindowCeiling { .. } => Signature {
157 input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
158 output: ConcreteDataType::timestamp_millisecond_datatype(),
159 generic_fn: GenericFn::TumbleWindow,
160 },
161 }
162 }
163
164 pub fn is_valid_func_name(name: &str) -> bool {
165 matches!(
166 name.to_lowercase().as_str(),
167 "not" | "is_null" | "is_true" | "is_false" | "step_timestamp" | "cast"
168 )
169 }
170
171 pub fn from_str_and_type(
173 name: &str,
174 arg_type: Option<ConcreteDataType>,
175 ) -> Result<Self, Error> {
176 match name {
177 "not" => Ok(Self::Not),
178 "is_null" => Ok(Self::IsNull),
179 "is_true" => Ok(Self::IsTrue),
180 "is_false" => Ok(Self::IsFalse),
181 "step_timestamp" => Ok(Self::StepTimestamp),
182 "cast" => {
183 let arg_type = arg_type.with_context(|| InvalidQuerySnafu {
184 reason: "cast function requires a type argument".to_string(),
185 })?;
186 Ok(UnaryFunc::Cast(arg_type))
187 }
188 _ => InvalidQuerySnafu {
189 reason: format!("Unknown unary function: {}", name),
190 }
191 .fail(),
192 }
193 }
194
195 pub fn eval_batch(&self, batch: &Batch, expr: &ScalarExpr) -> Result<VectorRef, EvalError> {
196 let arg_col = expr.eval_batch(batch)?;
197 match self {
198 Self::Not => {
199 let arrow_array = arg_col.to_arrow_array();
200 let bool_array = arrow_array
201 .as_any()
202 .downcast_ref::<BooleanArray>()
203 .context({
204 TypeMismatchSnafu {
205 expected: ConcreteDataType::boolean_datatype(),
206 actual: arg_col.data_type(),
207 }
208 })?;
209 let ret = arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
210 let ret = BooleanVector::from(ret);
211 Ok(Arc::new(ret))
212 }
213 Self::IsNull => {
214 let arrow_array = arg_col.to_arrow_array();
215 let ret = arrow::compute::is_null(&arrow_array)
216 .context(ArrowSnafu { context: "is_null" })?;
217 let ret = BooleanVector::from(ret);
218 Ok(Arc::new(ret))
219 }
220 Self::IsTrue | Self::IsFalse => {
221 let arrow_array = arg_col.to_arrow_array();
222 let bool_array = arrow_array
223 .as_any()
224 .downcast_ref::<BooleanArray>()
225 .context({
226 TypeMismatchSnafu {
227 expected: ConcreteDataType::boolean_datatype(),
228 actual: arg_col.data_type(),
229 }
230 })?;
231
232 if matches!(self, Self::IsTrue) {
233 Ok(Arc::new(BooleanVector::from(bool_array.clone())))
234 } else {
235 let ret =
236 arrow::compute::not(bool_array).context(ArrowSnafu { context: "not" })?;
237 Ok(Arc::new(BooleanVector::from(ret)))
238 }
239 }
240 Self::StepTimestamp => {
241 let timestamp_array = get_timestamp_array(&arg_col)?;
242 let timestamp_array_ref = timestamp_array
243 .as_any()
244 .downcast_ref::<arrow::array::TimestampMillisecondArray>()
245 .context({
246 TypeMismatchSnafu {
247 expected: ConcreteDataType::boolean_datatype(),
248 actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
249 }
250 })?;
251
252 let ret = arrow::compute::unary(timestamp_array_ref, |arr| arr + 1);
253 let ret = TimestampMillisecondVector::from(ret);
254 Ok(Arc::new(ret))
255 }
256 Self::Cast(to) => {
257 let arrow_array = arg_col.to_arrow_array();
258 let ret = arrow::compute::cast(&arrow_array, &to.as_arrow_type())
259 .context(ArrowSnafu { context: "cast" })?;
260 let vector = Helper::try_into_vector(ret).context(DataTypeSnafu {
261 msg: "Fail to convert to Vector",
262 })?;
263 Ok(vector)
264 }
265 Self::TumbleWindowFloor {
266 window_size,
267 start_time,
268 } => {
269 let timestamp_array = get_timestamp_array(&arg_col)?;
270 let date_array_ref = timestamp_array
271 .as_any()
272 .downcast_ref::<arrow::array::TimestampMillisecondArray>()
273 .context({
274 TypeMismatchSnafu {
275 expected: ConcreteDataType::boolean_datatype(),
276 actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
277 }
278 })?;
279
280 let start_time = start_time.map(|t| t.value());
281 let window_size = window_size.as_millis() as repr::Duration;
282
283 let ret = arrow::compute::unary(date_array_ref, |ts| {
284 get_window_start(ts, window_size, start_time)
285 });
286
287 let ret = TimestampMillisecondVector::from(ret);
288 Ok(Arc::new(ret))
289 }
290 Self::TumbleWindowCeiling {
291 window_size,
292 start_time,
293 } => {
294 let timestamp_array = get_timestamp_array(&arg_col)?;
295 let date_array_ref = timestamp_array
296 .as_any()
297 .downcast_ref::<arrow::array::TimestampMillisecondArray>()
298 .context({
299 TypeMismatchSnafu {
300 expected: ConcreteDataType::boolean_datatype(),
301 actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
302 }
303 })?;
304
305 let start_time = start_time.map(|t| t.value());
306 let window_size = window_size.as_millis() as repr::Duration;
307
308 let ret = arrow::compute::unary(date_array_ref, |ts| {
309 get_window_start(ts, window_size, start_time) + window_size
310 });
311
312 let ret = TimestampMillisecondVector::from(ret);
313 Ok(Arc::new(ret))
314 }
315 }
316 }
317
318 pub fn from_tumble_func(name: &str, args: &[TypedExpr]) -> Result<(Self, TypedExpr), Error> {
319 match name.to_lowercase().as_str() {
320 TUMBLE_START | TUMBLE_END => {
321 let ts = args.first().context(InvalidQuerySnafu {
322 reason: "Tumble window function requires a timestamp argument",
323 })?;
324 let window_size = {
325 let window_size_untyped = args
326 .get(1)
327 .and_then(|expr| expr.expr.as_literal())
328 .context(InvalidQuerySnafu {
329 reason: "Tumble window function requires a window size argument",
330 })?;
331 if let Some(window_size) = window_size_untyped.as_string() {
332 let interval = cast(
334 Value::from(window_size),
335 &ConcreteDataType::interval_day_time_datatype(),
336 )
337 .map_err(BoxedError::new)
338 .context(ExternalSnafu)?
339 .as_interval_day_time()
340 .context(UnexpectedSnafu {
341 reason: "Expect window size arg to be interval after successful cast"
342 .to_string(),
343 })?;
344 Duration::from_millis(interval.as_millis() as u64)
345 } else if let Some(interval) = window_size_untyped.as_interval_day_time() {
346 Duration::from_millis(interval.as_millis() as u64)
347 } else {
348 InvalidQuerySnafu {
349 reason: format!(
350 "Tumble window function requires window size argument to be either a interval or a string describe a interval, found {:?}",
351 window_size_untyped
352 )
353 }.fail()?
354 }
355 };
356
357 let start_time = match args.get(2) {
359 Some(start_time) => {
360 if let Some(value) = start_time.expr.as_literal() {
361 let ret = cast(
363 value,
364 &ConcreteDataType::timestamp_millisecond_datatype(),
365 )
366 .map_err(BoxedError::new)
367 .context(ExternalSnafu)?
368 .as_timestamp()
369 .context(UnexpectedSnafu {
370 reason:
371 "Expect start time arg to be timestamp after successful cast"
372 .to_string(),
373 })?;
374 Some(ret)
375 } else {
376 UnexpectedSnafu {
377 reason: "Expect start time arg to be literal",
378 }
379 .fail()?
380 }
381 }
382 None => None,
383 };
384
385 if name == TUMBLE_START {
386 Ok((
387 Self::TumbleWindowFloor {
388 window_size,
389 start_time,
390 },
391 ts.clone(),
392 ))
393 } else if name == TUMBLE_END {
394 Ok((
395 Self::TumbleWindowCeiling {
396 window_size,
397 start_time,
398 },
399 ts.clone(),
400 ))
401 } else {
402 unreachable!()
403 }
404 }
405 _ => crate::error::InternalSnafu {
406 reason: format!("Unknown tumble kind function: {}", name),
407 }
408 .fail()?,
409 }
410 }
411
412 pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result<Value, EvalError> {
420 let arg = expr.eval(values)?;
421 match self {
422 Self::Not => {
423 let bool = if let Value::Boolean(bool) = arg {
424 Ok(bool)
425 } else {
426 TypeMismatchSnafu {
427 expected: ConcreteDataType::boolean_datatype(),
428 actual: arg.data_type(),
429 }
430 .fail()?
431 }?;
432 Ok(Value::from(!bool))
433 }
434 Self::IsNull => Ok(Value::from(arg.is_null())),
435 Self::IsTrue | Self::IsFalse => {
436 let bool = if let Value::Boolean(bool) = arg {
437 Ok(bool)
438 } else {
439 TypeMismatchSnafu {
440 expected: ConcreteDataType::boolean_datatype(),
441 actual: arg.data_type(),
442 }
443 .fail()?
444 }?;
445 if matches!(self, Self::IsTrue) {
446 Ok(Value::from(bool))
447 } else {
448 Ok(Value::from(!bool))
449 }
450 }
451 Self::StepTimestamp => {
452 let ty = arg.data_type();
453 if let Value::Timestamp(timestamp) = arg {
454 let timestamp = Timestamp::new_millisecond(timestamp.value() + 1);
455 Ok(Value::from(timestamp))
456 } else if let Ok(v) = value_to_internal_ts(arg) {
457 let timestamp = Timestamp::new_millisecond(v + 1);
458 Ok(Value::from(timestamp))
459 } else {
460 TypeMismatchSnafu {
461 expected: ConcreteDataType::timestamp_millisecond_datatype(),
462 actual: ty,
463 }
464 .fail()?
465 }
466 }
467 Self::Cast(to) => {
468 let arg_ty = arg.data_type();
469 cast(arg, to).context({
470 CastValueSnafu {
471 from: arg_ty,
472 to: to.clone(),
473 }
474 })
475 }
476 Self::TumbleWindowFloor {
477 window_size,
478 start_time,
479 } => {
480 let ts = get_ts_as_millisecond(arg)?;
481 let start_time = start_time.map(|t| t.value());
482 let window_size = window_size.as_millis() as repr::Duration;
483 let window_start = get_window_start(ts, window_size, start_time);
484
485 let ret = Timestamp::new_millisecond(window_start);
486 Ok(Value::from(ret))
487 }
488 Self::TumbleWindowCeiling {
489 window_size,
490 start_time,
491 } => {
492 let ts = get_ts_as_millisecond(arg)?;
493 let start_time = start_time.map(|t| t.value());
494 let window_size = window_size.as_millis() as repr::Duration;
495 let window_start = get_window_start(ts, window_size, start_time);
496
497 let window_end = window_start + window_size;
498 let ret = Timestamp::new_millisecond(window_end);
499 Ok(Value::from(ret))
500 }
501 }
502 }
503}
504
505fn get_timestamp_array(vector: &VectorRef) -> Result<arrow::array::ArrayRef, EvalError> {
506 let arrow_array = vector.to_arrow_array();
507 let timestamp_array = if *arrow_array.data_type()
508 == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
509 {
510 arrow_array
511 } else {
512 arrow::compute::cast(
513 &arrow_array,
514 &ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type(),
515 )
516 .context(ArrowSnafu {
517 context: "Trying to cast to timestamp in StepTimestamp",
518 })?
519 };
520 Ok(timestamp_array)
521}
522
523fn get_window_start(
524 ts: repr::Timestamp,
525 window_size: repr::Duration,
526 start_time: Option<repr::Timestamp>,
527) -> repr::Timestamp {
528 let start_time = start_time.unwrap_or(0);
529 if ts >= start_time {
531 start_time + (ts - start_time) / window_size * window_size
532 } else {
533 start_time + (ts - start_time) / window_size * window_size
534 - if ((start_time - ts) % window_size) != 0 {
535 window_size
536 } else {
537 0
538 }
539 }
540}
541
542#[test]
543fn test_get_window_start() {
544 assert_eq!(get_window_start(1, 3, None), 0);
545 assert_eq!(get_window_start(3, 3, None), 3);
546 assert_eq!(get_window_start(0, 3, None), 0);
547
548 assert_eq!(get_window_start(-1, 3, None), -3);
549 assert_eq!(get_window_start(-3, 3, None), -3);
550}
551
552fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
553 let ts = if let Some(ts) = arg.as_timestamp() {
554 ts.convert_to(TimeUnit::Millisecond)
555 .context(OverflowSnafu)?
556 .value()
557 } else {
558 InvalidArgumentSnafu {
559 reason: "Expect input to be timestamp or datetime type",
560 }
561 .fail()?
562 };
563 Ok(ts)
564}
565
566#[derive(
571 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash, EnumIter,
572)]
573pub enum BinaryFunc {
574 Eq,
575 NotEq,
576 Lt,
577 Lte,
578 Gt,
579 Gte,
580 AddInt16,
581 AddInt32,
582 AddInt64,
583 AddUInt16,
584 AddUInt32,
585 AddUInt64,
586 AddFloat32,
587 AddFloat64,
588 SubInt16,
589 SubInt32,
590 SubInt64,
591 SubUInt16,
592 SubUInt32,
593 SubUInt64,
594 SubFloat32,
595 SubFloat64,
596 MulInt16,
597 MulInt32,
598 MulInt64,
599 MulUInt16,
600 MulUInt32,
601 MulUInt64,
602 MulFloat32,
603 MulFloat64,
604 DivInt16,
605 DivInt32,
606 DivInt64,
607 DivUInt16,
608 DivUInt32,
609 DivUInt64,
610 DivFloat32,
611 DivFloat64,
612 ModInt16,
613 ModInt32,
614 ModInt64,
615 ModUInt16,
616 ModUInt32,
617 ModUInt64,
618}
619
620macro_rules! generate_binary_signature {
638 ($value:ident, { $($user_arm:tt)* },
639 [ $(
640 $auto_arm:ident=>($con_type:ident,$generic:ident)
641 ),*
642 ]) => {
643 match $value {
644 $($user_arm)*,
645 $(
646 Self::$auto_arm => Signature {
647 input: smallvec![
648 ConcreteDataType::$con_type(),
649 ConcreteDataType::$con_type(),
650 ],
651 output: ConcreteDataType::$con_type(),
652 generic_fn: GenericFn::$generic,
653 },
654 )*
655 }
656 };
657}
658
659static SPECIALIZATION: OnceLock<HashMap<(GenericFn, ConcreteDataType), BinaryFunc>> =
660 OnceLock::new();
661
662impl BinaryFunc {
663 pub fn signature(&self) -> Signature {
665 generate_binary_signature!(self, {
666 Self::Eq | Self::NotEq | Self::Lt | Self::Lte | Self::Gt | Self::Gte => Signature {
667 input: smallvec![
668 ConcreteDataType::null_datatype(),
669 ConcreteDataType::null_datatype()
670 ],
671 output: ConcreteDataType::boolean_datatype(),
672 generic_fn: match self {
673 Self::Eq => GenericFn::Eq,
674 Self::NotEq => GenericFn::NotEq,
675 Self::Lt => GenericFn::Lt,
676 Self::Lte => GenericFn::Lte,
677 Self::Gt => GenericFn::Gt,
678 Self::Gte => GenericFn::Gte,
679 _ => unreachable!(),
680 },
681 }
682 },
683 [
684 AddInt16=>(int16_datatype,Add),
685 AddInt32=>(int32_datatype,Add),
686 AddInt64=>(int64_datatype,Add),
687 AddUInt16=>(uint16_datatype,Add),
688 AddUInt32=>(uint32_datatype,Add),
689 AddUInt64=>(uint64_datatype,Add),
690 AddFloat32=>(float32_datatype,Add),
691 AddFloat64=>(float64_datatype,Add),
692 SubInt16=>(int16_datatype,Sub),
693 SubInt32=>(int32_datatype,Sub),
694 SubInt64=>(int64_datatype,Sub),
695 SubUInt16=>(uint16_datatype,Sub),
696 SubUInt32=>(uint32_datatype,Sub),
697 SubUInt64=>(uint64_datatype,Sub),
698 SubFloat32=>(float32_datatype,Sub),
699 SubFloat64=>(float64_datatype,Sub),
700 MulInt16=>(int16_datatype,Mul),
701 MulInt32=>(int32_datatype,Mul),
702 MulInt64=>(int64_datatype,Mul),
703 MulUInt16=>(uint16_datatype,Mul),
704 MulUInt32=>(uint32_datatype,Mul),
705 MulUInt64=>(uint64_datatype,Mul),
706 MulFloat32=>(float32_datatype,Mul),
707 MulFloat64=>(float64_datatype,Mul),
708 DivInt16=>(int16_datatype,Div),
709 DivInt32=>(int32_datatype,Div),
710 DivInt64=>(int64_datatype,Div),
711 DivUInt16=>(uint16_datatype,Div),
712 DivUInt32=>(uint32_datatype,Div),
713 DivUInt64=>(uint64_datatype,Div),
714 DivFloat32=>(float32_datatype,Div),
715 DivFloat64=>(float64_datatype,Div),
716 ModInt16=>(int16_datatype,Mod),
717 ModInt32=>(int32_datatype,Mod),
718 ModInt64=>(int64_datatype,Mod),
719 ModUInt16=>(uint16_datatype,Mod),
720 ModUInt32=>(uint32_datatype,Mod),
721 ModUInt64=>(uint64_datatype,Mod)
722 ]
723 )
724 }
725
726 pub fn add(input_type: ConcreteDataType) -> Result<Self, Error> {
727 Self::specialization(GenericFn::Add, input_type)
728 }
729
730 pub fn sub(input_type: ConcreteDataType) -> Result<Self, Error> {
731 Self::specialization(GenericFn::Sub, input_type)
732 }
733
734 pub fn mul(input_type: ConcreteDataType) -> Result<Self, Error> {
735 Self::specialization(GenericFn::Mul, input_type)
736 }
737
738 pub fn div(input_type: ConcreteDataType) -> Result<Self, Error> {
739 Self::specialization(GenericFn::Div, input_type)
740 }
741
742 pub fn specialization(generic: GenericFn, input_type: ConcreteDataType) -> Result<Self, Error> {
744 let rule = SPECIALIZATION.get_or_init(|| {
745 let mut spec = HashMap::new();
746 for func in BinaryFunc::iter() {
747 let sig = func.signature();
748 spec.insert((sig.generic_fn, sig.input[0].clone()), func);
749 }
750 spec
751 });
752 rule.get(&(generic, input_type.clone()))
753 .cloned()
754 .with_context(|| InvalidQuerySnafu {
755 reason: format!(
756 "No specialization found for binary function {:?} with input type {:?}",
757 generic, input_type
758 ),
759 })
760 }
761
762 pub(crate) fn infer_type_from(
766 generic: GenericFn,
767 arg_exprs: &[ScalarExpr],
768 arg_types: &[Option<ConcreteDataType>],
769 ) -> Result<ConcreteDataType, Error> {
770 let ret = match (arg_types[0].as_ref(), arg_types[1].as_ref()) {
771 (Some(t1), Some(t2)) => {
772 ensure!(
773 t1 == t2,
774 InvalidQuerySnafu {
775 reason: format!(
776 "Binary function {:?} requires both arguments to have the same type, left={:?}, right={:?}",
777 generic, t1, t2
778 ),
779 }
780 );
781 t1.clone()
782 }
783 (Some(t), None) | (None, Some(t)) => t.clone(),
784 _ => arg_exprs[0]
785 .as_literal()
786 .map(|lit| lit.data_type())
787 .or_else(|| arg_exprs[1].as_literal().map(|lit| lit.data_type()))
788 .with_context(|| InvalidQuerySnafu {
789 reason: format!(
790 "Binary function {:?} requires at least one argument with known type",
791 generic
792 ),
793 })?,
794 };
795 Ok(ret)
796 }
797
798 pub fn is_valid_func_name(name: &str) -> bool {
799 matches!(
800 name.to_lowercase().as_str(),
801 "eq" | "equal"
802 | "not_eq"
803 | "not_equal"
804 | "lt"
805 | "lte"
806 | "gt"
807 | "gte"
808 | "add"
809 | "sub"
810 | "subtract"
811 | "mul"
812 | "multiply"
813 | "div"
814 | "divide"
815 | "mod"
816 )
817 }
818
819 pub fn from_str_expr_and_type(
825 name: &str,
826 arg_exprs: &[ScalarExpr],
827 arg_types: &[Option<ConcreteDataType>],
828 ) -> Result<(Self, Signature), Error> {
829 let op = name_to_op(name).with_context(|| InvalidQuerySnafu {
831 reason: format!("Unsupported binary function: {}", name),
832 })?;
833
834 let generic_fn = {
836 match op {
837 Operator::Eq => GenericFn::Eq,
838 Operator::NotEq => GenericFn::NotEq,
839 Operator::Lt => GenericFn::Lt,
840 Operator::LtEq => GenericFn::Lte,
841 Operator::Gt => GenericFn::Gt,
842 Operator::GtEq => GenericFn::Gte,
843 Operator::Plus => GenericFn::Add,
844 Operator::Minus => GenericFn::Sub,
845 Operator::Multiply => GenericFn::Mul,
846 Operator::Divide => GenericFn::Div,
847 Operator::Modulo => GenericFn::Mod,
848 _ => {
849 return InvalidQuerySnafu {
850 reason: format!("Unsupported binary function: {}", name),
851 }
852 .fail();
853 }
854 }
855 };
856 let need_type = matches!(
857 generic_fn,
858 GenericFn::Add | GenericFn::Sub | GenericFn::Mul | GenericFn::Div | GenericFn::Mod
859 );
860
861 ensure!(
862 arg_exprs.len() == 2 && arg_types.len() == 2,
863 PlanSnafu {
864 reason: "Binary function requires exactly 2 arguments".to_string()
865 }
866 );
867
868 let arg_type = Self::infer_type_from(generic_fn, arg_exprs, arg_types)?;
869
870 let query_input_type = if need_type {
873 arg_type.clone()
874 } else {
875 ConcreteDataType::null_datatype()
876 };
877
878 let spec_fn = Self::specialization(generic_fn, query_input_type)?;
879
880 let signature = Signature {
881 input: smallvec![arg_type.clone(), arg_type],
882 output: spec_fn.signature().output,
883 generic_fn,
884 };
885
886 Ok((spec_fn, signature))
887 }
888
889 pub fn eval_batch(
890 &self,
891 batch: &Batch,
892 expr1: &ScalarExpr,
893 expr2: &ScalarExpr,
894 ) -> Result<VectorRef, EvalError> {
895 let left = expr1.eval_batch(batch)?;
896 let left = left.to_arrow_array();
897 let right = expr2.eval_batch(batch)?;
898 let right = right.to_arrow_array();
899
900 let arrow_array: ArrayRef = match self {
901 Self::Eq => Arc::new(
902 arrow::compute::kernels::cmp::eq(&left, &right)
903 .context(ArrowSnafu { context: "eq" })?,
904 ),
905 Self::NotEq => Arc::new(
906 arrow::compute::kernels::cmp::neq(&left, &right)
907 .context(ArrowSnafu { context: "neq" })?,
908 ),
909 Self::Lt => Arc::new(
910 arrow::compute::kernels::cmp::lt(&left, &right)
911 .context(ArrowSnafu { context: "lt" })?,
912 ),
913 Self::Lte => Arc::new(
914 arrow::compute::kernels::cmp::lt_eq(&left, &right)
915 .context(ArrowSnafu { context: "lte" })?,
916 ),
917 Self::Gt => Arc::new(
918 arrow::compute::kernels::cmp::gt(&left, &right)
919 .context(ArrowSnafu { context: "gt" })?,
920 ),
921 Self::Gte => Arc::new(
922 arrow::compute::kernels::cmp::gt_eq(&left, &right)
923 .context(ArrowSnafu { context: "gte" })?,
924 ),
925
926 Self::AddInt16
927 | Self::AddInt32
928 | Self::AddInt64
929 | Self::AddUInt16
930 | Self::AddUInt32
931 | Self::AddUInt64
932 | Self::AddFloat32
933 | Self::AddFloat64 => arrow::compute::kernels::numeric::add(&left, &right)
934 .context(ArrowSnafu { context: "add" })?,
935
936 Self::SubInt16
937 | Self::SubInt32
938 | Self::SubInt64
939 | Self::SubUInt16
940 | Self::SubUInt32
941 | Self::SubUInt64
942 | Self::SubFloat32
943 | Self::SubFloat64 => arrow::compute::kernels::numeric::sub(&left, &right)
944 .context(ArrowSnafu { context: "sub" })?,
945
946 Self::MulInt16
947 | Self::MulInt32
948 | Self::MulInt64
949 | Self::MulUInt16
950 | Self::MulUInt32
951 | Self::MulUInt64
952 | Self::MulFloat32
953 | Self::MulFloat64 => arrow::compute::kernels::numeric::mul(&left, &right)
954 .context(ArrowSnafu { context: "mul" })?,
955
956 Self::DivInt16
957 | Self::DivInt32
958 | Self::DivInt64
959 | Self::DivUInt16
960 | Self::DivUInt32
961 | Self::DivUInt64
962 | Self::DivFloat32
963 | Self::DivFloat64 => arrow::compute::kernels::numeric::div(&left, &right)
964 .context(ArrowSnafu { context: "div" })?,
965
966 Self::ModInt16
967 | Self::ModInt32
968 | Self::ModInt64
969 | Self::ModUInt16
970 | Self::ModUInt32
971 | Self::ModUInt64 => arrow::compute::kernels::numeric::rem(&left, &right)
972 .context(ArrowSnafu { context: "rem" })?,
973 };
974
975 let vector = Helper::try_into_vector(arrow_array).context(DataTypeSnafu {
976 msg: "Fail to convert to Vector",
977 })?;
978 Ok(vector)
979 }
980
981 pub fn eval(
991 &self,
992 values: &[Value],
993 expr1: &ScalarExpr,
994 expr2: &ScalarExpr,
995 ) -> Result<Value, EvalError> {
996 let left = expr1.eval(values)?;
997 let right = expr2.eval(values)?;
998 match self {
999 Self::Eq => Ok(Value::from(left == right)),
1000 Self::NotEq => Ok(Value::from(left != right)),
1001 Self::Lt => Ok(Value::from(left < right)),
1002 Self::Lte => Ok(Value::from(left <= right)),
1003 Self::Gt => Ok(Value::from(left > right)),
1004 Self::Gte => Ok(Value::from(left >= right)),
1005
1006 Self::AddInt16 => Ok(add::<i16>(left, right)?),
1007 Self::AddInt32 => Ok(add::<i32>(left, right)?),
1008 Self::AddInt64 => Ok(add::<i64>(left, right)?),
1009 Self::AddUInt16 => Ok(add::<u16>(left, right)?),
1010 Self::AddUInt32 => Ok(add::<u32>(left, right)?),
1011 Self::AddUInt64 => Ok(add::<u64>(left, right)?),
1012 Self::AddFloat32 => Ok(add::<f32>(left, right)?),
1013 Self::AddFloat64 => Ok(add::<f64>(left, right)?),
1014
1015 Self::SubInt16 => Ok(sub::<i16>(left, right)?),
1016 Self::SubInt32 => Ok(sub::<i32>(left, right)?),
1017 Self::SubInt64 => Ok(sub::<i64>(left, right)?),
1018 Self::SubUInt16 => Ok(sub::<u16>(left, right)?),
1019 Self::SubUInt32 => Ok(sub::<u32>(left, right)?),
1020 Self::SubUInt64 => Ok(sub::<u64>(left, right)?),
1021 Self::SubFloat32 => Ok(sub::<f32>(left, right)?),
1022 Self::SubFloat64 => Ok(sub::<f64>(left, right)?),
1023
1024 Self::MulInt16 => Ok(mul::<i16>(left, right)?),
1025 Self::MulInt32 => Ok(mul::<i32>(left, right)?),
1026 Self::MulInt64 => Ok(mul::<i64>(left, right)?),
1027 Self::MulUInt16 => Ok(mul::<u16>(left, right)?),
1028 Self::MulUInt32 => Ok(mul::<u32>(left, right)?),
1029 Self::MulUInt64 => Ok(mul::<u64>(left, right)?),
1030 Self::MulFloat32 => Ok(mul::<f32>(left, right)?),
1031 Self::MulFloat64 => Ok(mul::<f64>(left, right)?),
1032
1033 Self::DivInt16 => Ok(div::<i16>(left, right)?),
1034 Self::DivInt32 => Ok(div::<i32>(left, right)?),
1035 Self::DivInt64 => Ok(div::<i64>(left, right)?),
1036 Self::DivUInt16 => Ok(div::<u16>(left, right)?),
1037 Self::DivUInt32 => Ok(div::<u32>(left, right)?),
1038 Self::DivUInt64 => Ok(div::<u64>(left, right)?),
1039 Self::DivFloat32 => Ok(div::<f32>(left, right)?),
1040 Self::DivFloat64 => Ok(div::<f64>(left, right)?),
1041
1042 Self::ModInt16 => Ok(rem::<i16>(left, right)?),
1043 Self::ModInt32 => Ok(rem::<i32>(left, right)?),
1044 Self::ModInt64 => Ok(rem::<i64>(left, right)?),
1045 Self::ModUInt16 => Ok(rem::<u16>(left, right)?),
1046 Self::ModUInt32 => Ok(rem::<u32>(left, right)?),
1047 Self::ModUInt64 => Ok(rem::<u64>(left, right)?),
1048 }
1049 }
1050
1051 pub fn reverse_compare(&self) -> Result<Self, Error> {
1054 let ret = match &self {
1055 BinaryFunc::Eq => BinaryFunc::Eq,
1056 BinaryFunc::NotEq => BinaryFunc::NotEq,
1057 BinaryFunc::Lt => BinaryFunc::Gt,
1058 BinaryFunc::Lte => BinaryFunc::Gte,
1059 BinaryFunc::Gt => BinaryFunc::Lt,
1060 BinaryFunc::Gte => BinaryFunc::Lte,
1061 _ => {
1062 return InvalidQuerySnafu {
1063 reason: format!("Expect a comparison operator, found {:?}", self),
1064 }
1065 .fail();
1066 }
1067 };
1068 Ok(ret)
1069 }
1070}
1071
1072#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
1074pub enum VariadicFunc {
1075 And,
1076 Or,
1077}
1078
1079impl VariadicFunc {
1080 pub fn signature(&self) -> Signature {
1082 Signature {
1083 input: smallvec![ConcreteDataType::boolean_datatype()],
1084 output: ConcreteDataType::boolean_datatype(),
1085 generic_fn: match self {
1086 Self::And => GenericFn::And,
1087 Self::Or => GenericFn::Or,
1088 },
1089 }
1090 }
1091
1092 pub fn is_valid_func_name(name: &str) -> bool {
1093 matches!(name.to_lowercase().as_str(), "and" | "or")
1094 }
1095
1096 pub fn from_str_and_types(
1098 name: &str,
1099 arg_types: &[Option<ConcreteDataType>],
1100 ) -> Result<Self, Error> {
1101 let _ = arg_types;
1103 match name {
1104 "and" => Ok(Self::And),
1105 "or" => Ok(Self::Or),
1106 _ => InvalidQuerySnafu {
1107 reason: format!("Unknown variadic function: {}", name),
1108 }
1109 .fail(),
1110 }
1111 }
1112
1113 pub fn eval_batch(&self, batch: &Batch, exprs: &[ScalarExpr]) -> Result<VectorRef, EvalError> {
1114 ensure!(
1115 !exprs.is_empty(),
1116 InvalidArgumentSnafu {
1117 reason: format!("Variadic function {:?} requires at least 1 arguments", self)
1118 }
1119 );
1120 let args = exprs
1121 .iter()
1122 .map(|expr| expr.eval_batch(batch).map(|v| v.to_arrow_array()))
1123 .collect::<Result<Vec<_>, _>>()?;
1124 let mut iter = args.into_iter();
1125
1126 let first = iter.next().unwrap();
1127 let mut left = first
1128 .as_any()
1129 .downcast_ref::<BooleanArray>()
1130 .context({
1131 TypeMismatchSnafu {
1132 expected: ConcreteDataType::boolean_datatype(),
1133 actual: ConcreteDataType::from_arrow_type(first.data_type()),
1134 }
1135 })?
1136 .clone();
1137
1138 for right in iter {
1139 let right = right.as_any().downcast_ref::<BooleanArray>().context({
1140 TypeMismatchSnafu {
1141 expected: ConcreteDataType::boolean_datatype(),
1142 actual: ConcreteDataType::from_arrow_type(right.data_type()),
1143 }
1144 })?;
1145 left = match self {
1146 Self::And => {
1147 arrow::compute::and(&left, right).context(ArrowSnafu { context: "and" })?
1148 }
1149 Self::Or => {
1150 arrow::compute::or(&left, right).context(ArrowSnafu { context: "or" })?
1151 }
1152 }
1153 }
1154
1155 Ok(Arc::new(BooleanVector::from(left)))
1156 }
1157
1158 pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
1160 match self {
1161 VariadicFunc::And => and(values, exprs),
1162 VariadicFunc::Or => or(values, exprs),
1163 }
1164 }
1165}
1166
1167fn and(values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
1168 let mut null = false;
1170 for expr in exprs {
1171 match expr.eval(values) {
1172 Ok(Value::Boolean(true)) => {}
1173 Ok(Value::Boolean(false)) => return Ok(Value::Boolean(false)), Ok(Value::Null) => null = true,
1175 Err(this_err) => {
1176 return Err(this_err);
1177 } Ok(x) => InvalidArgumentSnafu {
1179 reason: format!(
1180 "`and()` only support boolean type, found value {:?} of type {:?}",
1181 x,
1182 x.data_type()
1183 ),
1184 }
1185 .fail()?,
1186 }
1187 }
1188 match null {
1189 true => Ok(Value::Null),
1190 false => Ok(Value::Boolean(true)),
1191 }
1192}
1193
1194fn or(values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
1195 let mut null = false;
1197 for expr in exprs {
1198 match expr.eval(values) {
1199 Ok(Value::Boolean(true)) => return Ok(Value::Boolean(true)), Ok(Value::Boolean(false)) => {}
1201 Ok(Value::Null) => null = true,
1202 Err(this_err) => {
1203 return Err(this_err);
1204 } Ok(x) => InvalidArgumentSnafu {
1206 reason: format!(
1207 "`or()` only support boolean type, found value {:?} of type {:?}",
1208 x,
1209 x.data_type()
1210 ),
1211 }
1212 .fail()?,
1213 }
1214 }
1215 match null {
1216 true => Ok(Value::Null),
1217 false => Ok(Value::Boolean(false)),
1218 }
1219}
1220
1221fn add<T>(left: Value, right: Value) -> Result<Value, EvalError>
1222where
1223 T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1224 Value: From<T>,
1225{
1226 let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1227 let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1228 Ok(Value::from(left + right))
1229}
1230
1231fn sub<T>(left: Value, right: Value) -> Result<Value, EvalError>
1232where
1233 T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1234 Value: From<T>,
1235{
1236 let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1237 let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1238 Ok(Value::from(left - right))
1239}
1240
1241fn mul<T>(left: Value, right: Value) -> Result<Value, EvalError>
1242where
1243 T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1244 Value: From<T>,
1245{
1246 let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1247 let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1248 Ok(Value::from(left * right))
1249}
1250
1251fn div<T>(left: Value, right: Value) -> Result<Value, EvalError>
1252where
1253 T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1254 <T as TryFrom<Value>>::Error: std::fmt::Debug,
1255 Value: From<T>,
1256{
1257 let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1258 let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1259 if right.is_zero() {
1260 return Err(DivisionByZeroSnafu {}.build());
1261 }
1262 Ok(Value::from(left / right))
1263}
1264
1265fn rem<T>(left: Value, right: Value) -> Result<Value, EvalError>
1266where
1267 T: TryFrom<Value, Error = datatypes::Error> + num_traits::Num,
1268 <T as TryFrom<Value>>::Error: std::fmt::Debug,
1269 Value: From<T>,
1270{
1271 let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1272 let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?;
1273 Ok(Value::from(left % right))
1274}
1275
1276#[cfg(test)]
1277mod test {
1278 use std::sync::Arc;
1279
1280 use datatypes::vectors::Vector;
1281 use pretty_assertions::assert_eq;
1282
1283 use super::*;
1284
1285 #[test]
1286 fn test_tumble_batch() {
1287 let timestamp_vector = TimestampMillisecondVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]);
1288 let tumble_start = UnaryFunc::TumbleWindowFloor {
1289 window_size: Duration::from_millis(10),
1290 start_time: None,
1291 };
1292 let tumble_end = UnaryFunc::TumbleWindowCeiling {
1293 window_size: Duration::from_millis(10),
1294 start_time: None,
1295 };
1296
1297 let len = timestamp_vector.len();
1298 let batch = Batch::try_new(vec![Arc::new(timestamp_vector)], len).unwrap();
1299 let arg = ScalarExpr::Column(0);
1300
1301 let start = tumble_start.eval_batch(&batch, &arg).unwrap();
1302 let end = tumble_end.eval_batch(&batch, &arg).unwrap();
1303 assert_eq!(
1304 start.to_arrow_array().as_ref(),
1305 TimestampMillisecondVector::from_vec(vec![0, 0, 10, 10, 10, 20, 20])
1306 .to_arrow_array()
1307 .as_ref()
1308 );
1309
1310 assert_eq!(
1311 end.to_arrow_array().as_ref(),
1312 TimestampMillisecondVector::from_vec(vec![10, 10, 20, 20, 20, 30, 30])
1313 .to_arrow_array()
1314 .as_ref()
1315 );
1316
1317 let ts_ms_vector = TimestampMillisecondVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]);
1318 let batch = Batch::try_new(vec![Arc::new(ts_ms_vector)], len).unwrap();
1319
1320 let start = tumble_start.eval_batch(&batch, &arg).unwrap();
1321 let end = tumble_end.eval_batch(&batch, &arg).unwrap();
1322
1323 assert_eq!(
1324 start.to_arrow_array().as_ref(),
1325 TimestampMillisecondVector::from_vec(vec![0, 0, 10, 10, 10, 20, 20])
1326 .to_arrow_array()
1327 .as_ref()
1328 );
1329
1330 assert_eq!(
1331 end.to_arrow_array().as_ref(),
1332 TimestampMillisecondVector::from_vec(vec![10, 10, 20, 20, 20, 30, 30])
1333 .to_arrow_array()
1334 .as_ref()
1335 );
1336 }
1337
1338 #[test]
1339 fn test_num_ops() {
1340 let left = Value::from(10);
1341 let right = Value::from(3);
1342 let res = add::<i32>(left.clone(), right.clone()).unwrap();
1343 assert_eq!(res, Value::from(13));
1344 let res = sub::<i32>(left.clone(), right.clone()).unwrap();
1345 assert_eq!(res, Value::from(7));
1346 let res = mul::<i32>(left.clone(), right.clone()).unwrap();
1347 assert_eq!(res, Value::from(30));
1348 let res = div::<i32>(left.clone(), right.clone()).unwrap();
1349 assert_eq!(res, Value::from(3));
1350 let res = rem::<i32>(left, right).unwrap();
1351 assert_eq!(res, Value::from(1));
1352
1353 let values = vec![Value::from(true), Value::from(false)];
1354 let exprs = vec![ScalarExpr::Column(0), ScalarExpr::Column(1)];
1355 let res = and(&values, &exprs).unwrap();
1356 assert_eq!(res, Value::from(false));
1357 let res = or(&values, &exprs).unwrap();
1358 assert_eq!(res, Value::from(true));
1359 }
1360
1361 #[test]
1364 fn test_binary_func_spec() {
1365 assert_eq!(
1366 BinaryFunc::from_str_expr_and_type(
1367 "add",
1368 &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
1369 &[
1370 Some(ConcreteDataType::int32_datatype()),
1371 Some(ConcreteDataType::int32_datatype())
1372 ]
1373 )
1374 .unwrap(),
1375 (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
1376 );
1377
1378 assert_eq!(
1379 BinaryFunc::from_str_expr_and_type(
1380 "add",
1381 &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
1382 &[Some(ConcreteDataType::int32_datatype()), None]
1383 )
1384 .unwrap(),
1385 (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
1386 );
1387
1388 assert_eq!(
1389 BinaryFunc::from_str_expr_and_type(
1390 "add",
1391 &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
1392 &[Some(ConcreteDataType::int32_datatype()), None]
1393 )
1394 .unwrap(),
1395 (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
1396 );
1397
1398 assert_eq!(
1399 BinaryFunc::from_str_expr_and_type(
1400 "add",
1401 &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
1402 &[Some(ConcreteDataType::int32_datatype()), None]
1403 )
1404 .unwrap(),
1405 (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
1406 );
1407
1408 assert_eq!(
1409 BinaryFunc::from_str_expr_and_type(
1410 "add",
1411 &[
1412 ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()),
1413 ScalarExpr::Column(0)
1414 ],
1415 &[None, None]
1416 )
1417 .unwrap(),
1418 (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
1419 );
1420
1421 assert_eq!(
1423 BinaryFunc::from_str_expr_and_type(
1424 "equal",
1425 &[
1426 ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()),
1427 ScalarExpr::Column(0)
1428 ],
1429 &[None, None]
1430 )
1431 .unwrap(),
1432 (
1433 BinaryFunc::Eq,
1434 Signature {
1435 input: smallvec![
1436 ConcreteDataType::int32_datatype(),
1437 ConcreteDataType::int32_datatype()
1438 ],
1439 output: ConcreteDataType::boolean_datatype(),
1440 generic_fn: GenericFn::Eq
1441 }
1442 )
1443 );
1444
1445 matches!(
1446 BinaryFunc::from_str_expr_and_type(
1447 "add",
1448 &[ScalarExpr::Column(0), ScalarExpr::Column(0)],
1449 &[None, None]
1450 ),
1451 Err(Error::InvalidQuery { .. })
1452 );
1453 }
1454
1455 #[test]
1456 fn test_cast_int() {
1457 let interval = cast(
1458 Value::from("1 second"),
1459 &ConcreteDataType::interval_day_time_datatype(),
1460 )
1461 .unwrap();
1462 assert_eq!(
1463 interval,
1464 Value::from(common_time::IntervalDayTime::new(0, 1000))
1465 );
1466 }
1467}