mod df_func;
pub(crate) mod error;
pub(crate) mod func;
mod id;
mod linear;
pub(crate) mod relation;
mod scalar;
mod signature;
pub(crate) mod utils;
use arrow::compute::FilterBuilder;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
pub(crate) use error::{EvalError, InvalidArgumentSnafu};
pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
pub(crate) use id::{GlobalId, Id, LocalId};
use itertools::Itertools;
pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc};
pub(crate) use scalar::{ScalarExpr, TypedExpr};
use snafu::{ensure, ResultExt};
use crate::expr::error::{ArrowSnafu, DataTypeSnafu};
use crate::repr::Diff;
/// The literal name `tumble_start`.
/// NOTE(review): presumably the name of the function yielding the start of a tumbling window —
/// confirm against the code that matches on this constant.
pub const TUMBLE_START: &str = "tumble_start";
/// The literal name `tumble_end`.
/// NOTE(review): presumably the counterpart yielding the end of a tumbling window —
/// confirm against the code that matches on this constant.
pub const TUMBLE_END: &str = "tumble_end";
/// A column-oriented group of rows: one vector per column plus an explicit row count.
///
/// `try_new` checks that every column holds exactly `row_count` entries; `new_unchecked`,
/// `batch_mut` and `set_row_count` perform no such check.
#[derive(Debug, Clone)]
pub struct Batch {
/// The columns of the batch, one vector per column.
batch: Vec<VectorRef>,
/// Number of rows; stored explicitly rather than derived from the columns.
row_count: usize,
/// Optional vector of diffs. Every constructor in this file initializes it to `None`.
/// NOTE(review): presumably one diff per row when present — confirm with the code that sets it.
diffs: Option<VectorRef>,
}
/// Builds a [`Batch`] from a record batch by taking over its columns.
impl From<common_recordbatch::RecordBatch> for Batch {
    /// Reads the number of rows, then moves the columns out of `value`; `diffs` starts as `None`.
    fn from(value: common_recordbatch::RecordBatch) -> Self {
        // The row count must be read before the columns are moved out of `value`.
        let row_count = value.num_rows();
        let batch = value.columns;
        Self {
            batch,
            row_count,
            diffs: None,
        }
    }
}
/// Hand-written equality: two batches are equal when they have the same number of columns,
/// every pair of columns compares equal as Arrow arrays, their `diffs` are both absent or
/// compare equal as Arrow arrays, and their row counts match.
impl PartialEq for Batch {
    fn eq(&self, other: &Self) -> bool {
        if self.batch.len() != other.batch.len() {
            return false;
        }
        // Column-wise comparison on the Arrow representation; stops converting columns
        // once a mismatch has been found.
        let columns_match = self
            .batch
            .iter()
            .zip(other.batch.iter())
            .all(|(lhs, rhs)| {
                <dyn arrow::array::Array>::eq(&lhs.to_arrow_array(), &rhs.to_arrow_array())
            });
        let diffs_match = match (self.diffs.as_ref(), other.diffs.as_ref()) {
            (Some(lhs), Some(rhs)) => {
                <dyn arrow::array::Array>::eq(&lhs.to_arrow_array(), &rhs.to_arrow_array())
            }
            (None, None) => true,
            // Exactly one side carries diffs.
            (Some(_), None) | (None, Some(_)) => false,
        };
        columns_match && diffs_match && self.row_count == other.row_count
    }
}
// Marker impl: declares the hand-written `PartialEq` of `Batch` to be a full equivalence relation.
impl Eq for Batch {}
/// The default batch is the empty batch returned by `Batch::empty`.
impl Default for Batch {
fn default() -> Self {
Self::empty()
}
}
impl Batch {
/// Transposes row-oriented data into a column-oriented [`Batch`].
///
/// One mutable vector is created per entry of `batch_datatypes`, the values of every row are
/// pushed into them column by column, and the finished vectors are validated by `try_new`.
///
/// An empty `rows` yields `Batch::empty()`, i.e. a batch without any columns, regardless of
/// `batch_datatypes`.
///
/// # Errors
/// - an invalid-argument error when a row's length differs from the number of data types;
/// - a data-type error when a value cannot be pushed into the builder of its column;
/// - any error reported by `try_new`.
pub fn try_from_rows_with_types(
    rows: Vec<crate::repr::Row>,
    batch_datatypes: &[ConcreteDataType],
) -> Result<Self, EvalError> {
    if rows.is_empty() {
        return Ok(Self::empty());
    }
    let row_count = rows.len();
    let mut column_builders: Vec<_> = batch_datatypes
        .iter()
        .map(|ty| ty.create_mutable_vector(row_count))
        .collect();
    let column_count = column_builders.len();
    for row in rows {
        ensure!(
            row.len() == column_count,
            InvalidArgumentSnafu {
                reason: format!(
                    "row length not match, expect {}, found {}",
                    column_count,
                    row.len()
                )
            }
        );
        // The length check above guarantees the zip pairs every value with its column.
        for (column, value) in column_builders.iter_mut().zip(row.iter()) {
            column
                .try_push_value_ref(value.as_value_ref())
                .context(DataTypeSnafu {
                    msg: "Failed to convert rows to columns",
                })?;
        }
    }
    let columns: Vec<_> = column_builders
        .into_iter()
        .map(|mut finished| finished.to_vector())
        .collect();
    Self::try_new(columns, row_count)
}
/// Creates a batch with no columns, zero rows and no diffs.
pub fn empty() -> Self {
    Self {
        diffs: None,
        row_count: 0,
        batch: Vec::new(),
    }
}
/// Creates a batch from `batch` (the columns) after validating their lengths.
///
/// A batch without columns is accepted with any `row_count`.
///
/// # Errors
/// Returns an invalid-argument error unless every column holds exactly `row_count` entries.
pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
    // "All lengths are equal and the first one is `row_count`" collapses into a single
    // predicate: every column has length `row_count` (vacuously true without columns).
    let lengths_consistent = batch.iter().all(|column| column.len() == row_count);
    ensure!(
        lengths_consistent,
        InvalidArgumentSnafu {
            reason: "All columns should have same length".to_string()
        }
    );
    Ok(Self {
        batch,
        row_count,
        diffs: None,
    })
}
/// Creates a batch from `batch` (the columns) and `row_count` without validating anything.
///
/// Unlike `try_new`, the column lengths are not compared with `row_count`; the caller is
/// responsible for passing consistent values.
pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
Self {
batch,
row_count,
diffs: None,
}
}
/// Returns the columns of this batch as a slice.
pub fn batch(&self) -> &[VectorRef] {
&self.batch
}
/// Returns mutable access to the columns.
///
/// Nothing re-validates the columns against the stored row count after a modification.
pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
&mut self.batch
}
/// Returns the stored number of rows.
pub fn row_count(&self) -> usize {
self.row_count
}
/// Overwrites the stored number of rows.
///
/// The new value is not checked against the length of the columns.
pub fn set_row_count(&mut self, row_count: usize) {
self.row_count = row_count;
}
/// Returns the number of columns.
pub fn column_count(&self) -> usize {
self.batch.len()
}
/// Returns the values of row `idx`, one value per column, in column order.
///
/// # Errors
/// Returns an invalid-argument error when `idx` is not smaller than the stored row count.
pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
    let row_count = self.row_count;
    ensure!(
        idx < row_count,
        InvalidArgumentSnafu {
            reason: format!(
                "Expect row index to be less than {}, found {}",
                row_count, idx
            )
        }
    );
    let row: Vec<Value> = self.batch.iter().map(|column| column.get(idx)).collect();
    Ok(row)
}
pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
let batch = self
.batch()
.iter()
.map(|v| v.slice(offset, length))
.collect_vec();
Batch::try_new(batch, length)
}
/// Appends the rows of `other` to this batch, column by column.
///
/// - When this batch has no columns, it takes over the columns and the row count of `other`.
/// - When `other` has no columns, this batch is left untouched.
/// - Otherwise every column is rebuilt as "rows of `self`, then rows of `other`". The type of
///   a rebuilt column is the type of this batch's column unless that is the null type, then
///   the type of `other`'s column unless that is the null type as well, else the null type.
///
/// On error this batch is left unmodified.
///
/// # Errors
/// - an invalid-argument error when both batches have columns but in different numbers;
/// - a data-type error when a column cannot be copied into the rebuilt column.
pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
    let self_columns = self.batch.len();
    let other_columns = other.batch.len();
    ensure!(
        self_columns == other_columns || self_columns == 0 || other_columns == 0,
        InvalidArgumentSnafu {
            reason: format!(
                "Expect two batch to have same numbers of column, found {} and {} columns",
                self_columns, other_columns
            )
        }
    );
    if self_columns == 0 {
        self.batch = other.batch;
        self.row_count = other.row_count;
        return Ok(());
    }
    if other_columns == 0 {
        return Ok(());
    }

    // From here on both batches have the same, non-zero number of columns,
    // so zipping the two column lists visits every column exactly once.
    let self_rows = self.row_count;
    let other_rows = other.row_count;
    let total_rows = self_rows + other_rows;

    // Phase 1: create one builder per column, all of them before any data is copied.
    let builders: Vec<_> = self
        .batch
        .iter()
        .zip(other.batch.iter())
        .map(|(left, right)| {
            let left_ty = left.data_type();
            let merged_ty = if !left_ty.is_null() {
                left_ty
            } else {
                let right_ty = right.data_type();
                if !right_ty.is_null() {
                    right_ty
                } else {
                    datatypes::prelude::ConcreteDataType::null_datatype()
                }
            };
            merged_ty.create_mutable_vector(total_rows)
        })
        .collect();

    // Phase 2: copy the rows of `self`, then those of `other`, into every builder.
    let mut merged = Vec::with_capacity(builders.len());
    let column_pairs = self.batch.iter().zip(other.batch.iter());
    for (mut builder, (left, right)) in builders.into_iter().zip(column_pairs) {
        builder
            .extend_slice_of(left.as_ref(), 0, self_rows)
            .context(DataTypeSnafu {
                msg: "Failed to extend vector",
            })?;
        builder
            .extend_slice_of(right.as_ref(), 0, other_rows)
            .context(DataTypeSnafu {
                msg: "Failed to extend vector",
            })?;
        merged.push(builder.to_vector());
    }

    self.batch = merged;
    self.row_count = total_rows;
    Ok(())
}
/// Returns a new batch built by applying the Arrow filter derived from `predicate` to every
/// column.
///
/// The row count of the result is the number of `true` entries in `predicate`.
///
/// # Errors
/// - an Arrow error when filtering a column fails;
/// - a data-type error when a filtered Arrow array cannot be converted back into a vector;
/// - any error reported by `try_new`.
pub fn filter(&self, predicate: &BooleanVector) -> Result<Self, EvalError> {
    let mask = predicate.as_boolean_array();
    let kept_rows = mask.true_count();
    // Build the filter once and reuse it for every column.
    let filter_pred = FilterBuilder::new(mask).optimize().build();
    let filtered_arrays = self
        .batch
        .iter()
        .map(|column| filter_pred.filter(column.to_arrow_array().as_ref()))
        .collect::<Result<Vec<_>, _>>()
        .context(ArrowSnafu {
            context: "Failed to filter val batches",
        })?;
    let columns = Helper::try_into_vectors(&filtered_arrays).context(DataTypeSnafu {
        msg: "can't convert arrow array to vector",
    })?;
    Self::try_new(columns, kept_rows)
}
}
/// A vector of values paired with an optional vector of diffs.
///
/// Iterating (via `IntoIterator`) yields `(Value, Diff)` pairs; without a diff vector every
/// value is paired with a diff of `1`.
pub(crate) struct VectorDiff {
/// The values.
vector: VectorRef,
/// The diffs; `try_new` requires it to be as long as `vector` when present.
diff: Option<VectorRef>,
}
/// Wraps a plain vector as a `VectorDiff` without a diff vector.
impl From<VectorRef> for VectorDiff {
fn from(vector: VectorRef) -> Self {
Self { vector, diff: None }
}
}
impl VectorDiff {
    /// Returns the number of values in the wrapped vector.
    fn len(&self) -> usize {
        self.vector.len()
    }

    /// Pairs `vector` with an optional `diff` vector.
    ///
    /// # Errors
    /// Returns an invalid-argument error when `diff` is present and its length differs from
    /// the length of `vector`.
    fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
        let lengths_match = match diff.as_ref() {
            Some(diff) => diff.len() == vector.len(),
            // Without a diff vector there is nothing to compare.
            None => true,
        };
        ensure!(
            lengths_match,
            InvalidArgumentSnafu {
                reason: "Length of vector and diff should be the same"
            }
        );
        Ok(Self { vector, diff })
    }
}
/// Consumes the `VectorDiff` and iterates over its `(Value, Diff)` pairs from the first row.
impl IntoIterator for VectorDiff {
    type Item = (Value, Diff);
    type IntoIter = VectorDiffIter;

    fn into_iter(self) -> Self::IntoIter {
        let Self { vector, diff } = self;
        VectorDiffIter {
            vector,
            diff,
            // Iteration starts at the first row.
            idx: 0,
        }
    }
}
/// Owning iterator over the `(Value, Diff)` pairs of a `VectorDiff`.
pub(crate) struct VectorDiffIter {
/// The values being iterated.
vector: VectorRef,
/// The diffs, if any; when absent every value is yielded with a diff of `1`.
diff: Option<VectorRef>,
/// Index of the next row to yield.
idx: usize,
}
impl std::iter::Iterator for VectorDiffIter {
    type Item = (Value, Diff);

    /// Yields the value at the current index together with its diff and advances the index.
    ///
    /// Returns `None` once every value has been yielded. When the diff at the current index
    /// cannot be converted into a `Diff`, a warning is logged and `None` is returned without
    /// advancing, so the iteration ends at that row.
    fn next(&mut self) -> Option<Self::Item> {
        let idx = self.idx;
        if idx >= self.vector.len() {
            return None;
        }
        let value = self.vector.get(idx);
        let diff = match self.diff.as_ref() {
            Some(diffs) => match diffs.get(idx).try_into() {
                Ok(diff_at) => diff_at,
                Err(_) => {
                    common_telemetry::warn!("Invalid diff value at index {}", self.idx);
                    return None;
                }
            },
            // No diff vector: every value counts with a diff of 1.
            None => 1,
        };
        self.idx += 1;
        Some((value, diff))
    }
}