flow/expr/
linear.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Defines `MapFilterProject`, a compound operator that can be applied row-by-row.
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use arrow::array::BooleanArray;
20use arrow::buffer::BooleanBuffer;
21use arrow::compute::FilterBuilder;
22use common_telemetry::trace;
23use datatypes::prelude::ConcreteDataType;
24use datatypes::value::Value;
25use datatypes::vectors::{BooleanVector, Helper};
26use itertools::Itertools;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::error::{Error, InvalidQuerySnafu};
30use crate::expr::error::{ArrowSnafu, DataTypeSnafu, EvalError, InternalSnafu, TypeMismatchSnafu};
31use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr};
32use crate::repr::{self, value_to_internal_ts, Diff, Row};
33
/// A compound operator that can be applied row-by-row.
///
/// In practice, this operator is a sequence of map, filter, and project steps in
/// arbitrary order. It is stored by reordering that sequence so that it is applied
/// as `map` first, `filter` second and `project` third.
///
/// The input is a row (a sequence of values), which is also used to store
/// intermediate results:
/// - the `map` operator appends new columns to the row according to its expressions,
/// - the `filter` operator decides whether the entire row is output at all, by
///   checking whether the row satisfies the predicates,
/// - the `project` operator decides which columns of the row are output.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequence of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Value::Boolean(true)` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct MapFilterProject {
    /// A sequence of expressions that should be appended to the row.
    ///
    /// Many of these expressions may not be produced in the output,
    /// and may only be present as common subexpressions.
    pub expressions: Vec<ScalarExpr>,
    /// Expressions that must evaluate to `Value::Boolean(true)` for the output
    /// row to be produced.
    ///
    /// Each entry is prepended with a column identifier indicating
    /// the column *before* which the predicate should first be applied.
    /// Most commonly this would be one plus the largest column identifier
    /// in the predicate's referred columns, but it could be larger to implement
    /// guarded evaluation of predicates.
    /// In other words, the first element of the tuple means that
    /// the predicate can't be evaluated until that number of columns is formed.
    ///
    /// This list should be sorted by the first field.
    pub predicates: Vec<(usize, ScalarExpr)>,
    /// A sequence of column identifiers whose data form the output row.
    pub projection: Vec<usize>,
    /// The expected number of input columns.
    ///
    /// This is needed to ensure correct identification of newly formed
    /// columns in the output: column `input_arity + i` is the result of
    /// `expressions[i]`.
    pub input_arity: usize,
}
85
86impl MapFilterProject {
87    /// Create a no-op operator for an input of a supplied arity.
88    pub fn new(input_arity: usize) -> Self {
89        Self {
90            expressions: Vec::new(),
91            predicates: Vec::new(),
92            projection: (0..input_arity).collect(),
93            input_arity,
94        }
95    }
96
97    pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
98        let idx = *self.projection.get(n)?;
99        if idx < self.input_arity {
100            Some(ScalarExpr::Column(idx))
101        } else {
102            // find direct ref to input's expr
103
104            let mut expr = self.expressions.get(idx - self.input_arity)?;
105            loop {
106                match expr {
107                    ScalarExpr::Column(prev) => {
108                        if *prev < self.input_arity {
109                            return Some(ScalarExpr::Column(*prev));
110                        } else {
111                            expr = self.expressions.get(*prev - self.input_arity)?;
112                            continue;
113                        }
114                    }
115                    _ => return Some(expr.clone()),
116                }
117            }
118        }
119    }
120
    /// The number of columns expected in the output row.
    ///
    /// This is the length of `self.projection`, since each projection entry
    /// yields exactly one output column.
    pub fn output_arity(&self) -> usize {
        self.projection.len()
    }
125
126    /// Given two mfps, return an mfp that applies one
127    /// followed by the other.
128    /// Note that the arguments are in the opposite order
129    /// from how function composition is usually written in mathematics.
130    pub fn compose(before: Self, after: Self) -> Result<Self, Error> {
131        let (m, f, p) = after.into_map_filter_project();
132        before.map(m)?.filter(f)?.project(p)
133    }
134
135    /// True if the operator describes the identity transformation.
136    pub fn is_identity(&self) -> bool {
137        self.expressions.is_empty()
138            && self.predicates.is_empty()
139            // identity if projection is the identity permutation
140            && self.projection.len() == self.input_arity
141            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
142    }
143
144    /// Retain only the indicated columns in the presented order.
145    ///
146    /// i.e. before: `self.projection = [1, 2, 0], columns = [1, 0]`
147    /// ```mermaid
148    /// flowchart TD
149    /// col-0
150    /// col-1
151    /// col-2
152    /// projection --> |0|col-1
153    /// projection --> |1|col-2
154    /// projection --> |2|col-0
155    /// ```
156    ///
157    /// after: `self.projection = [2, 1]`
158    /// ```mermaid
159    /// flowchart TD
160    /// col-0
161    /// col-1
162    /// col-2
163    /// project("project:[1,2,0]")
164    /// project
165    /// project -->|0| col-1
166    /// project -->|1| col-2
167    /// project -->|2| col-0
168    /// new_project("apply new project:[1,0]")
169    /// new_project -->|0| col-2
170    /// new_project -->|1| col-1
171    /// ```
172    pub fn project<I>(mut self, columns: I) -> Result<Self, Error>
173    where
174        I: IntoIterator<Item = usize> + std::fmt::Debug,
175    {
176        self.projection = columns
177            .into_iter()
178            .map(|c| self.projection.get(c).cloned().ok_or(c))
179            .collect::<Result<Vec<_>, _>>()
180            .map_err(|c| {
181                InvalidQuerySnafu {
182                    reason: format!(
183                        "column index {} out of range, expected at most {} columns",
184                        c,
185                        self.projection.len()
186                    ),
187                }
188                .build()
189            })?;
190        Ok(self)
191    }
192
193    /// Retain only rows satisfying these predicates.
194    ///
195    /// This method introduces predicates as eagerly as they can be evaluated,
196    /// which may not be desired for predicates that may cause exceptions.
197    /// If fine manipulation is required, the predicates can be added manually.
198    ///
199    /// simply added to the end of the predicates list
200    ///
201    /// while paying attention to column references maintained by `self.projection`
202    ///
203    /// so `self.projection = [1, 2, 0], filter = [0]+[1]>0`:
204    /// becomes:
205    /// ```mermaid
206    /// flowchart TD
207    /// col-0
208    /// col-1
209    /// col-2
210    /// project("first project:[1,2,0]")
211    /// project
212    /// project -->|0| col-1
213    /// project -->|1| col-2
214    /// project -->|2| col-0
215    /// filter("then filter:[0]+[1]>0")
216    /// filter -->|0| col-1
217    /// filter --> |1| col-2
218    /// ```
219    pub fn filter<I>(mut self, predicates: I) -> Result<Self, Error>
220    where
221        I: IntoIterator<Item = ScalarExpr>,
222    {
223        for mut predicate in predicates {
224            // Correct column references.
225            predicate.permute(&self.projection[..])?;
226
227            // Validate column references.
228            let referred_columns = predicate.get_all_ref_columns();
229            for c in referred_columns.iter() {
230                // current row len include input columns and previous number of expressions
231                let cur_row_len = self.input_arity + self.expressions.len();
232                ensure!(
233                    *c < cur_row_len,
234                    InvalidQuerySnafu {
235                        reason: format!(
236                            "column index {} out of range, expected at most {} columns",
237                            c, cur_row_len
238                        )
239                    }
240                );
241            }
242
243            // Insert predicate as eagerly as it can be evaluated:
244            // just after the largest column in its support is formed.
245            let max_support = referred_columns
246                .into_iter()
247                .max()
248                .map(|c| c + 1)
249                .unwrap_or(0);
250            self.predicates.push((max_support, predicate))
251        }
252        // Stable sort predicates by position at which they take effect.
253        self.predicates
254            .sort_by_key(|(position, _predicate)| *position);
255        Ok(self)
256    }
257
258    /// Append the result of evaluating expressions to each row.
259    ///
260    /// simply append `expressions` to `self.expressions`
261    ///
262    /// while paying attention to column references maintained by `self.projection`
263    ///
264    /// hence, before apply map with a previously non-trivial projection would be like:
265    /// before:
266    /// ```mermaid
267    /// flowchart TD
268    /// col-0
269    /// col-1
270    /// col-2
271    /// projection --> |0|col-1
272    /// projection --> |1|col-2
273    /// projection --> |2|col-0
274    /// ```
275    /// after apply map:
276    /// ```mermaid
277    /// flowchart TD
278    /// col-0
279    /// col-1
280    /// col-2
281    /// project("project:[1,2,0]")
282    /// project
283    /// project -->|0| col-1
284    /// project -->|1| col-2
285    /// project -->|2| col-0
286    /// map("map:[0]/[1]/[2]")
287    /// map -->|0|col-1
288    /// map -->|1|col-2
289    /// map -->|2|col-0
290    /// ```
291    pub fn map<I>(mut self, expressions: I) -> Result<Self, Error>
292    where
293        I: IntoIterator<Item = ScalarExpr>,
294    {
295        for mut expression in expressions {
296            // Correct column references.
297            expression.permute(&self.projection[..])?;
298
299            // Validate column references.
300            for c in expression.get_all_ref_columns().into_iter() {
301                // current row len include input columns and previous number of expressions
302                let current_row_len = self.input_arity + self.expressions.len();
303                ensure!(
304                    c < current_row_len,
305                    InvalidQuerySnafu {
306                        reason: format!(
307                            "column index {} out of range, expected at most {} columns",
308                            c, current_row_len
309                        )
310                    }
311                );
312            }
313
314            // Introduce expression and produce as output.
315            self.expressions.push(expression);
316            // Expression by default is projected to output.
317            let cur_expr_col_num = self.input_arity + self.expressions.len() - 1;
318            self.projection.push(cur_expr_col_num);
319        }
320
321        Ok(self)
322    }
323
324    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
325    pub fn into_map_filter_project(self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
326        let predicates = self
327            .predicates
328            .into_iter()
329            .map(|(_pos, predicate)| predicate)
330            .collect();
331        (self.expressions, predicates, self.projection)
332    }
333
    /// As the arguments to `Map`, `Filter`, and `Project` operators.
    ///
    /// In principle, this operator can be implemented as a sequence of
    /// more elemental operators, likely less efficiently.
    ///
    /// Returns `(expressions, predicates, projection)`. This clones `self` and
    /// delegates to [`MapFilterProject::into_map_filter_project`].
    pub fn as_map_filter_project(&self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
        self.clone().into_map_filter_project()
    }
341}
342
343impl MapFilterProject {
    /// Convert the `MapFilterProject` into a safe evaluation plan, marking it safe to evaluate.
    ///
    /// This only wraps `self` in a [`SafeMfpPlan`]; no validation or rewriting
    /// is performed here.
    pub fn into_safe(self) -> SafeMfpPlan {
        SafeMfpPlan { mfp: self }
    }
348
    /// Optimize the `MapFilterProject` in place.
    ///
    /// Currently a placeholder: the body contains no logic, so calling it
    /// leaves `self` unchanged.
    pub fn optimize(&mut self) {
        // TODO(discord9): optimize
    }
353    /// get the mapping of old columns to new columns after the mfp
354    pub fn get_old_to_new_mapping(&self) -> BTreeMap<usize, usize> {
355        BTreeMap::from_iter(
356            self.projection
357                .clone()
358                .into_iter()
359                .enumerate()
360                .map(|(new, old)| {
361                    // `projection` give the new -> old mapping
362                    let mut old = old;
363                    // trace back to the original column
364                    // since there maybe indirect ref to old columns like
365                    // col 2 <- expr=col(2) at pos col 4 <- expr=col(4) at pos col 6
366                    // ideally such indirect ref should be optimize away
367                    // TODO(discord9): refactor this after impl `optimize()`
368                    while let Some(ScalarExpr::Column(prev)) = if old >= self.input_arity {
369                        // get the correspond expr if not a original column
370                        self.expressions.get(old - self.input_arity)
371                    } else {
372                        // we don't care about non column ref case since only need old to new column mapping
373                        // in which case, the old->new mapping remain the same
374                        None
375                    } {
376                        old = *prev;
377                        if old < self.input_arity {
378                            break;
379                        }
380                    }
381                    (old, new)
382                }),
383        )
384    }
385
386    /// Lists input columns whose values are used in outputs.
387    ///
388    /// It is entirely appropriate to determine the demand of an instance
389    /// and then both apply a projection to the subject of the instance and
390    /// `self.permute` this instance.
391    pub fn demand(&self) -> BTreeSet<usize> {
392        let mut demanded = BTreeSet::new();
393        // first, get all columns referenced by predicates
394        for (_index, pred) in self.predicates.iter() {
395            demanded.extend(pred.get_all_ref_columns());
396        }
397        // then, get columns referenced by projection which is direct output
398        demanded.extend(self.projection.iter().cloned());
399
400        // check every expressions, if a expression is contained in demanded, then all columns it referenced should be added to demanded
401        for index in (0..self.expressions.len()).rev() {
402            if demanded.contains(&(self.input_arity + index)) {
403                demanded.extend(self.expressions[index].get_all_ref_columns());
404            }
405        }
406
407        // only keep demanded columns that are in input
408        demanded.retain(|col| col < &self.input_arity);
409        demanded
410    }
411
412    /// Update input column references, due to an input projection or permutation.
413    ///
414    /// The `shuffle` argument remaps expected column identifiers to new locations,
415    /// with the expectation that `shuffle` describes all input columns, and so the
416    /// intermediate results will be able to start at position `shuffle.len()`.
417    ///
418    /// The supplied `shuffle` may not list columns that are not "demanded" by the
419    /// instance, and so we should ensure that `self` is optimized to not reference
420    /// columns that are not demanded.
421    pub fn permute(
422        &mut self,
423        mut shuffle: BTreeMap<usize, usize>,
424        new_input_arity: usize,
425    ) -> Result<(), Error> {
426        // check shuffle is valid
427        let demand = self.demand();
428        for d in demand {
429            ensure!(
430                shuffle.contains_key(&d),
431                InvalidQuerySnafu {
432                    reason: format!(
433                        "Demanded column {} is not in shuffle's keys: {:?}",
434                        d,
435                        shuffle.keys()
436                    )
437                }
438            );
439        }
440        ensure!(
441            shuffle.len() <= new_input_arity,
442            InvalidQuerySnafu {
443                reason: format!(
444                    "shuffle's length {} is greater than new_input_arity {}",
445                    shuffle.len(),
446                    self.input_arity
447                )
448            }
449        );
450
451        // decompose self into map, filter, project for ease of manipulation
452        let (mut map, mut filter, mut project) = self.as_map_filter_project();
453        for index in 0..map.len() {
454            // Intermediate columns are just shifted.
455            shuffle.insert(self.input_arity + index, new_input_arity + index);
456        }
457
458        for expr in map.iter_mut() {
459            expr.permute_map(&shuffle)?;
460        }
461        for pred in filter.iter_mut() {
462            pred.permute_map(&shuffle)?;
463        }
464        let new_row_len = new_input_arity + map.len();
465        for proj in project.iter_mut() {
466            ensure!(
467                shuffle[proj] < new_row_len,
468                InvalidQuerySnafu {
469                    reason: format!(
470                        "shuffled column index {} out of range, expected at most {} columns",
471                        shuffle[proj], new_row_len
472                    )
473                }
474            );
475            *proj = shuffle[proj];
476        }
477        *self = Self::new(new_input_arity)
478            .map(map)?
479            .filter(filter)?
480            .project(project)?;
481        Ok(())
482    }
483}
484
/// A wrapper type which indicates it is safe to simply evaluate all expressions.
///
/// It dereferences to the wrapped [`MapFilterProject`], so the inner
/// operator's read-only methods are available directly on this type.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeMfpPlan {
    /// The inner `MapFilterProject` that is safe to evaluate.
    pub(crate) mfp: MapFilterProject,
}
491
492impl SafeMfpPlan {
    /// See [`MapFilterProject::permute`].
    ///
    /// Forwards `map` and `new_arity` unchanged to the wrapped plan and
    /// returns its result.
    pub fn permute(&mut self, map: BTreeMap<usize, usize>, new_arity: usize) -> Result<(), Error> {
        self.mfp.permute(map, new_arity)
    }
497
498    /// similar to [`MapFilterProject::evaluate_into`], just in batch, and rows that don't pass the predicates are not included in the output.
499    ///
500    /// so it's not guaranteed that the output will have the same number of rows as the input.
501    pub fn eval_batch_into(&self, batch: &mut Batch) -> Result<Batch, EvalError> {
502        ensure!(
503            batch.column_count() == self.mfp.input_arity,
504            InvalidArgumentSnafu {
505                reason: format!(
506                    "batch column length {} is not equal to input_arity {}",
507                    batch.column_count(),
508                    self.mfp.input_arity
509                ),
510            }
511        );
512
513        let passed_predicates = self.eval_batch_inner(batch)?;
514        let filter = FilterBuilder::new(passed_predicates.as_boolean_array());
515        let pred = filter.build();
516        let mut result = vec![];
517        for col in batch.batch() {
518            let filtered = pred
519                .filter(col.to_arrow_array().as_ref())
520                .with_context(|_| ArrowSnafu {
521                    context: format!("failed to filter column for mfp operator {:?}", self),
522                })?;
523            result.push(Helper::try_into_vector(filtered).context(DataTypeSnafu {
524                msg: "Failed to convert arrow array to vector",
525            })?);
526        }
527        let projected = self
528            .mfp
529            .projection
530            .iter()
531            .map(|c| result[*c].clone())
532            .collect_vec();
533        let row_count = pred.count();
534
535        Batch::try_new(projected, row_count)
536    }
537
538    /// similar to [`MapFilterProject::evaluate_into`], just in batch.
539    pub fn eval_batch_inner(&self, batch: &mut Batch) -> Result<BooleanVector, EvalError> {
540        // mark the columns that have been evaluated and appended to the `batch`
541        let mut expression = 0;
542        // preds default to true and will be updated as we evaluate each predicate
543        let buf = BooleanBuffer::new_set(batch.row_count());
544        let arr = BooleanArray::new(buf, None);
545        let mut all_preds = BooleanVector::from(arr);
546
547        // to compute predicate, need to first compute all expressions used in predicates
548        for (support, predicate) in self.mfp.predicates.iter() {
549            while self.mfp.input_arity + expression < *support {
550                let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
551                batch.batch_mut().push(expr_eval);
552                expression += 1;
553            }
554            let pred_vec = predicate.eval_batch(batch)?;
555            let pred_arr = pred_vec.to_arrow_array();
556            let pred_arr = pred_arr.as_any().downcast_ref::<BooleanArray>().context({
557                TypeMismatchSnafu {
558                    expected: ConcreteDataType::boolean_datatype(),
559                    actual: pred_vec.data_type(),
560                }
561            })?;
562            let all_arr = all_preds.as_boolean_array();
563            let res_arr = arrow::compute::and(all_arr, pred_arr).context(ArrowSnafu {
564                context: format!("failed to compute predicate for mfp operator {:?}", self),
565            })?;
566            all_preds = BooleanVector::from(res_arr);
567        }
568
569        // while evaluated expressions are less than total expressions, keep evaluating
570        while expression < self.mfp.expressions.len() {
571            let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
572            batch.batch_mut().push(expr_eval);
573            expression += 1;
574        }
575
576        Ok(all_preds)
577    }
578
579    /// Evaluates the linear operator on a supplied list of datums.
580    ///
581    /// The arguments are the initial datums associated with the row,
582    /// and an appropriately lifetimed arena for temporary allocations
583    /// needed by scalar evaluation.
584    ///
585    /// An `Ok` result will either be `None` if any predicate did not
586    /// evaluate to `Value::Boolean(true)`, or the values of the columns listed
587    /// by `self.projection` if all predicates passed. If an error
588    /// occurs in the evaluation it is returned as an `Err` variant.
589    /// As the evaluation exits early with failed predicates, it may
590    /// miss some errors that would occur later in evaluation.
591    ///
592    /// The `row` is not cleared first, but emptied if the function
593    /// returns `Ok(Some(row)).
594    #[inline(always)]
595    pub fn evaluate_into(
596        &self,
597        values: &mut Vec<Value>,
598        row_buf: &mut Row,
599    ) -> Result<Option<Row>, EvalError> {
600        ensure!(
601            values.len() == self.mfp.input_arity,
602            InvalidArgumentSnafu {
603                reason: format!(
604                    "values length {} is not equal to input_arity {}",
605                    values.len(),
606                    self.mfp.input_arity
607                ),
608            }
609        );
610        let passed_predicates = self.evaluate_inner(values)?;
611
612        if !passed_predicates {
613            Ok(None)
614        } else {
615            row_buf.clear();
616            row_buf.extend(self.mfp.projection.iter().map(|c| values[*c].clone()));
617            Ok(Some(row_buf.clone()))
618        }
619    }
620
621    /// Populates `values` with `self.expressions` and tests `self.predicates`.
622    ///
623    /// This does not apply `self.projection`, which is up to the calling method.
624    pub fn evaluate_inner(&self, values: &mut Vec<Value>) -> Result<bool, EvalError> {
625        let mut expression = 0;
626        for (support, predicate) in self.mfp.predicates.iter() {
627            while self.mfp.input_arity + expression < *support {
628                values.push(self.mfp.expressions[expression].eval(&values[..])?);
629                expression += 1;
630            }
631            if predicate.eval(&values[..])? != Value::Boolean(true) {
632                return Ok(false);
633            }
634        }
635        // while evaluated expressions are less than total expressions, keep evaluating
636        while expression < self.mfp.expressions.len() {
637            values.push(self.mfp.expressions[expression].eval(&values[..])?);
638            expression += 1;
639        }
640        Ok(true)
641    }
642}
643
/// Gives read-only access to the wrapped [`MapFilterProject`], so its `&self`
/// methods and fields can be used directly through a `SafeMfpPlan`.
impl std::ops::Deref for SafeMfpPlan {
    type Target = MapFilterProject;
    fn deref(&self) -> &Self::Target {
        &self.mfp
    }
}
650
/// Predicates partitioned into temporal and non-temporal.
///
/// Temporal predicates require some recognition to determine their
/// structure, and it is best to do that once and re-use the results.
///
/// There are restrictions on the temporal predicates we currently support.
/// They must directly constrain the current time (`now`, written `MzNow` in
/// the comments below) from below or above,
/// by expressions that do not themselves contain it.
/// Conjunctions of such constraints are also ok.
#[derive(Clone, Debug, PartialEq)]
pub struct MfpPlan {
    /// The non-temporal part of the plan. Its predicates are evaluated on a row
    /// of values and are expected to yield `Value::Boolean(true)`.
    pub(crate) mfp: SafeMfpPlan,
    /// TODO(discord9): impl temporal filter later
    ///
    /// Expressions that when evaluated lower-bound `MzNow`.
    pub(crate) lower_bounds: Vec<ScalarExpr>,
    /// Expressions that when evaluated upper-bound `MzNow`.
    pub(crate) upper_bounds: Vec<ScalarExpr>,
}
670
671impl MfpPlan {
672    /// Indicates if the `MfpPlan` contains temporal predicates. That is have outputs that may occur in future.
673    pub fn is_temporal(&self) -> bool {
674        !self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
675    }
676    /// find `now` in `predicates` and put them into lower/upper temporal bounds for temporal filter to use
677    pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, Error> {
678        let mut lower_bounds = Vec::new();
679        let mut upper_bounds = Vec::new();
680
681        let mut temporal = Vec::new();
682
683        // Optimize, to ensure that temporal predicates are move in to `mfp.predicates`.
684        mfp.optimize();
685
686        mfp.predicates.retain(|(_position, predicate)| {
687            if predicate.contains_temporal() {
688                temporal.push(predicate.clone());
689                false
690            } else {
691                true
692            }
693        });
694
695        for predicate in temporal {
696            let (lower, upper) = predicate.extract_bound()?;
697            lower_bounds.extend(lower);
698            upper_bounds.extend(upper);
699        }
700        Ok(Self {
701            mfp: SafeMfpPlan { mfp },
702            lower_bounds,
703            upper_bounds,
704        })
705    }
706
707    /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
708    pub fn is_identity(&self) -> bool {
709        self.mfp.mfp.is_identity() && self.lower_bounds.is_empty() && self.upper_bounds.is_empty()
710    }
711
712    /// if `lower_bound <= sys_time < upper_bound`, return `[(data, sys_time, +1), (data, min_upper_bound, -1)]`
713    ///
714    /// else if `sys_time < lower_bound`, return `[(data, lower_bound, +1), (data, min_upper_bound, -1)]`
715    ///
716    /// else if `sys_time >= upper_bound`, return `[None, None]`
717    ///
718    /// if eval error appeal in any of those process, corresponding result will be `Err`
719    pub fn evaluate<E: From<EvalError>>(
720        &self,
721        values: &mut Vec<Value>,
722        sys_time: repr::Timestamp,
723        diff: Diff,
724    ) -> impl Iterator<Item = Result<(Row, repr::Timestamp, Diff), (E, repr::Timestamp, Diff)>>
725    {
726        match self.mfp.evaluate_inner(values) {
727            Err(e) => {
728                return Some(Err((e.into(), sys_time, diff)))
729                    .into_iter()
730                    .chain(None);
731            }
732            Ok(true) => {}
733            Ok(false) => {
734                return None.into_iter().chain(None);
735            }
736        }
737
738        let mut lower_bound = sys_time;
739        let mut upper_bound = None;
740
741        // Track whether we have seen a null in either bound, as this should
742        // prevent the record from being produced at any time.
743        let mut null_eval = false;
744        let ret_err = |e: EvalError| {
745            Some(Err((e.into(), sys_time, diff)))
746                .into_iter()
747                .chain(None)
748        };
749        for l in self.lower_bounds.iter() {
750            match l.eval(values) {
751                Ok(v) => {
752                    if v.is_null() {
753                        null_eval = true;
754                        continue;
755                    }
756                    match value_to_internal_ts(v) {
757                        Ok(ts) => lower_bound = lower_bound.max(ts),
758                        Err(e) => return ret_err(e),
759                    }
760                }
761                Err(e) => return ret_err(e),
762            };
763        }
764
765        for u in self.upper_bounds.iter() {
766            if upper_bound != Some(lower_bound) {
767                match u.eval(values) {
768                    Err(e) => return ret_err(e),
769                    Ok(val) => {
770                        if val.is_null() {
771                            null_eval = true;
772                            continue;
773                        }
774                        let ts = match value_to_internal_ts(val) {
775                            Ok(ts) => ts,
776                            Err(e) => return ret_err(e),
777                        };
778                        if let Some(upper) = upper_bound {
779                            upper_bound = Some(upper.min(ts));
780                        } else {
781                            upper_bound = Some(ts);
782                        }
783                        // Force the upper bound to be at least the lower
784                        // bound.
785                        if upper_bound.is_some() && upper_bound < Some(lower_bound) {
786                            upper_bound = Some(lower_bound);
787                        }
788                    }
789                }
790            }
791        }
792
793        if Some(lower_bound) != upper_bound && !null_eval {
794            if self.mfp.mfp.projection.iter().any(|c| values.len() <= *c) {
795                trace!("values={:?}, mfp={:?}", &values, &self.mfp.mfp);
796                let err = InternalSnafu {
797                    reason: format!(
798                        "Index out of bound for mfp={:?} and values={:?}",
799                        &self.mfp.mfp, &values
800                    ),
801                }
802                .build();
803                return ret_err(err);
804            }
805            // safety: already checked that `projection` is not out of bound
806            let res_row = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
807            let upper_opt =
808                upper_bound.map(|upper_bound| Ok((res_row.clone(), upper_bound, -diff)));
809            // if diff==-1, the `upper_opt` will cancel the future `-1` inserted before by previous diff==1 row
810            let lower = Some(Ok((res_row, lower_bound, diff)));
811
812            lower.into_iter().chain(upper_opt)
813        } else {
814            None.into_iter().chain(None)
815        }
816    }
817}
818
819#[cfg(test)]
820mod test {
821    use std::sync::Arc;
822
823    use datatypes::data_type::ConcreteDataType;
824    use datatypes::vectors::{Int32Vector, Int64Vector};
825    use pretty_assertions::assert_eq;
826
827    use super::*;
828    use crate::expr::{BinaryFunc, UnaryFunc, UnmaterializableFunc};
829
830    #[test]
831    fn test_mfp_with_time() {
832        use crate::expr::func::BinaryFunc;
833        let lte_now = ScalarExpr::Column(0).call_binary(
834            ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now),
835            BinaryFunc::Lte,
836        );
837        assert!(lte_now.contains_temporal());
838
839        let gt_now_minus_two = ScalarExpr::Column(0)
840            .call_binary(
841                ScalarExpr::Literal(Value::from(2i64), ConcreteDataType::int64_datatype()),
842                BinaryFunc::AddInt64,
843            )
844            .call_binary(
845                ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now),
846                BinaryFunc::Gt,
847            );
848        assert!(gt_now_minus_two.contains_temporal());
849
850        let mfp = MapFilterProject::new(3)
851            .filter(vec![
852                // col(0) <= now()
853                lte_now,
854                // col(0) + 2 > now()
855                gt_now_minus_two,
856            ])
857            .unwrap()
858            .project(vec![0])
859            .unwrap();
860
861        let mfp = MfpPlan::create_from(mfp).unwrap();
862        let expected = vec![
863            (
864                0,
865                vec![
866                    (Row::new(vec![Value::from(4i64)]), 4, 1),
867                    (Row::new(vec![Value::from(4i64)]), 6, -1),
868                ],
869            ),
870            (
871                5,
872                vec![
873                    (Row::new(vec![Value::from(4i64)]), 5, 1),
874                    (Row::new(vec![Value::from(4i64)]), 6, -1),
875                ],
876            ),
877            (10, vec![]),
878        ];
879        for (sys_time, expected) in expected {
880            let mut values = vec![Value::from(4i64), Value::from(2i64), Value::from(3i64)];
881            let ret = mfp
882                .evaluate::<EvalError>(&mut values, sys_time, 1)
883                .collect::<Result<Vec<_>, _>>()
884                .unwrap();
885            assert_eq!(ret, expected);
886        }
887    }
888
889    #[test]
890    fn test_mfp() {
891        use crate::expr::func::BinaryFunc;
892        let mfp = MapFilterProject::new(3)
893            .map(vec![
894                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt),
895                ScalarExpr::Column(1).call_binary(ScalarExpr::Column(2), BinaryFunc::Lt),
896            ])
897            .unwrap()
898            .project(vec![3, 4])
899            .unwrap();
900        assert!(!mfp.is_identity());
901        let mfp = MapFilterProject::compose(mfp, MapFilterProject::new(2)).unwrap();
902        {
903            let mfp_0 = mfp.as_map_filter_project();
904            let same = MapFilterProject::new(3)
905                .map(mfp_0.0)
906                .unwrap()
907                .filter(mfp_0.1)
908                .unwrap()
909                .project(mfp_0.2)
910                .unwrap();
911            assert_eq!(mfp, same);
912        }
913        assert_eq!(mfp.demand().len(), 3);
914        let mut mfp = mfp;
915        mfp.permute(BTreeMap::from([(0, 2), (2, 0), (1, 1)]), 3)
916            .unwrap();
917        assert_eq!(
918            mfp,
919            MapFilterProject::new(3)
920                .map(vec![
921                    ScalarExpr::Column(2).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt),
922                    ScalarExpr::Column(1).call_binary(ScalarExpr::Column(0), BinaryFunc::Lt),
923                ])
924                .unwrap()
925                .project(vec![3, 4])
926                .unwrap()
927        );
928        let safe_mfp = SafeMfpPlan { mfp };
929        let mut values = vec![Value::from(4), Value::from(2), Value::from(3)];
930        let ret = safe_mfp
931            .evaluate_into(&mut values, &mut Row::empty())
932            .unwrap()
933            .unwrap();
934        assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)]));
935        let ty = [
936            ConcreteDataType::int32_datatype(),
937            ConcreteDataType::int32_datatype(),
938            ConcreteDataType::int32_datatype(),
939        ];
940        // batch mode
941        let mut batch = Batch::try_from_rows_with_types(
942            vec![Row::from(vec![
943                Value::from(4),
944                Value::from(2),
945                Value::from(3),
946            ])],
947            &ty,
948        )
949        .unwrap();
950        let ret = safe_mfp.eval_batch_into(&mut batch).unwrap();
951
952        assert_eq!(
953            ret,
954            Batch::try_from_rows_with_types(
955                vec![Row::from(vec![Value::from(false), Value::from(true)])],
956                &[
957                    ConcreteDataType::boolean_datatype(),
958                    ConcreteDataType::boolean_datatype(),
959                ],
960            )
961            .unwrap()
962        );
963    }
964
965    #[test]
966    fn manipulation_mfp() {
967        // give a input of 4 columns
968        let mfp = MapFilterProject::new(4);
969        // append a expression to the mfp'input row that get the sum of the first 3 columns
970        let mfp = mfp
971            .map(vec![ScalarExpr::Column(0)
972                .call_binary(ScalarExpr::Column(1), BinaryFunc::AddInt32)
973                .call_binary(ScalarExpr::Column(2), BinaryFunc::AddInt32)])
974            .unwrap();
975        // only retain sum result
976        let mfp = mfp.project(vec![4]).unwrap();
977        // accept only if the sum is greater than 10
978        let mfp = mfp
979            .filter(vec![ScalarExpr::Column(0).call_binary(
980                ScalarExpr::Literal(Value::from(10i32), ConcreteDataType::int32_datatype()),
981                BinaryFunc::Gt,
982            )])
983            .unwrap();
984        let input1 = vec![
985            Value::from(4),
986            Value::from(2),
987            Value::from(3),
988            Value::from("abc"),
989        ];
990        let safe_mfp = SafeMfpPlan { mfp };
991        let ret = safe_mfp
992            .evaluate_into(&mut input1.clone(), &mut Row::empty())
993            .unwrap();
994        assert_eq!(ret, None);
995
996        let input_type = [
997            ConcreteDataType::int32_datatype(),
998            ConcreteDataType::int32_datatype(),
999            ConcreteDataType::int32_datatype(),
1000            ConcreteDataType::string_datatype(),
1001        ];
1002
1003        let mut input1_batch =
1004            Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap();
1005        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
1006        assert_eq!(
1007            ret_batch,
1008            Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![]))], 0).unwrap()
1009        );
1010
1011        let input2 = vec![
1012            Value::from(5),
1013            Value::from(2),
1014            Value::from(4),
1015            Value::from("abc"),
1016        ];
1017        let ret = safe_mfp
1018            .evaluate_into(&mut input2.clone(), &mut Row::empty())
1019            .unwrap();
1020        assert_eq!(ret, Some(Row::pack(vec![Value::from(11)])));
1021
1022        let mut input2_batch =
1023            Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap();
1024        let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch).unwrap();
1025        assert_eq!(
1026            ret_batch,
1027            Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![11]))], 1).unwrap()
1028        );
1029    }
1030
1031    #[test]
1032    fn test_permute() {
1033        let mfp = MapFilterProject::new(3)
1034            .map(vec![
1035                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt)
1036            ])
1037            .unwrap()
1038            .filter(vec![
1039                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt)
1040            ])
1041            .unwrap()
1042            .project(vec![0, 1])
1043            .unwrap();
1044        assert_eq!(mfp.demand(), BTreeSet::from([0, 1]));
1045        let mut less = mfp.clone();
1046        less.permute(BTreeMap::from([(1, 0), (0, 1)]), 2).unwrap();
1047
1048        let mut more = mfp.clone();
1049        more.permute(BTreeMap::from([(0, 1), (1, 2), (2, 0)]), 4)
1050            .unwrap();
1051    }
1052
1053    #[test]
1054    fn mfp_test_cast_and_filter() {
1055        let mfp = MapFilterProject::new(3)
1056            .map(vec![ScalarExpr::Column(0).call_unary(UnaryFunc::Cast(
1057                ConcreteDataType::int32_datatype(),
1058            ))])
1059            .unwrap()
1060            .filter(vec![
1061                ScalarExpr::Column(3).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt)
1062            ])
1063            .unwrap()
1064            .project([0, 1, 2])
1065            .unwrap();
1066        let input1 = vec![
1067            Value::from(4i64),
1068            Value::from(2),
1069            Value::from(3),
1070            Value::from(53),
1071        ];
1072        let safe_mfp = SafeMfpPlan { mfp };
1073        let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
1074        assert!(matches!(ret, Err(EvalError::InvalidArgument { .. })));
1075
1076        let input_type = [
1077            ConcreteDataType::int64_datatype(),
1078            ConcreteDataType::int32_datatype(),
1079            ConcreteDataType::int32_datatype(),
1080            ConcreteDataType::int32_datatype(),
1081        ];
1082        let mut input1_batch =
1083            Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap();
1084        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch);
1085        assert!(matches!(ret_batch, Err(EvalError::InvalidArgument { .. })));
1086
1087        let input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)];
1088        let ret = safe_mfp
1089            .evaluate_into(&mut input2.clone(), &mut Row::empty())
1090            .unwrap();
1091        assert_eq!(ret, Some(Row::new(input2.clone())));
1092
1093        let input_type = [
1094            ConcreteDataType::int64_datatype(),
1095            ConcreteDataType::int32_datatype(),
1096            ConcreteDataType::int32_datatype(),
1097        ];
1098        let input2_batch =
1099            Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap();
1100        let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch.clone()).unwrap();
1101        assert_eq!(ret_batch, input2_batch);
1102
1103        let input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
1104        let ret = safe_mfp
1105            .evaluate_into(&mut input3.clone(), &mut Row::empty())
1106            .unwrap();
1107        assert_eq!(ret, None);
1108
1109        let input3_batch =
1110            Batch::try_from_rows_with_types(vec![Row::new(input3)], &input_type).unwrap();
1111        let ret_batch = safe_mfp.eval_batch_into(&mut input3_batch.clone()).unwrap();
1112        assert_eq!(
1113            ret_batch,
1114            Batch::try_new(
1115                vec![
1116                    Arc::new(Int64Vector::from_vec(Default::default())),
1117                    Arc::new(Int32Vector::from_vec(Default::default())),
1118                    Arc::new(Int32Vector::from_vec(Default::default()))
1119                ],
1120                0
1121            )
1122            .unwrap()
1123        );
1124    }
1125
1126    #[test]
1127    fn test_mfp_out_of_order() {
1128        let mfp = MapFilterProject::new(3)
1129            .project(vec![2, 1, 0])
1130            .unwrap()
1131            .filter(vec![
1132                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt)
1133            ])
1134            .unwrap()
1135            .map(vec![
1136                ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt)
1137            ])
1138            .unwrap()
1139            .project(vec![3])
1140            .unwrap();
1141        let input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
1142        let safe_mfp = SafeMfpPlan { mfp };
1143        let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
1144        assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)])));
1145
1146        let input_type = [
1147            ConcreteDataType::int32_datatype(),
1148            ConcreteDataType::int32_datatype(),
1149            ConcreteDataType::int32_datatype(),
1150        ];
1151        let mut input1_batch =
1152            Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap();
1153        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
1154
1155        assert_eq!(
1156            ret_batch,
1157            Batch::try_new(vec![Arc::new(BooleanVector::from(vec![false]))], 1).unwrap()
1158        );
1159    }
1160    #[test]
1161    fn test_mfp_chore() {
1162        // project keeps permute columns until it becomes the identity permutation
1163        let mfp = MapFilterProject::new(3)
1164            .project([1, 2, 0])
1165            .unwrap()
1166            .project([1, 2, 0])
1167            .unwrap()
1168            .project([1, 2, 0])
1169            .unwrap();
1170        assert_eq!(mfp, MapFilterProject::new(3));
1171    }
1172}