1use std::collections::{BTreeMap, BTreeSet};
18
19use arrow::array::BooleanArray;
20use arrow::buffer::BooleanBuffer;
21use arrow::compute::FilterBuilder;
22use common_telemetry::trace;
23use datatypes::prelude::ConcreteDataType;
24use datatypes::value::Value;
25use datatypes::vectors::{BooleanVector, Helper};
26use itertools::Itertools;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::error::{Error, InvalidQuerySnafu};
30use crate::expr::error::{ArrowSnafu, DataTypeSnafu, EvalError, InternalSnafu, TypeMismatchSnafu};
31use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr};
32use crate::repr::{self, value_to_internal_ts, Diff, Row};
33
/// A compound operator that appends computed columns (map), drops rows failing a set of
/// predicates (filter), and finally selects/reorders columns (project).
///
/// Conceptually every input row of `input_arity` columns is extended with one column per entry
/// of `expressions`; both the predicates and the projection index into that extended row.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct MapFilterProject {
    /// Expressions appended to the row in order: expression `i` produces column
    /// `input_arity + i`. `map` only accepts expressions that refer to columns before it.
    pub expressions: Vec<ScalarExpr>,
    /// Predicates paired with their "support" (one past the largest column they reference).
    /// `filter` keeps this sorted by support so evaluation only materializes the expressions a
    /// predicate needs before testing it.
    pub predicates: Vec<(usize, ScalarExpr)>,
    /// Indices into the extended row (input columns followed by expression columns) that make
    /// up the output row, in output order.
    pub projection: Vec<usize>,
    /// Number of columns in each input row.
    pub input_arity: usize,
}
85
86impl MapFilterProject {
87 pub fn new(input_arity: usize) -> Self {
89 Self {
90 expressions: Vec::new(),
91 predicates: Vec::new(),
92 projection: (0..input_arity).collect(),
93 input_arity,
94 }
95 }
96
97 pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
98 let idx = *self.projection.get(n)?;
99 if idx < self.input_arity {
100 Some(ScalarExpr::Column(idx))
101 } else {
102 let mut expr = self.expressions.get(idx - self.input_arity)?;
105 loop {
106 match expr {
107 ScalarExpr::Column(prev) => {
108 if *prev < self.input_arity {
109 return Some(ScalarExpr::Column(*prev));
110 } else {
111 expr = self.expressions.get(*prev - self.input_arity)?;
112 continue;
113 }
114 }
115 _ => return Some(expr.clone()),
116 }
117 }
118 }
119 }
120
121 pub fn output_arity(&self) -> usize {
123 self.projection.len()
124 }
125
126 pub fn compose(before: Self, after: Self) -> Result<Self, Error> {
131 let (m, f, p) = after.into_map_filter_project();
132 before.map(m)?.filter(f)?.project(p)
133 }
134
135 pub fn is_identity(&self) -> bool {
137 self.expressions.is_empty()
138 && self.predicates.is_empty()
139 && self.projection.len() == self.input_arity
141 && self.projection.iter().enumerate().all(|(i, p)| i == *p)
142 }
143
144 pub fn project<I>(mut self, columns: I) -> Result<Self, Error>
173 where
174 I: IntoIterator<Item = usize> + std::fmt::Debug,
175 {
176 self.projection = columns
177 .into_iter()
178 .map(|c| self.projection.get(c).cloned().ok_or(c))
179 .collect::<Result<Vec<_>, _>>()
180 .map_err(|c| {
181 InvalidQuerySnafu {
182 reason: format!(
183 "column index {} out of range, expected at most {} columns",
184 c,
185 self.projection.len()
186 ),
187 }
188 .build()
189 })?;
190 Ok(self)
191 }
192
193 pub fn filter<I>(mut self, predicates: I) -> Result<Self, Error>
220 where
221 I: IntoIterator<Item = ScalarExpr>,
222 {
223 for mut predicate in predicates {
224 predicate.permute(&self.projection[..])?;
226
227 let referred_columns = predicate.get_all_ref_columns();
229 for c in referred_columns.iter() {
230 let cur_row_len = self.input_arity + self.expressions.len();
232 ensure!(
233 *c < cur_row_len,
234 InvalidQuerySnafu {
235 reason: format!(
236 "column index {} out of range, expected at most {} columns",
237 c, cur_row_len
238 )
239 }
240 );
241 }
242
243 let max_support = referred_columns
246 .into_iter()
247 .max()
248 .map(|c| c + 1)
249 .unwrap_or(0);
250 self.predicates.push((max_support, predicate))
251 }
252 self.predicates
254 .sort_by_key(|(position, _predicate)| *position);
255 Ok(self)
256 }
257
258 pub fn map<I>(mut self, expressions: I) -> Result<Self, Error>
292 where
293 I: IntoIterator<Item = ScalarExpr>,
294 {
295 for mut expression in expressions {
296 expression.permute(&self.projection[..])?;
298
299 for c in expression.get_all_ref_columns().into_iter() {
301 let current_row_len = self.input_arity + self.expressions.len();
303 ensure!(
304 c < current_row_len,
305 InvalidQuerySnafu {
306 reason: format!(
307 "column index {} out of range, expected at most {} columns",
308 c, current_row_len
309 )
310 }
311 );
312 }
313
314 self.expressions.push(expression);
316 let cur_expr_col_num = self.input_arity + self.expressions.len() - 1;
318 self.projection.push(cur_expr_col_num);
319 }
320
321 Ok(self)
322 }
323
324 pub fn into_map_filter_project(self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
326 let predicates = self
327 .predicates
328 .into_iter()
329 .map(|(_pos, predicate)| predicate)
330 .collect();
331 (self.expressions, predicates, self.projection)
332 }
333
334 pub fn as_map_filter_project(&self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
339 self.clone().into_map_filter_project()
340 }
341}
342
343impl MapFilterProject {
344 pub fn into_safe(self) -> SafeMfpPlan {
346 SafeMfpPlan { mfp: self }
347 }
348
349 pub fn optimize(&mut self) {
351 }
353 pub fn get_old_to_new_mapping(&self) -> BTreeMap<usize, usize> {
355 BTreeMap::from_iter(
356 self.projection
357 .clone()
358 .into_iter()
359 .enumerate()
360 .map(|(new, old)| {
361 let mut old = old;
363 while let Some(ScalarExpr::Column(prev)) = if old >= self.input_arity {
369 self.expressions.get(old - self.input_arity)
371 } else {
372 None
375 } {
376 old = *prev;
377 if old < self.input_arity {
378 break;
379 }
380 }
381 (old, new)
382 }),
383 )
384 }
385
386 pub fn demand(&self) -> BTreeSet<usize> {
392 let mut demanded = BTreeSet::new();
393 for (_index, pred) in self.predicates.iter() {
395 demanded.extend(pred.get_all_ref_columns());
396 }
397 demanded.extend(self.projection.iter().cloned());
399
400 for index in (0..self.expressions.len()).rev() {
402 if demanded.contains(&(self.input_arity + index)) {
403 demanded.extend(self.expressions[index].get_all_ref_columns());
404 }
405 }
406
407 demanded.retain(|col| col < &self.input_arity);
409 demanded
410 }
411
412 pub fn permute(
422 &mut self,
423 mut shuffle: BTreeMap<usize, usize>,
424 new_input_arity: usize,
425 ) -> Result<(), Error> {
426 let demand = self.demand();
428 for d in demand {
429 ensure!(
430 shuffle.contains_key(&d),
431 InvalidQuerySnafu {
432 reason: format!(
433 "Demanded column {} is not in shuffle's keys: {:?}",
434 d,
435 shuffle.keys()
436 )
437 }
438 );
439 }
440 ensure!(
441 shuffle.len() <= new_input_arity,
442 InvalidQuerySnafu {
443 reason: format!(
444 "shuffle's length {} is greater than new_input_arity {}",
445 shuffle.len(),
446 self.input_arity
447 )
448 }
449 );
450
451 let (mut map, mut filter, mut project) = self.as_map_filter_project();
453 for index in 0..map.len() {
454 shuffle.insert(self.input_arity + index, new_input_arity + index);
456 }
457
458 for expr in map.iter_mut() {
459 expr.permute_map(&shuffle)?;
460 }
461 for pred in filter.iter_mut() {
462 pred.permute_map(&shuffle)?;
463 }
464 let new_row_len = new_input_arity + map.len();
465 for proj in project.iter_mut() {
466 ensure!(
467 shuffle[proj] < new_row_len,
468 InvalidQuerySnafu {
469 reason: format!(
470 "shuffled column index {} out of range, expected at most {} columns",
471 shuffle[proj], new_row_len
472 )
473 }
474 );
475 *proj = shuffle[proj];
476 }
477 *self = Self::new(new_input_arity)
478 .map(map)?
479 .filter(filter)?
480 .project(project)?;
481 Ok(())
482 }
483}
484
/// A [`MapFilterProject`] wrapped for evaluation. It exposes the row-at-a-time
/// (`evaluate_into`) and batch (`eval_batch_into`) entry points. `MfpPlan::create_from`
/// constructs one after moving temporal predicates out of the operator.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeMfpPlan {
    /// The wrapped operator.
    pub(crate) mfp: MapFilterProject,
}
491
492impl SafeMfpPlan {
493 pub fn permute(&mut self, map: BTreeMap<usize, usize>, new_arity: usize) -> Result<(), Error> {
495 self.mfp.permute(map, new_arity)
496 }
497
498 pub fn eval_batch_into(&self, batch: &mut Batch) -> Result<Batch, EvalError> {
502 ensure!(
503 batch.column_count() == self.mfp.input_arity,
504 InvalidArgumentSnafu {
505 reason: format!(
506 "batch column length {} is not equal to input_arity {}",
507 batch.column_count(),
508 self.mfp.input_arity
509 ),
510 }
511 );
512
513 let passed_predicates = self.eval_batch_inner(batch)?;
514 let filter = FilterBuilder::new(passed_predicates.as_boolean_array());
515 let pred = filter.build();
516 let mut result = vec![];
517 for col in batch.batch() {
518 let filtered = pred
519 .filter(col.to_arrow_array().as_ref())
520 .with_context(|_| ArrowSnafu {
521 context: format!("failed to filter column for mfp operator {:?}", self),
522 })?;
523 result.push(Helper::try_into_vector(filtered).context(DataTypeSnafu {
524 msg: "Failed to convert arrow array to vector",
525 })?);
526 }
527 let projected = self
528 .mfp
529 .projection
530 .iter()
531 .map(|c| result[*c].clone())
532 .collect_vec();
533 let row_count = pred.count();
534
535 Batch::try_new(projected, row_count)
536 }
537
538 pub fn eval_batch_inner(&self, batch: &mut Batch) -> Result<BooleanVector, EvalError> {
540 let mut expression = 0;
542 let buf = BooleanBuffer::new_set(batch.row_count());
544 let arr = BooleanArray::new(buf, None);
545 let mut all_preds = BooleanVector::from(arr);
546
547 for (support, predicate) in self.mfp.predicates.iter() {
549 while self.mfp.input_arity + expression < *support {
550 let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
551 batch.batch_mut().push(expr_eval);
552 expression += 1;
553 }
554 let pred_vec = predicate.eval_batch(batch)?;
555 let pred_arr = pred_vec.to_arrow_array();
556 let pred_arr = pred_arr.as_any().downcast_ref::<BooleanArray>().context({
557 TypeMismatchSnafu {
558 expected: ConcreteDataType::boolean_datatype(),
559 actual: pred_vec.data_type(),
560 }
561 })?;
562 let all_arr = all_preds.as_boolean_array();
563 let res_arr = arrow::compute::and(all_arr, pred_arr).context(ArrowSnafu {
564 context: format!("failed to compute predicate for mfp operator {:?}", self),
565 })?;
566 all_preds = BooleanVector::from(res_arr);
567 }
568
569 while expression < self.mfp.expressions.len() {
571 let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
572 batch.batch_mut().push(expr_eval);
573 expression += 1;
574 }
575
576 Ok(all_preds)
577 }
578
579 #[inline(always)]
595 pub fn evaluate_into(
596 &self,
597 values: &mut Vec<Value>,
598 row_buf: &mut Row,
599 ) -> Result<Option<Row>, EvalError> {
600 ensure!(
601 values.len() == self.mfp.input_arity,
602 InvalidArgumentSnafu {
603 reason: format!(
604 "values length {} is not equal to input_arity {}",
605 values.len(),
606 self.mfp.input_arity
607 ),
608 }
609 );
610 let passed_predicates = self.evaluate_inner(values)?;
611
612 if !passed_predicates {
613 Ok(None)
614 } else {
615 row_buf.clear();
616 row_buf.extend(self.mfp.projection.iter().map(|c| values[*c].clone()));
617 Ok(Some(row_buf.clone()))
618 }
619 }
620
621 pub fn evaluate_inner(&self, values: &mut Vec<Value>) -> Result<bool, EvalError> {
625 let mut expression = 0;
626 for (support, predicate) in self.mfp.predicates.iter() {
627 while self.mfp.input_arity + expression < *support {
628 values.push(self.mfp.expressions[expression].eval(&values[..])?);
629 expression += 1;
630 }
631 if predicate.eval(&values[..])? != Value::Boolean(true) {
632 return Ok(false);
633 }
634 }
635 while expression < self.mfp.expressions.len() {
637 values.push(self.mfp.expressions[expression].eval(&values[..])?);
638 expression += 1;
639 }
640 Ok(true)
641 }
642}
643
644impl std::ops::Deref for SafeMfpPlan {
645 type Target = MapFilterProject;
646 fn deref(&self) -> &Self::Target {
647 &self.mfp
648 }
649}
650
/// A [`SafeMfpPlan`] together with temporal bounds. The bounds are extracted from predicates
/// for which `contains_temporal()` is true.
///
/// A row passing the non-temporal part is emitted as valid from the maximum of the system time
/// and all lower bounds, and retracted at the minimum of the upper bounds (if any).
#[derive(Clone, Debug, PartialEq)]
pub struct MfpPlan {
    /// The non-temporal map/filter/project part.
    pub(crate) mfp: SafeMfpPlan,
    /// Expressions whose values (converted to timestamps) give the earliest time a row is valid.
    pub(crate) lower_bounds: Vec<ScalarExpr>,
    /// Expressions whose values (converted to timestamps) give the time a row stops being valid.
    pub(crate) upper_bounds: Vec<ScalarExpr>,
}
670
impl MfpPlan {
    /// Returns `true` if the plan carries any lower or upper temporal bound.
    pub fn is_temporal(&self) -> bool {
        !self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
    }
    /// Builds an `MfpPlan` from `mfp`. Every predicate for which `contains_temporal()` is true
    /// is removed from the filter and turned into lower/upper bound expressions via
    /// `extract_bound`.
    ///
    /// # Errors
    /// Propagates any error returned by `extract_bound`.
    pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, Error> {
        let mut lower_bounds = Vec::new();
        let mut upper_bounds = Vec::new();

        let mut temporal = Vec::new();

        mfp.optimize();

        // Move temporal predicates out of `mfp`, keeping only the non-temporal ones in place.
        mfp.predicates.retain(|(_position, predicate)| {
            if predicate.contains_temporal() {
                temporal.push(predicate.clone());
                false
            } else {
                true
            }
        });

        for predicate in temporal {
            let (lower, upper) = predicate.extract_bound()?;
            lower_bounds.extend(lower);
            upper_bounds.extend(upper);
        }
        Ok(Self {
            mfp: SafeMfpPlan { mfp },
            lower_bounds,
            upper_bounds,
        })
    }

    /// Returns `true` if the wrapped operator is the identity and there are no temporal bounds.
    pub fn is_identity(&self) -> bool {
        self.mfp.mfp.is_identity() && self.lower_bounds.is_empty() && self.upper_bounds.is_empty()
    }

    /// Evaluates one input row observed at `sys_time` with multiplicity `diff`.
    ///
    /// `values` holds the input row and is extended in place with expression results.
    /// The returned iterator yields:
    /// - a single `Err((e, sys_time, diff))` if any evaluation or timestamp conversion fails;
    /// - nothing if a non-temporal predicate fails, an evaluated bound is null, or the validity
    ///   interval is empty (the upper bound equals the lower bound);
    /// - otherwise `(row, lower, diff)` followed, when an upper bound exists, by
    ///   `(row, upper, -diff)`. Here `lower = max(sys_time, lower bounds)` and
    ///   `upper = max(lower, min(upper bounds))`.
    ///
    /// Upper bounds are no longer evaluated once the interval has collapsed to empty.
    pub fn evaluate<E: From<EvalError>>(
        &self,
        values: &mut Vec<Value>,
        sys_time: repr::Timestamp,
        diff: Diff,
    ) -> impl Iterator<Item = Result<(Row, repr::Timestamp, Diff), (E, repr::Timestamp, Diff)>>
    {
        // Apply the non-temporal part first; this also appends all expression columns on success.
        match self.mfp.evaluate_inner(values) {
            Err(e) => {
                return Some(Err((e.into(), sys_time, diff)))
                    .into_iter()
                    .chain(None);
            }
            Ok(true) => {}
            Ok(false) => {
                return None.into_iter().chain(None);
            }
        }

        let mut lower_bound = sys_time;
        let mut upper_bound = None;

        // Set when any evaluated bound is null; such a row is never emitted.
        let mut null_eval = false;
        // Every return path must produce the same concrete iterator type
        // (`Chain<option::IntoIter<_>, option::IntoIter<_>>`), so errors are wrapped the same way.
        let ret_err = |e: EvalError| {
            Some(Err((e.into(), sys_time, diff)))
                .into_iter()
                .chain(None)
        };
        // The row becomes valid at the latest of `sys_time` and all lower bounds.
        for l in self.lower_bounds.iter() {
            match l.eval(values) {
                Ok(v) => {
                    if v.is_null() {
                        null_eval = true;
                        continue;
                    }
                    match value_to_internal_ts(v) {
                        Ok(ts) => lower_bound = lower_bound.max(ts),
                        Err(e) => return ret_err(e),
                    }
                }
                Err(e) => return ret_err(e),
            };
        }

        // The row stops being valid at the earliest upper bound, never before `lower_bound`.
        for u in self.upper_bounds.iter() {
            // Once the interval is empty, further upper bounds cannot change the outcome.
            if upper_bound != Some(lower_bound) {
                match u.eval(values) {
                    Err(e) => return ret_err(e),
                    Ok(val) => {
                        if val.is_null() {
                            null_eval = true;
                            continue;
                        }
                        let ts = match value_to_internal_ts(val) {
                            Ok(ts) => ts,
                            Err(e) => return ret_err(e),
                        };
                        if let Some(upper) = upper_bound {
                            upper_bound = Some(upper.min(ts));
                        } else {
                            upper_bound = Some(ts);
                        }
                        // Clamp so the interval is never negative; equal bounds mean "empty".
                        if upper_bound.is_some() && upper_bound < Some(lower_bound) {
                            upper_bound = Some(lower_bound);
                        }
                    }
                }
            }
        }

        if Some(lower_bound) != upper_bound && !null_eval {
            // Guard against a projection that points past the materialized columns.
            if self.mfp.mfp.projection.iter().any(|c| values.len() <= *c) {
                trace!("values={:?}, mfp={:?}", &values, &self.mfp.mfp);
                let err = InternalSnafu {
                    reason: format!(
                        "Index out of bound for mfp={:?} and values={:?}",
                        &self.mfp.mfp, &values
                    ),
                }
                .build();
                return ret_err(err);
            }
            let res_row = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
            // Insert at `lower_bound` and, if bounded above, retract at `upper_bound`.
            let upper_opt =
                upper_bound.map(|upper_bound| Ok((res_row.clone(), upper_bound, -diff)));
            let lower = Some(Ok((res_row, lower_bound, diff)));

            lower.into_iter().chain(upper_opt)
        } else {
            None.into_iter().chain(None)
        }
    }
}
818
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datatypes::data_type::ConcreteDataType;
    use datatypes::vectors::{Int32Vector, Int64Vector};
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::expr::{BinaryFunc, UnaryFunc, UnmaterializableFunc};

    /// Temporal predicates `col(0) <= now()` and `col(0) + 2 > now()` become the validity
    /// interval `[col(0), col(0) + 2)`, whose start is clamped to the system time.
    #[test]
    fn test_mfp_with_time() {
        use crate::expr::func::BinaryFunc;
        let lte_now = ScalarExpr::Column(0).call_binary(
            ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now),
            BinaryFunc::Lte,
        );
        assert!(lte_now.contains_temporal());

        let gt_now_minus_two = ScalarExpr::Column(0)
            .call_binary(
                ScalarExpr::Literal(Value::from(2i64), ConcreteDataType::int64_datatype()),
                BinaryFunc::AddInt64,
            )
            .call_binary(
                ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now),
                BinaryFunc::Gt,
            );
        assert!(gt_now_minus_two.contains_temporal());

        let mfp = MapFilterProject::new(3)
            .filter(vec![
                // col(0) <= now()
                lte_now,
                // col(0) + 2 > now()
                gt_now_minus_two,
            ])
            .unwrap()
            .project(vec![0])
            .unwrap();

        let mfp = MfpPlan::create_from(mfp).unwrap();
        // With col(0) = 4 the row is valid in [4, 6): at sys_time 0 it is inserted at 4, at
        // sys_time 5 it is inserted at 5, and at sys_time 10 the interval is already empty.
        let expected = vec![
            (
                0,
                vec![
                    (Row::new(vec![Value::from(4i64)]), 4, 1),
                    (Row::new(vec![Value::from(4i64)]), 6, -1),
                ],
            ),
            (
                5,
                vec![
                    (Row::new(vec![Value::from(4i64)]), 5, 1),
                    (Row::new(vec![Value::from(4i64)]), 6, -1),
                ],
            ),
            (10, vec![]),
        ];
        for (sys_time, expected) in expected {
            let mut values = vec![Value::from(4i64), Value::from(2i64), Value::from(3i64)];
            let ret = mfp
                .evaluate::<EvalError>(&mut values, sys_time, 1)
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(ret, expected);
        }
    }

    /// Map + project, composition with an identity, round-tripping through
    /// `as_map_filter_project`, `demand`, `permute`, and row/batch evaluation.
    #[test]
    fn test_mfp() {
        use crate::expr::func::BinaryFunc;
        let mfp = MapFilterProject::new(3)
            .map(vec![
                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt),
                ScalarExpr::Column(1).call_binary(ScalarExpr::Column(2), BinaryFunc::Lt),
            ])
            .unwrap()
            .project(vec![3, 4])
            .unwrap();
        assert!(!mfp.is_identity());
        let mfp = MapFilterProject::compose(mfp, MapFilterProject::new(2)).unwrap();
        {
            // Rebuilding from the decomposed parts must yield an equal operator.
            let mfp_0 = mfp.as_map_filter_project();
            let same = MapFilterProject::new(3)
                .map(mfp_0.0)
                .unwrap()
                .filter(mfp_0.1)
                .unwrap()
                .project(mfp_0.2)
                .unwrap();
            assert_eq!(mfp, same);
        }
        assert_eq!(mfp.demand().len(), 3);
        let mut mfp = mfp;
        // Swap input columns 0 and 2.
        mfp.permute(BTreeMap::from([(0, 2), (2, 0), (1, 1)]), 3)
            .unwrap();
        assert_eq!(
            mfp,
            MapFilterProject::new(3)
                .map(vec![
                    ScalarExpr::Column(2).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt),
                    ScalarExpr::Column(1).call_binary(ScalarExpr::Column(0), BinaryFunc::Lt),
                ])
                .unwrap()
                .project(vec![3, 4])
                .unwrap()
        );
        let safe_mfp = SafeMfpPlan { mfp };
        let mut values = vec![Value::from(4), Value::from(2), Value::from(3)];
        let ret = safe_mfp
            .evaluate_into(&mut values, &mut Row::empty())
            .unwrap()
            .unwrap();
        // 3 < 2 is false, 2 < 4 is true.
        assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)]));
        let ty = [
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
        ];
        // The batch path must agree with the row path.
        let mut batch = Batch::try_from_rows_with_types(
            vec![Row::from(vec![
                Value::from(4),
                Value::from(2),
                Value::from(3),
            ])],
            &ty,
        )
        .unwrap();
        let ret = safe_mfp.eval_batch_into(&mut batch).unwrap();

        assert_eq!(
            ret,
            Batch::try_from_rows_with_types(
                vec![Row::from(vec![Value::from(false), Value::from(true)])],
                &[
                    ConcreteDataType::boolean_datatype(),
                    ConcreteDataType::boolean_datatype(),
                ],
            )
            .unwrap()
        );
    }

    /// A filter added after a projection refers to the projected (output) column, not to the
    /// original input column with the same index.
    #[test]
    fn manipulation_mfp() {
        let mfp = MapFilterProject::new(4);
        // col(4) = col(0) + col(1) + col(2)
        let mfp = mfp
            .map(vec![ScalarExpr::Column(0)
                .call_binary(ScalarExpr::Column(1), BinaryFunc::AddInt32)
                .call_binary(ScalarExpr::Column(2), BinaryFunc::AddInt32)])
            .unwrap();
        let mfp = mfp.project(vec![4]).unwrap();
        // Output column 0 (the sum) must be greater than 10.
        let mfp = mfp
            .filter(vec![ScalarExpr::Column(0).call_binary(
                ScalarExpr::Literal(Value::from(10i32), ConcreteDataType::int32_datatype()),
                BinaryFunc::Gt,
            )])
            .unwrap();
        // 4 + 2 + 3 = 9, which fails the filter.
        let input1 = vec![
            Value::from(4),
            Value::from(2),
            Value::from(3),
            Value::from("abc"),
        ];
        let safe_mfp = SafeMfpPlan { mfp };
        let ret = safe_mfp
            .evaluate_into(&mut input1.clone(), &mut Row::empty())
            .unwrap();
        assert_eq!(ret, None);

        let input_type = [
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::string_datatype(),
        ];

        let mut input1_batch =
            Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap();
        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
        assert_eq!(
            ret_batch,
            Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![]))], 0).unwrap()
        );

        // 5 + 2 + 4 = 11, which passes the filter.
        let input2 = vec![
            Value::from(5),
            Value::from(2),
            Value::from(4),
            Value::from("abc"),
        ];
        let ret = safe_mfp
            .evaluate_into(&mut input2.clone(), &mut Row::empty())
            .unwrap();
        assert_eq!(ret, Some(Row::pack(vec![Value::from(11)])));

        let mut input2_batch =
            Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap();
        let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch).unwrap();
        assert_eq!(
            ret_batch,
            Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![11]))], 1).unwrap()
        );
    }

    /// `permute` accepts shuffles both into a smaller and into a larger input row, as long as
    /// every demanded column is mapped.
    #[test]
    fn test_permute() {
        let mfp = MapFilterProject::new(3)
            .map(vec![
                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt)
            ])
            .unwrap()
            .filter(vec![
                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt)
            ])
            .unwrap()
            .project(vec![0, 1])
            .unwrap();
        assert_eq!(mfp.demand(), BTreeSet::from([0, 1]));
        let mut less = mfp.clone();
        less.permute(BTreeMap::from([(1, 0), (0, 1)]), 2).unwrap();

        let mut more = mfp.clone();
        more.permute(BTreeMap::from([(0, 1), (1, 2), (2, 0)]), 4)
            .unwrap();
    }

    /// Wrong input arity is rejected on both the row and batch paths; otherwise the filter on
    /// the cast column drops or keeps rows consistently across both paths.
    #[test]
    fn mfp_test_cast_and_filter() {
        let mfp = MapFilterProject::new(3)
            .map(vec![ScalarExpr::Column(0).call_unary(UnaryFunc::Cast(
                ConcreteDataType::int32_datatype(),
            ))])
            .unwrap()
            .filter(vec![
                ScalarExpr::Column(3).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt)
            ])
            .unwrap()
            .project([0, 1, 2])
            .unwrap();
        // Four values for an operator with input arity 3.
        let input1 = vec![
            Value::from(4i64),
            Value::from(2),
            Value::from(3),
            Value::from(53),
        ];
        let safe_mfp = SafeMfpPlan { mfp };
        let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
        assert!(matches!(ret, Err(EvalError::InvalidArgument { .. })));

        let input_type = [
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
        ];
        let mut input1_batch =
            Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap();
        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch);
        assert!(matches!(ret_batch, Err(EvalError::InvalidArgument { .. })));

        // cast(4) > 2 passes, so the input row is returned unchanged.
        let input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)];
        let ret = safe_mfp
            .evaluate_into(&mut input2.clone(), &mut Row::empty())
            .unwrap();
        assert_eq!(ret, Some(Row::new(input2.clone())));

        let input_type = [
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
        ];
        let input2_batch =
            Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap();
        let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch.clone()).unwrap();
        assert_eq!(ret_batch, input2_batch);

        // cast(4) > 5 fails, so the row is dropped.
        let input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
        let ret = safe_mfp
            .evaluate_into(&mut input3.clone(), &mut Row::empty())
            .unwrap();
        assert_eq!(ret, None);

        let input3_batch =
            Batch::try_from_rows_with_types(vec![Row::new(input3)], &input_type).unwrap();
        let ret_batch = safe_mfp.eval_batch_into(&mut input3_batch.clone()).unwrap();
        assert_eq!(
            ret_batch,
            Batch::try_new(
                vec![
                    Arc::new(Int64Vector::from_vec(Default::default())),
                    Arc::new(Int32Vector::from_vec(Default::default())),
                    Arc::new(Int32Vector::from_vec(Default::default()))
                ],
                0
            )
            .unwrap()
        );
    }

    /// Filters and maps added after a reordering projection refer to the reordered columns.
    #[test]
    fn test_mfp_out_of_order() {
        let mfp = MapFilterProject::new(3)
            .project(vec![2, 1, 0])
            .unwrap()
            .filter(vec![
                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt)
            ])
            .unwrap()
            .map(vec![
                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt)
            ])
            .unwrap()
            .project(vec![3])
            .unwrap();
        // Filter: input col 2 (4) > col 1 (3) passes; map: 4 < 3 is false.
        let input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
        let safe_mfp = SafeMfpPlan { mfp };
        let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
        assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)])));

        let input_type = [
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int32_datatype(),
        ];
        let mut input1_batch =
            Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap();
        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();

        assert_eq!(
            ret_batch,
            Batch::try_new(vec![Arc::new(BooleanVector::from(vec![false]))], 1).unwrap()
        );
    }
    /// Applying the 3-cycle projection three times yields the identity operator.
    #[test]
    fn test_mfp_chore() {
        let mfp = MapFilterProject::new(3)
            .project([1, 2, 0])
            .unwrap()
            .project([1, 2, 0])
            .unwrap()
            .project([1, 2, 0])
            .unwrap();
        assert_eq!(mfp, MapFilterProject::new(3));
    }
}