flow/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Declares the expressions used in dataflow, including map, reduce, id and join (TODO!), etc.
16
17mod df_func;
18pub(crate) mod error;
19pub(crate) mod func;
20mod id;
21mod linear;
22pub(crate) mod relation;
23mod scalar;
24mod signature;
25pub(crate) mod utils;
26
27use arrow::compute::FilterBuilder;
28use common_recordbatch::RecordBatch;
29use datatypes::prelude::{ConcreteDataType, DataType};
30use datatypes::value::Value;
31use datatypes::vectors::{BooleanVector, Helper, VectorRef};
32pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
33pub(crate) use error::{EvalError, InvalidArgumentSnafu};
34pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
35pub(crate) use id::{GlobalId, Id, LocalId};
36use itertools::Itertools;
37pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
38pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc};
39pub(crate) use scalar::{ScalarExpr, TypedExpr};
40use snafu::{ResultExt, ensure};
41
42use crate::Error;
43use crate::error::DatatypesSnafu;
44use crate::expr::error::{ArrowSnafu, DataTypeSnafu};
45use crate::repr::Diff;
46
/// The literal `"tumble_start"`; presumably the name of the function/column that yields a
/// tumbling window's start bound — TODO(review): confirm against usages.
pub const TUMBLE_START: &str = "tumble_start";
/// The literal `"tumble_end"`; presumably the name of the function/column that yields a
/// tumbling window's end bound — TODO(review): confirm against usages.
pub const TUMBLE_END: &str = "tumble_end";
49
/// A batch of vectors with the same length but without schema, only useful in dataflow.
///
/// Cheap to clone: it only holds a list of [`VectorRef`]s (each an `Arc`) plus an optional
/// diff vector, so cloning bumps reference counts instead of copying column data.
#[derive(Debug, Clone)]
pub struct Batch {
    /// The columns of this batch; each is expected to hold exactly `row_count` rows.
    batch: Vec<VectorRef>,
    /// Number of rows, tracked separately so it is kept even when `batch` has no columns.
    row_count: usize,
    /// describe if corresponding rows in batch is insert or delete, None means all rows are insert
    diffs: Option<VectorRef>,
}
60
61impl TryFrom<RecordBatch> for Batch {
62    type Error = Error;
63
64    fn try_from(value: RecordBatch) -> Result<Self, Self::Error> {
65        let columns = value.columns();
66        let batch = Helper::try_into_vectors(columns).context(DatatypesSnafu {
67            extra: "failed to convert Arrow array to vector when building Flow batch",
68        })?;
69        Ok(Self {
70            row_count: value.num_rows(),
71            batch,
72            diffs: None,
73        })
74    }
75}
76
77impl PartialEq for Batch {
78    fn eq(&self, other: &Self) -> bool {
79        let mut batch_eq = true;
80        if self.batch.len() != other.batch.len() {
81            return false;
82        }
83        for (left, right) in self.batch.iter().zip(other.batch.iter()) {
84            batch_eq = batch_eq
85                && <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array());
86        }
87
88        let diff_eq = match (&self.diffs, &other.diffs) {
89            (Some(left), Some(right)) => {
90                <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array())
91            }
92            (None, None) => true,
93            _ => false,
94        };
95        batch_eq && diff_eq && self.row_count == other.row_count
96    }
97}
98
99impl Eq for Batch {}
100
101impl Default for Batch {
102    fn default() -> Self {
103        Self::empty()
104    }
105}
106
107impl Batch {
108    /// Get batch from rows, will try best to determine data type
109    pub fn try_from_rows_with_types(
110        rows: Vec<crate::repr::Row>,
111        batch_datatypes: &[ConcreteDataType],
112    ) -> Result<Self, EvalError> {
113        if rows.is_empty() {
114            return Ok(Self::empty());
115        }
116        let len = rows.len();
117        let mut builder = batch_datatypes
118            .iter()
119            .map(|ty| ty.create_mutable_vector(len))
120            .collect_vec();
121        for row in rows {
122            ensure!(
123                row.len() == builder.len(),
124                InvalidArgumentSnafu {
125                    reason: format!(
126                        "row length not match, expect {}, found {}",
127                        builder.len(),
128                        row.len()
129                    )
130                }
131            );
132            for (idx, value) in row.iter().enumerate() {
133                builder[idx]
134                    .try_push_value_ref(&value.as_value_ref())
135                    .context(DataTypeSnafu {
136                        msg: "Failed to convert rows to columns",
137                    })?;
138            }
139        }
140
141        let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
142        let batch = Self::try_new(columns, len)?;
143        Ok(batch)
144    }
145
146    pub fn empty() -> Self {
147        Self {
148            batch: vec![],
149            row_count: 0,
150            diffs: None,
151        }
152    }
153    pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
154        ensure!(
155            batch.iter().map(|v| v.len()).all_equal()
156                && batch.first().map(|v| v.len() == row_count).unwrap_or(true),
157            InvalidArgumentSnafu {
158                reason: "All columns should have same length".to_string()
159            }
160        );
161        Ok(Self {
162            batch,
163            row_count,
164            diffs: None,
165        })
166    }
167
168    pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
169        Self {
170            batch,
171            row_count,
172            diffs: None,
173        }
174    }
175
176    pub fn batch(&self) -> &[VectorRef] {
177        &self.batch
178    }
179
180    pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
181        &mut self.batch
182    }
183
184    pub fn row_count(&self) -> usize {
185        self.row_count
186    }
187
188    pub fn set_row_count(&mut self, row_count: usize) {
189        self.row_count = row_count;
190    }
191
192    pub fn column_count(&self) -> usize {
193        self.batch.len()
194    }
195
196    pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
197        ensure!(
198            idx < self.row_count,
199            InvalidArgumentSnafu {
200                reason: format!(
201                    "Expect row index to be less than {}, found {}",
202                    self.row_count, idx
203                )
204            }
205        );
206        let mut ret = Vec::with_capacity(self.column_count());
207        ret.extend(self.batch.iter().map(|v| v.get(idx)));
208        Ok(ret)
209    }
210
211    /// Slices the `Batch`, returning a new `Batch`.
212    pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
213        let batch = self
214            .batch()
215            .iter()
216            .map(|v| v.slice(offset, length))
217            .collect_vec();
218        Batch::try_new(batch, length)
219    }
220
221    /// append another batch to self
222    ///
223    /// NOTE: This is expensive since it will create new vectors for each column
224    pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
225        ensure!(
226            self.batch.len() == other.batch.len()
227                || self.batch.is_empty()
228                || other.batch.is_empty(),
229            InvalidArgumentSnafu {
230                reason: format!(
231                    "Expect two batch to have same numbers of column, found {} and {} columns",
232                    self.batch.len(),
233                    other.batch.len()
234                )
235            }
236        );
237
238        if self.batch.is_empty() {
239            self.batch = other.batch;
240            self.row_count = other.row_count;
241            return Ok(());
242        } else if other.batch.is_empty() {
243            return Ok(());
244        }
245
246        let dts = {
247            let max_len = self.batch.len().max(other.batch.len());
248            let mut dts = Vec::with_capacity(max_len);
249            for i in 0..max_len {
250                if let Some(v) = self.batch().get(i)
251                    && !v.data_type().is_null()
252                {
253                    dts.push(v.data_type())
254                } else if let Some(v) = other.batch().get(i)
255                    && !v.data_type().is_null()
256                {
257                    dts.push(v.data_type())
258                } else {
259                    // both are null, so we will push null type
260                    dts.push(datatypes::prelude::ConcreteDataType::null_datatype())
261                }
262            }
263
264            dts
265        };
266
267        let batch_builders = dts
268            .iter()
269            .map(|dt| dt.create_mutable_vector(self.row_count() + other.row_count()))
270            .collect_vec();
271
272        let mut result = vec![];
273        let self_row_count = self.row_count();
274        let other_row_count = other.row_count();
275        for (idx, mut builder) in batch_builders.into_iter().enumerate() {
276            builder
277                .extend_slice_of(self.batch()[idx].as_ref(), 0, self_row_count)
278                .context(DataTypeSnafu {
279                    msg: "Failed to extend vector",
280                })?;
281            builder
282                .extend_slice_of(other.batch()[idx].as_ref(), 0, other_row_count)
283                .context(DataTypeSnafu {
284                    msg: "Failed to extend vector",
285                })?;
286            result.push(builder.to_vector());
287        }
288        self.batch = result;
289        self.row_count = self_row_count + other_row_count;
290        Ok(())
291    }
292
293    /// filter the batch with given predicate
294    pub fn filter(&self, predicate: &BooleanVector) -> Result<Self, EvalError> {
295        let len = predicate.as_boolean_array().true_count();
296        let filter_builder = FilterBuilder::new(predicate.as_boolean_array()).optimize();
297        let filter_pred = filter_builder.build();
298        let filtered = self
299            .batch()
300            .iter()
301            .map(|col| filter_pred.filter(col.to_arrow_array().as_ref()))
302            .try_collect::<_, Vec<_>, _>()
303            .context(ArrowSnafu {
304                context: "Failed to filter val batches",
305            })?;
306        let res_vector = Helper::try_into_vectors(&filtered).context(DataTypeSnafu {
307            msg: "can't convert arrow array to vector",
308        })?;
309        Self::try_new(res_vector, len)
310    }
311}
312
/// A vector of values paired with an optional per-row diff vector marking each row as an
/// insert or a delete.
///
/// When `diff` is `None`, iteration yields a diff of `+1` (insert) for every row.
pub(crate) struct VectorDiff {
    /// The values.
    vector: VectorRef,
    /// Optional per-row diffs; `VectorDiff::try_new` checks it has the same length as
    /// `vector` (the `From<VectorRef>` conversion always sets it to `None`).
    diff: Option<VectorRef>,
}
318
319impl From<VectorRef> for VectorDiff {
320    fn from(vector: VectorRef) -> Self {
321        Self { vector, diff: None }
322    }
323}
324
325impl VectorDiff {
326    fn len(&self) -> usize {
327        self.vector.len()
328    }
329
330    fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
331        ensure!(
332            diff.as_ref().is_none_or(|diff| diff.len() == vector.len()),
333            InvalidArgumentSnafu {
334                reason: "Length of vector and diff should be the same"
335            }
336        );
337        Ok(Self { vector, diff })
338    }
339}
340
341impl IntoIterator for VectorDiff {
342    type Item = (Value, Diff);
343    type IntoIter = VectorDiffIter;
344
345    fn into_iter(self) -> Self::IntoIter {
346        VectorDiffIter {
347            vector: self.vector,
348            diff: self.diff,
349            idx: 0,
350        }
351    }
352}
353
/// Iterator over the `(value, diff)` pairs of a [`VectorDiff`], created by its
/// `IntoIterator` implementation.
pub(crate) struct VectorDiffIter {
    /// Values being iterated.
    vector: VectorRef,
    /// Optional per-row diffs; `None` means every row is yielded with diff `+1`.
    diff: Option<VectorRef>,
    /// Index of the next row to yield.
    idx: usize,
}
360
361impl std::iter::Iterator for VectorDiffIter {
362    type Item = (Value, Diff);
363
364    fn next(&mut self) -> Option<Self::Item> {
365        if self.idx >= self.vector.len() {
366            return None;
367        }
368        let value = self.vector.get(self.idx);
369        // +1 means insert, -1 means delete, and default to +1 insert when diff is not provided
370        let diff = if let Some(diff) = self.diff.as_ref() {
371            if let Ok(diff_at) = diff.get(self.idx).try_into() {
372                diff_at
373            } else {
374                common_telemetry::warn!("Invalid diff value at index {}", self.idx);
375                return None;
376            }
377        } else {
378            1
379        };
380
381        self.idx += 1;
382        Some((value, diff))
383    }
384}