flow/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Declarations of the expressions used in dataflow, including map, reduce, id and join (TODO), etc.
16
17mod df_func;
18pub(crate) mod error;
19pub(crate) mod func;
20mod id;
21mod linear;
22pub(crate) mod relation;
23mod scalar;
24mod signature;
25pub(crate) mod utils;
26
27use arrow::compute::FilterBuilder;
28use datatypes::prelude::{ConcreteDataType, DataType};
29use datatypes::value::Value;
30use datatypes::vectors::{BooleanVector, Helper, VectorRef};
31pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
32pub(crate) use error::{EvalError, InvalidArgumentSnafu};
33pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
34pub(crate) use id::{GlobalId, Id, LocalId};
35use itertools::Itertools;
36pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
37pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc};
38pub(crate) use scalar::{ScalarExpr, TypedExpr};
39use snafu::{ensure, ResultExt};
40
41use crate::expr::error::{ArrowSnafu, DataTypeSnafu};
42use crate::repr::Diff;
43
/// String identifier `"tumble_start"` — presumably the name of the function
/// returning the start of a tumble window (confirm against its users).
pub const TUMBLE_START: &str = "tumble_start";
/// String identifier `"tumble_end"` — presumably the name of the function
/// returning the end of a tumble window (confirm against its users).
pub const TUMBLE_END: &str = "tumble_end";
46
47/// A batch of vectors with the same length but without schema, only useful in dataflow
48///
49/// somewhere cheap to clone since it just contains a list of VectorRef(which is a `Arc`).
50#[derive(Debug, Clone)]
51pub struct Batch {
52    batch: Vec<VectorRef>,
53    row_count: usize,
54    /// describe if corresponding rows in batch is insert or delete, None means all rows are insert
55    diffs: Option<VectorRef>,
56}
57
58impl From<common_recordbatch::RecordBatch> for Batch {
59    fn from(value: common_recordbatch::RecordBatch) -> Self {
60        Self {
61            row_count: value.num_rows(),
62            batch: value.columns,
63            diffs: None,
64        }
65    }
66}
67
68impl PartialEq for Batch {
69    fn eq(&self, other: &Self) -> bool {
70        let mut batch_eq = true;
71        if self.batch.len() != other.batch.len() {
72            return false;
73        }
74        for (left, right) in self.batch.iter().zip(other.batch.iter()) {
75            batch_eq = batch_eq
76                && <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array());
77        }
78
79        let diff_eq = match (&self.diffs, &other.diffs) {
80            (Some(left), Some(right)) => {
81                <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array())
82            }
83            (None, None) => true,
84            _ => false,
85        };
86        batch_eq && diff_eq && self.row_count == other.row_count
87    }
88}
89
90impl Eq for Batch {}
91
92impl Default for Batch {
93    fn default() -> Self {
94        Self::empty()
95    }
96}
97
98impl Batch {
99    /// Get batch from rows, will try best to determine data type
100    pub fn try_from_rows_with_types(
101        rows: Vec<crate::repr::Row>,
102        batch_datatypes: &[ConcreteDataType],
103    ) -> Result<Self, EvalError> {
104        if rows.is_empty() {
105            return Ok(Self::empty());
106        }
107        let len = rows.len();
108        let mut builder = batch_datatypes
109            .iter()
110            .map(|ty| ty.create_mutable_vector(len))
111            .collect_vec();
112        for row in rows {
113            ensure!(
114                row.len() == builder.len(),
115                InvalidArgumentSnafu {
116                    reason: format!(
117                        "row length not match, expect {}, found {}",
118                        builder.len(),
119                        row.len()
120                    )
121                }
122            );
123            for (idx, value) in row.iter().enumerate() {
124                builder[idx]
125                    .try_push_value_ref(value.as_value_ref())
126                    .context(DataTypeSnafu {
127                        msg: "Failed to convert rows to columns",
128                    })?;
129            }
130        }
131
132        let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
133        let batch = Self::try_new(columns, len)?;
134        Ok(batch)
135    }
136
137    pub fn empty() -> Self {
138        Self {
139            batch: vec![],
140            row_count: 0,
141            diffs: None,
142        }
143    }
144    pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
145        ensure!(
146            batch.iter().map(|v| v.len()).all_equal()
147                && batch.first().map(|v| v.len() == row_count).unwrap_or(true),
148            InvalidArgumentSnafu {
149                reason: "All columns should have same length".to_string()
150            }
151        );
152        Ok(Self {
153            batch,
154            row_count,
155            diffs: None,
156        })
157    }
158
159    pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
160        Self {
161            batch,
162            row_count,
163            diffs: None,
164        }
165    }
166
167    pub fn batch(&self) -> &[VectorRef] {
168        &self.batch
169    }
170
171    pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
172        &mut self.batch
173    }
174
175    pub fn row_count(&self) -> usize {
176        self.row_count
177    }
178
179    pub fn set_row_count(&mut self, row_count: usize) {
180        self.row_count = row_count;
181    }
182
183    pub fn column_count(&self) -> usize {
184        self.batch.len()
185    }
186
187    pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
188        ensure!(
189            idx < self.row_count,
190            InvalidArgumentSnafu {
191                reason: format!(
192                    "Expect row index to be less than {}, found {}",
193                    self.row_count, idx
194                )
195            }
196        );
197        let mut ret = Vec::with_capacity(self.column_count());
198        ret.extend(self.batch.iter().map(|v| v.get(idx)));
199        Ok(ret)
200    }
201
202    /// Slices the `Batch`, returning a new `Batch`.
203    pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
204        let batch = self
205            .batch()
206            .iter()
207            .map(|v| v.slice(offset, length))
208            .collect_vec();
209        Batch::try_new(batch, length)
210    }
211
212    /// append another batch to self
213    ///
214    /// NOTE: This is expensive since it will create new vectors for each column
215    pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
216        ensure!(
217            self.batch.len() == other.batch.len()
218                || self.batch.is_empty()
219                || other.batch.is_empty(),
220            InvalidArgumentSnafu {
221                reason: format!(
222                    "Expect two batch to have same numbers of column, found {} and {} columns",
223                    self.batch.len(),
224                    other.batch.len()
225                )
226            }
227        );
228
229        if self.batch.is_empty() {
230            self.batch = other.batch;
231            self.row_count = other.row_count;
232            return Ok(());
233        } else if other.batch.is_empty() {
234            return Ok(());
235        }
236
237        let dts = {
238            let max_len = self.batch.len().max(other.batch.len());
239            let mut dts = Vec::with_capacity(max_len);
240            for i in 0..max_len {
241                if let Some(v) = self.batch().get(i)
242                    && !v.data_type().is_null()
243                {
244                    dts.push(v.data_type())
245                } else if let Some(v) = other.batch().get(i)
246                    && !v.data_type().is_null()
247                {
248                    dts.push(v.data_type())
249                } else {
250                    // both are null, so we will push null type
251                    dts.push(datatypes::prelude::ConcreteDataType::null_datatype())
252                }
253            }
254
255            dts
256        };
257
258        let batch_builders = dts
259            .iter()
260            .map(|dt| dt.create_mutable_vector(self.row_count() + other.row_count()))
261            .collect_vec();
262
263        let mut result = vec![];
264        let self_row_count = self.row_count();
265        let other_row_count = other.row_count();
266        for (idx, mut builder) in batch_builders.into_iter().enumerate() {
267            builder
268                .extend_slice_of(self.batch()[idx].as_ref(), 0, self_row_count)
269                .context(DataTypeSnafu {
270                    msg: "Failed to extend vector",
271                })?;
272            builder
273                .extend_slice_of(other.batch()[idx].as_ref(), 0, other_row_count)
274                .context(DataTypeSnafu {
275                    msg: "Failed to extend vector",
276                })?;
277            result.push(builder.to_vector());
278        }
279        self.batch = result;
280        self.row_count = self_row_count + other_row_count;
281        Ok(())
282    }
283
284    /// filter the batch with given predicate
285    pub fn filter(&self, predicate: &BooleanVector) -> Result<Self, EvalError> {
286        let len = predicate.as_boolean_array().true_count();
287        let filter_builder = FilterBuilder::new(predicate.as_boolean_array()).optimize();
288        let filter_pred = filter_builder.build();
289        let filtered = self
290            .batch()
291            .iter()
292            .map(|col| filter_pred.filter(col.to_arrow_array().as_ref()))
293            .try_collect::<_, Vec<_>, _>()
294            .context(ArrowSnafu {
295                context: "Failed to filter val batches",
296            })?;
297        let res_vector = Helper::try_into_vectors(&filtered).context(DataTypeSnafu {
298            msg: "can't convert arrow array to vector",
299        })?;
300        Self::try_new(res_vector, len)
301    }
302}
303
304/// Vector with diff to note the insert and delete
305pub(crate) struct VectorDiff {
306    vector: VectorRef,
307    diff: Option<VectorRef>,
308}
309
310impl From<VectorRef> for VectorDiff {
311    fn from(vector: VectorRef) -> Self {
312        Self { vector, diff: None }
313    }
314}
315
316impl VectorDiff {
317    fn len(&self) -> usize {
318        self.vector.len()
319    }
320
321    fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
322        ensure!(
323            diff.as_ref().is_none_or(|diff| diff.len() == vector.len()),
324            InvalidArgumentSnafu {
325                reason: "Length of vector and diff should be the same"
326            }
327        );
328        Ok(Self { vector, diff })
329    }
330}
331
332impl IntoIterator for VectorDiff {
333    type Item = (Value, Diff);
334    type IntoIter = VectorDiffIter;
335
336    fn into_iter(self) -> Self::IntoIter {
337        VectorDiffIter {
338            vector: self.vector,
339            diff: self.diff,
340            idx: 0,
341        }
342    }
343}
344
345/// iterator for VectorDiff
346pub(crate) struct VectorDiffIter {
347    vector: VectorRef,
348    diff: Option<VectorRef>,
349    idx: usize,
350}
351
352impl std::iter::Iterator for VectorDiffIter {
353    type Item = (Value, Diff);
354
355    fn next(&mut self) -> Option<Self::Item> {
356        if self.idx >= self.vector.len() {
357            return None;
358        }
359        let value = self.vector.get(self.idx);
360        // +1 means insert, -1 means delete, and default to +1 insert when diff is not provided
361        let diff = if let Some(diff) = self.diff.as_ref() {
362            if let Ok(diff_at) = diff.get(self.idx).try_into() {
363                diff_at
364            } else {
365                common_telemetry::warn!("Invalid diff value at index {}", self.idx);
366                return None;
367            }
368        } else {
369            1
370        };
371
372        self.idx += 1;
373        Some((value, diff))
374    }
375}