1mod df_func;
18pub(crate) mod error;
19pub(crate) mod func;
20mod id;
21mod linear;
22pub(crate) mod relation;
23mod scalar;
24mod signature;
25pub(crate) mod utils;
26
27use arrow::compute::FilterBuilder;
28use common_recordbatch::RecordBatch;
29use datatypes::prelude::{ConcreteDataType, DataType};
30use datatypes::value::Value;
31use datatypes::vectors::{BooleanVector, Helper, VectorRef};
32pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
33pub(crate) use error::{EvalError, InvalidArgumentSnafu};
34pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
35pub(crate) use id::{GlobalId, Id, LocalId};
36use itertools::Itertools;
37pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
38pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc};
39pub(crate) use scalar::{ScalarExpr, TypedExpr};
40use snafu::{ResultExt, ensure};
41
42use crate::Error;
43use crate::error::DatatypesSnafu;
44use crate::expr::error::{ArrowSnafu, DataTypeSnafu};
45use crate::repr::Diff;
46
/// The literal name `"tumble_start"`.
///
/// NOTE(review): presumably the name of the tumble-window start function — confirm at use sites.
pub const TUMBLE_START: &str = "tumble_start";
/// The literal name `"tumble_end"`.
///
/// NOTE(review): presumably the name of the tumble-window end function — confirm at use sites.
pub const TUMBLE_END: &str = "tumble_end";
49
50#[derive(Debug, Clone)]
54pub struct Batch {
55 batch: Vec<VectorRef>,
56 row_count: usize,
57 diffs: Option<VectorRef>,
59}
60
61impl TryFrom<RecordBatch> for Batch {
62 type Error = Error;
63
64 fn try_from(value: RecordBatch) -> Result<Self, Self::Error> {
65 let columns = value.columns();
66 let batch = Helper::try_into_vectors(columns).context(DatatypesSnafu {
67 extra: "failed to convert Arrow array to vector when building Flow batch",
68 })?;
69 Ok(Self {
70 row_count: value.num_rows(),
71 batch,
72 diffs: None,
73 })
74 }
75}
76
77impl PartialEq for Batch {
78 fn eq(&self, other: &Self) -> bool {
79 let mut batch_eq = true;
80 if self.batch.len() != other.batch.len() {
81 return false;
82 }
83 for (left, right) in self.batch.iter().zip(other.batch.iter()) {
84 batch_eq = batch_eq
85 && <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array());
86 }
87
88 let diff_eq = match (&self.diffs, &other.diffs) {
89 (Some(left), Some(right)) => {
90 <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array())
91 }
92 (None, None) => true,
93 _ => false,
94 };
95 batch_eq && diff_eq && self.row_count == other.row_count
96 }
97}
98
99impl Eq for Batch {}
100
101impl Default for Batch {
102 fn default() -> Self {
103 Self::empty()
104 }
105}
106
107impl Batch {
108 pub fn try_from_rows_with_types(
110 rows: Vec<crate::repr::Row>,
111 batch_datatypes: &[ConcreteDataType],
112 ) -> Result<Self, EvalError> {
113 if rows.is_empty() {
114 return Ok(Self::empty());
115 }
116 let len = rows.len();
117 let mut builder = batch_datatypes
118 .iter()
119 .map(|ty| ty.create_mutable_vector(len))
120 .collect_vec();
121 for row in rows {
122 ensure!(
123 row.len() == builder.len(),
124 InvalidArgumentSnafu {
125 reason: format!(
126 "row length not match, expect {}, found {}",
127 builder.len(),
128 row.len()
129 )
130 }
131 );
132 for (idx, value) in row.iter().enumerate() {
133 builder[idx]
134 .try_push_value_ref(&value.as_value_ref())
135 .context(DataTypeSnafu {
136 msg: "Failed to convert rows to columns",
137 })?;
138 }
139 }
140
141 let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
142 let batch = Self::try_new(columns, len)?;
143 Ok(batch)
144 }
145
146 pub fn empty() -> Self {
147 Self {
148 batch: vec![],
149 row_count: 0,
150 diffs: None,
151 }
152 }
153 pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
154 ensure!(
155 batch.iter().map(|v| v.len()).all_equal()
156 && batch.first().map(|v| v.len() == row_count).unwrap_or(true),
157 InvalidArgumentSnafu {
158 reason: "All columns should have same length".to_string()
159 }
160 );
161 Ok(Self {
162 batch,
163 row_count,
164 diffs: None,
165 })
166 }
167
168 pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
169 Self {
170 batch,
171 row_count,
172 diffs: None,
173 }
174 }
175
176 pub fn batch(&self) -> &[VectorRef] {
177 &self.batch
178 }
179
180 pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
181 &mut self.batch
182 }
183
184 pub fn row_count(&self) -> usize {
185 self.row_count
186 }
187
188 pub fn set_row_count(&mut self, row_count: usize) {
189 self.row_count = row_count;
190 }
191
192 pub fn column_count(&self) -> usize {
193 self.batch.len()
194 }
195
196 pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
197 ensure!(
198 idx < self.row_count,
199 InvalidArgumentSnafu {
200 reason: format!(
201 "Expect row index to be less than {}, found {}",
202 self.row_count, idx
203 )
204 }
205 );
206 let mut ret = Vec::with_capacity(self.column_count());
207 ret.extend(self.batch.iter().map(|v| v.get(idx)));
208 Ok(ret)
209 }
210
211 pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
213 let batch = self
214 .batch()
215 .iter()
216 .map(|v| v.slice(offset, length))
217 .collect_vec();
218 Batch::try_new(batch, length)
219 }
220
221 pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
225 ensure!(
226 self.batch.len() == other.batch.len()
227 || self.batch.is_empty()
228 || other.batch.is_empty(),
229 InvalidArgumentSnafu {
230 reason: format!(
231 "Expect two batch to have same numbers of column, found {} and {} columns",
232 self.batch.len(),
233 other.batch.len()
234 )
235 }
236 );
237
238 if self.batch.is_empty() {
239 self.batch = other.batch;
240 self.row_count = other.row_count;
241 return Ok(());
242 } else if other.batch.is_empty() {
243 return Ok(());
244 }
245
246 let dts = {
247 let max_len = self.batch.len().max(other.batch.len());
248 let mut dts = Vec::with_capacity(max_len);
249 for i in 0..max_len {
250 if let Some(v) = self.batch().get(i)
251 && !v.data_type().is_null()
252 {
253 dts.push(v.data_type())
254 } else if let Some(v) = other.batch().get(i)
255 && !v.data_type().is_null()
256 {
257 dts.push(v.data_type())
258 } else {
259 dts.push(datatypes::prelude::ConcreteDataType::null_datatype())
261 }
262 }
263
264 dts
265 };
266
267 let batch_builders = dts
268 .iter()
269 .map(|dt| dt.create_mutable_vector(self.row_count() + other.row_count()))
270 .collect_vec();
271
272 let mut result = vec![];
273 let self_row_count = self.row_count();
274 let other_row_count = other.row_count();
275 for (idx, mut builder) in batch_builders.into_iter().enumerate() {
276 builder
277 .extend_slice_of(self.batch()[idx].as_ref(), 0, self_row_count)
278 .context(DataTypeSnafu {
279 msg: "Failed to extend vector",
280 })?;
281 builder
282 .extend_slice_of(other.batch()[idx].as_ref(), 0, other_row_count)
283 .context(DataTypeSnafu {
284 msg: "Failed to extend vector",
285 })?;
286 result.push(builder.to_vector());
287 }
288 self.batch = result;
289 self.row_count = self_row_count + other_row_count;
290 Ok(())
291 }
292
293 pub fn filter(&self, predicate: &BooleanVector) -> Result<Self, EvalError> {
295 let len = predicate.as_boolean_array().true_count();
296 let filter_builder = FilterBuilder::new(predicate.as_boolean_array()).optimize();
297 let filter_pred = filter_builder.build();
298 let filtered = self
299 .batch()
300 .iter()
301 .map(|col| filter_pred.filter(col.to_arrow_array().as_ref()))
302 .try_collect::<_, Vec<_>, _>()
303 .context(ArrowSnafu {
304 context: "Failed to filter val batches",
305 })?;
306 let res_vector = Helper::try_into_vectors(&filtered).context(DataTypeSnafu {
307 msg: "can't convert arrow array to vector",
308 })?;
309 Self::try_new(res_vector, len)
310 }
311}
312
313pub(crate) struct VectorDiff {
315 vector: VectorRef,
316 diff: Option<VectorRef>,
317}
318
319impl From<VectorRef> for VectorDiff {
320 fn from(vector: VectorRef) -> Self {
321 Self { vector, diff: None }
322 }
323}
324
325impl VectorDiff {
326 fn len(&self) -> usize {
327 self.vector.len()
328 }
329
330 fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
331 ensure!(
332 diff.as_ref().is_none_or(|diff| diff.len() == vector.len()),
333 InvalidArgumentSnafu {
334 reason: "Length of vector and diff should be the same"
335 }
336 );
337 Ok(Self { vector, diff })
338 }
339}
340
341impl IntoIterator for VectorDiff {
342 type Item = (Value, Diff);
343 type IntoIter = VectorDiffIter;
344
345 fn into_iter(self) -> Self::IntoIter {
346 VectorDiffIter {
347 vector: self.vector,
348 diff: self.diff,
349 idx: 0,
350 }
351 }
352}
353
354pub(crate) struct VectorDiffIter {
356 vector: VectorRef,
357 diff: Option<VectorRef>,
358 idx: usize,
359}
360
361impl std::iter::Iterator for VectorDiffIter {
362 type Item = (Value, Diff);
363
364 fn next(&mut self) -> Option<Self::Item> {
365 if self.idx >= self.vector.len() {
366 return None;
367 }
368 let value = self.vector.get(self.idx);
369 let diff = if let Some(diff) = self.diff.as_ref() {
371 if let Ok(diff_at) = diff.get(self.idx).try_into() {
372 diff_at
373 } else {
374 common_telemetry::warn!("Invalid diff value at index {}", self.idx);
375 return None;
376 }
377 } else {
378 1
379 };
380
381 self.idx += 1;
382 Some((value, diff))
383 }
384}