1mod df_func;
18pub(crate) mod error;
19pub(crate) mod func;
20mod id;
21mod linear;
22pub(crate) mod relation;
23mod scalar;
24mod signature;
25pub(crate) mod utils;
26
27use arrow::compute::FilterBuilder;
28use datatypes::prelude::{ConcreteDataType, DataType};
29use datatypes::value::Value;
30use datatypes::vectors::{BooleanVector, Helper, VectorRef};
31pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
32pub(crate) use error::{EvalError, InvalidArgumentSnafu};
33pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
34pub(crate) use id::{GlobalId, Id, LocalId};
35use itertools::Itertools;
36pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
37pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc};
38pub(crate) use scalar::{ScalarExpr, TypedExpr};
39use snafu::{ensure, ResultExt};
40
41use crate::expr::error::{ArrowSnafu, DataTypeSnafu};
42use crate::repr::Diff;
43
/// Function name string `"tumble_start"`.
///
/// NOTE(review): presumably names the function returning the start of a tumbling
/// window — confirm against where it is registered/matched.
pub const TUMBLE_START: &str = "tumble_start";
/// Function name string `"tumble_end"`.
///
/// NOTE(review): presumably names the function returning the end of a tumbling
/// window — confirm against where it is registered/matched.
pub const TUMBLE_END: &str = "tumble_end";
46
/// A column-oriented batch of rows.
///
/// Each entry of `batch` is one column; all columns are expected to hold
/// `row_count` values (enforced by [`Batch::try_new`], but not by
/// [`Batch::new_unchecked`], [`Batch::set_row_count`] or [`Batch::batch_mut`]).
#[derive(Debug, Clone)]
pub struct Batch {
    /// The columns of this batch, one vector per column.
    batch: Vec<VectorRef>,
    /// Number of rows in the batch; also meaningful when there are no columns.
    row_count: usize,
    /// Optional extra column compared by `PartialEq`.
    ///
    /// NOTE(review): every constructor in this file sets it to `None`; presumably it
    /// holds one diff per row when populated elsewhere — confirm.
    diffs: Option<VectorRef>,
}
57
58impl From<common_recordbatch::RecordBatch> for Batch {
59 fn from(value: common_recordbatch::RecordBatch) -> Self {
60 Self {
61 row_count: value.num_rows(),
62 batch: value.columns,
63 diffs: None,
64 }
65 }
66}
67
68impl PartialEq for Batch {
69 fn eq(&self, other: &Self) -> bool {
70 let mut batch_eq = true;
71 if self.batch.len() != other.batch.len() {
72 return false;
73 }
74 for (left, right) in self.batch.iter().zip(other.batch.iter()) {
75 batch_eq = batch_eq
76 && <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array());
77 }
78
79 let diff_eq = match (&self.diffs, &other.diffs) {
80 (Some(left), Some(right)) => {
81 <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array())
82 }
83 (None, None) => true,
84 _ => false,
85 };
86 batch_eq && diff_eq && self.row_count == other.row_count
87 }
88}
89
90impl Eq for Batch {}
91
92impl Default for Batch {
93 fn default() -> Self {
94 Self::empty()
95 }
96}
97
98impl Batch {
99 pub fn try_from_rows_with_types(
101 rows: Vec<crate::repr::Row>,
102 batch_datatypes: &[ConcreteDataType],
103 ) -> Result<Self, EvalError> {
104 if rows.is_empty() {
105 return Ok(Self::empty());
106 }
107 let len = rows.len();
108 let mut builder = batch_datatypes
109 .iter()
110 .map(|ty| ty.create_mutable_vector(len))
111 .collect_vec();
112 for row in rows {
113 ensure!(
114 row.len() == builder.len(),
115 InvalidArgumentSnafu {
116 reason: format!(
117 "row length not match, expect {}, found {}",
118 builder.len(),
119 row.len()
120 )
121 }
122 );
123 for (idx, value) in row.iter().enumerate() {
124 builder[idx]
125 .try_push_value_ref(value.as_value_ref())
126 .context(DataTypeSnafu {
127 msg: "Failed to convert rows to columns",
128 })?;
129 }
130 }
131
132 let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
133 let batch = Self::try_new(columns, len)?;
134 Ok(batch)
135 }
136
137 pub fn empty() -> Self {
138 Self {
139 batch: vec![],
140 row_count: 0,
141 diffs: None,
142 }
143 }
144 pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
145 ensure!(
146 batch.iter().map(|v| v.len()).all_equal()
147 && batch.first().map(|v| v.len() == row_count).unwrap_or(true),
148 InvalidArgumentSnafu {
149 reason: "All columns should have same length".to_string()
150 }
151 );
152 Ok(Self {
153 batch,
154 row_count,
155 diffs: None,
156 })
157 }
158
159 pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
160 Self {
161 batch,
162 row_count,
163 diffs: None,
164 }
165 }
166
167 pub fn batch(&self) -> &[VectorRef] {
168 &self.batch
169 }
170
171 pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
172 &mut self.batch
173 }
174
175 pub fn row_count(&self) -> usize {
176 self.row_count
177 }
178
179 pub fn set_row_count(&mut self, row_count: usize) {
180 self.row_count = row_count;
181 }
182
183 pub fn column_count(&self) -> usize {
184 self.batch.len()
185 }
186
187 pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
188 ensure!(
189 idx < self.row_count,
190 InvalidArgumentSnafu {
191 reason: format!(
192 "Expect row index to be less than {}, found {}",
193 self.row_count, idx
194 )
195 }
196 );
197 let mut ret = Vec::with_capacity(self.column_count());
198 ret.extend(self.batch.iter().map(|v| v.get(idx)));
199 Ok(ret)
200 }
201
202 pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
204 let batch = self
205 .batch()
206 .iter()
207 .map(|v| v.slice(offset, length))
208 .collect_vec();
209 Batch::try_new(batch, length)
210 }
211
212 pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
216 ensure!(
217 self.batch.len() == other.batch.len()
218 || self.batch.is_empty()
219 || other.batch.is_empty(),
220 InvalidArgumentSnafu {
221 reason: format!(
222 "Expect two batch to have same numbers of column, found {} and {} columns",
223 self.batch.len(),
224 other.batch.len()
225 )
226 }
227 );
228
229 if self.batch.is_empty() {
230 self.batch = other.batch;
231 self.row_count = other.row_count;
232 return Ok(());
233 } else if other.batch.is_empty() {
234 return Ok(());
235 }
236
237 let dts = {
238 let max_len = self.batch.len().max(other.batch.len());
239 let mut dts = Vec::with_capacity(max_len);
240 for i in 0..max_len {
241 if let Some(v) = self.batch().get(i)
242 && !v.data_type().is_null()
243 {
244 dts.push(v.data_type())
245 } else if let Some(v) = other.batch().get(i)
246 && !v.data_type().is_null()
247 {
248 dts.push(v.data_type())
249 } else {
250 dts.push(datatypes::prelude::ConcreteDataType::null_datatype())
252 }
253 }
254
255 dts
256 };
257
258 let batch_builders = dts
259 .iter()
260 .map(|dt| dt.create_mutable_vector(self.row_count() + other.row_count()))
261 .collect_vec();
262
263 let mut result = vec![];
264 let self_row_count = self.row_count();
265 let other_row_count = other.row_count();
266 for (idx, mut builder) in batch_builders.into_iter().enumerate() {
267 builder
268 .extend_slice_of(self.batch()[idx].as_ref(), 0, self_row_count)
269 .context(DataTypeSnafu {
270 msg: "Failed to extend vector",
271 })?;
272 builder
273 .extend_slice_of(other.batch()[idx].as_ref(), 0, other_row_count)
274 .context(DataTypeSnafu {
275 msg: "Failed to extend vector",
276 })?;
277 result.push(builder.to_vector());
278 }
279 self.batch = result;
280 self.row_count = self_row_count + other_row_count;
281 Ok(())
282 }
283
284 pub fn filter(&self, predicate: &BooleanVector) -> Result<Self, EvalError> {
286 let len = predicate.as_boolean_array().true_count();
287 let filter_builder = FilterBuilder::new(predicate.as_boolean_array()).optimize();
288 let filter_pred = filter_builder.build();
289 let filtered = self
290 .batch()
291 .iter()
292 .map(|col| filter_pred.filter(col.to_arrow_array().as_ref()))
293 .try_collect::<_, Vec<_>, _>()
294 .context(ArrowSnafu {
295 context: "Failed to filter val batches",
296 })?;
297 let res_vector = Helper::try_into_vectors(&filtered).context(DataTypeSnafu {
298 msg: "can't convert arrow array to vector",
299 })?;
300 Self::try_new(res_vector, len)
301 }
302}
303
/// A vector of values paired with an optional vector of per-row diffs.
///
/// Iterating it yields `(value, diff)` pairs; when `diff` is `None` every row gets a
/// diff of `1`.
pub(crate) struct VectorDiff {
    /// The values.
    vector: VectorRef,
    /// Optional diffs; `VectorDiff::try_new` requires it to match `vector` in length.
    diff: Option<VectorRef>,
}
309
310impl From<VectorRef> for VectorDiff {
311 fn from(vector: VectorRef) -> Self {
312 Self { vector, diff: None }
313 }
314}
315
316impl VectorDiff {
317 fn len(&self) -> usize {
318 self.vector.len()
319 }
320
321 fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
322 ensure!(
323 diff.as_ref().is_none_or(|diff| diff.len() == vector.len()),
324 InvalidArgumentSnafu {
325 reason: "Length of vector and diff should be the same"
326 }
327 );
328 Ok(Self { vector, diff })
329 }
330}
331
332impl IntoIterator for VectorDiff {
333 type Item = (Value, Diff);
334 type IntoIter = VectorDiffIter;
335
336 fn into_iter(self) -> Self::IntoIter {
337 VectorDiffIter {
338 vector: self.vector,
339 diff: self.diff,
340 idx: 0,
341 }
342 }
343}
344
/// Iterator over a [`VectorDiff`], yielding `(value, diff)` pairs row by row.
pub(crate) struct VectorDiffIter {
    /// The values being iterated.
    vector: VectorRef,
    /// Optional per-row diffs; `None` means every row has a diff of `1`.
    diff: Option<VectorRef>,
    /// Index of the next row to yield.
    idx: usize,
}
351
352impl std::iter::Iterator for VectorDiffIter {
353 type Item = (Value, Diff);
354
355 fn next(&mut self) -> Option<Self::Item> {
356 if self.idx >= self.vector.len() {
357 return None;
358 }
359 let value = self.vector.get(self.idx);
360 let diff = if let Some(diff) = self.diff.as_ref() {
362 if let Ok(diff_at) = diff.get(self.idx).try_into() {
363 diff_at
364 } else {
365 common_telemetry::warn!("Invalid diff value at index {}", self.idx);
366 return None;
367 }
368 } else {
369 1
370 };
371
372 self.idx += 1;
373 Some((value, diff))
374 }
375}