use datafusion_expr::{Expr as DfExpr, Operator};
use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate};
use index::Bytes;
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl InvertedIndexApplierBuilder<'_> {
    /// Collects a comparison expression (`<`, `<=`, `>`, `>=`) as a one-sided range predicate.
    ///
    /// When `right` is a [`DfExpr::Column`], the expression is read as `literal op column`:
    /// the operands are swapped and the operator is mirrored (`<` becomes `>`, `<=` becomes
    /// `>=`, and vice versa) so that the column always ends up on the left-hand side.
    /// Every other operator is ignored and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while resolving the column or encoding the literal
    /// (see `collect_column_cmp_lit`).
    pub(crate) fn collect_comparison_expr(
        &mut self,
        left: &DfExpr,
        op: &Operator,
        right: &DfExpr,
    ) -> Result<()> {
        // Normalise to `column op' literal`; `mirrored` records whether `op` must be flipped.
        let mirrored = matches!(right, DfExpr::Column(_));
        let (column, literal) = if mirrored {
            (right, left)
        } else {
            (left, right)
        };

        match (op, mirrored) {
            (Operator::Lt, false) | (Operator::Gt, true) => {
                self.collect_column_lt_lit(column, literal)
            }
            (Operator::Lt, true) | (Operator::Gt, false) => {
                self.collect_column_gt_lit(column, literal)
            }
            (Operator::LtEq, false) | (Operator::GtEq, true) => {
                self.collect_column_le_lit(column, literal)
            }
            (Operator::LtEq, true) | (Operator::GtEq, false) => {
                self.collect_column_ge_lit(column, literal)
            }
            // Not a comparison operator handled here: nothing to collect.
            _ => Ok(()),
        }
    }

    /// Collects `left < right` as a range with no lower bound and an exclusive upper bound.
    fn collect_column_lt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
        let exclusive_upper = |value: Bytes| Range {
            lower: None,
            upper: Some(Bound {
                inclusive: false,
                value,
            }),
        };
        self.collect_column_cmp_lit(left, right, exclusive_upper)
    }

    /// Collects `left > right` as a range with an exclusive lower bound and no upper bound.
    fn collect_column_gt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
        let exclusive_lower = |value: Bytes| Range {
            lower: Some(Bound {
                inclusive: false,
                value,
            }),
            upper: None,
        };
        self.collect_column_cmp_lit(left, right, exclusive_lower)
    }

    /// Collects `left <= right` as a range with no lower bound and an inclusive upper bound.
    fn collect_column_le_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
        let inclusive_upper = |value: Bytes| Range {
            lower: None,
            upper: Some(Bound {
                inclusive: true,
                value,
            }),
        };
        self.collect_column_cmp_lit(left, right, inclusive_upper)
    }

    /// Collects `left >= right` as a range with an inclusive lower bound and no upper bound.
    fn collect_column_ge_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
        let inclusive_lower = |value: Bytes| Range {
            lower: Some(Bound {
                inclusive: true,
                value,
            }),
            upper: None,
        };
        self.collect_column_cmp_lit(left, right, inclusive_lower)
    }

    /// Shared tail of the four comparison collectors.
    ///
    /// Resolves `column` and `literal`, encodes the literal with the column's data type,
    /// turns the encoded bytes into a [`Range`] via `range_builder`, and registers the
    /// resulting [`Predicate::Range`] for the column.
    ///
    /// Returns `Ok(())` without registering anything when `Self::column_name`,
    /// `Self::nonnull_lit` or `self.column_id_and_type` yields `None`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `self.column_id_and_type` and `Self::encode_lit`.
    fn collect_column_cmp_lit(
        &mut self,
        column: &DfExpr,
        literal: &DfExpr,
        range_builder: impl FnOnce(Bytes) -> Range,
    ) -> Result<()> {
        // The guards run in this order on purpose: an unusable operand is skipped silently
        // before the column lookup gets a chance to fail.
        let Some(name) = Self::column_name(column) else {
            return Ok(());
        };
        let Some(scalar) = Self::nonnull_lit(literal) else {
            return Ok(());
        };
        let Some((id, ty)) = self.column_id_and_type(name)? else {
            return Ok(());
        };

        let encoded = Self::encode_lit(scalar, ty)?;
        let range = range_builder(encoded);
        self.add_predicate(id, Predicate::Range(RangePredicate { range }));
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::error::Error;
    use crate::sst::index::inverted_index::applier::builder::tests::{
        encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
        test_object_store, test_region_metadata,
    };
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Each of `<`, `<=`, `>`, `>=`, with the column on either side of the operator,
    /// must be collected as the matching one-sided range, in submission order.
    #[test]
    fn test_collect_comparison_basic() {
        // Range bounded only from above by "123".
        let at_most = |inclusive: bool| Range {
            lower: None,
            upper: Some(Bound {
                inclusive,
                value: encoded_string("123"),
            }),
        };
        // Range bounded only from below by "123".
        let at_least = |inclusive: bool| Range {
            lower: Some(Bound {
                inclusive,
                value: encoded_string("123"),
            }),
            upper: None,
        };

        // (left operand, operator, right operand, expected range)
        let cases = [
            (tag_column(), Operator::Lt, string_lit("123"), at_most(false)),
            (string_lit("123"), Operator::Lt, tag_column(), at_least(false)),
            (tag_column(), Operator::LtEq, string_lit("123"), at_most(true)),
            (string_lit("123"), Operator::LtEq, tag_column(), at_least(true)),
            (tag_column(), Operator::Gt, string_lit("123"), at_least(false)),
            (string_lit("123"), Operator::Gt, tag_column(), at_most(false)),
            (tag_column(), Operator::GtEq, string_lit("123"), at_least(true)),
            (string_lit("123"), Operator::GtEq, tag_column(), at_most(true)),
        ];

        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        for (left, op, right, _) in &cases {
            builder.collect_comparison_expr(left, op, right).unwrap();
        }

        let collected = builder.output.get(&1).unwrap();
        assert_eq!(collected.len(), cases.len());
        for ((.., range), actual) in cases.into_iter().zip(collected) {
            assert_eq!(actual, &Predicate::Range(RangePredicate { range }));
        }
    }

    /// Comparing the tag column against an `int64` literal must fail with
    /// `FieldTypeMismatch` and leave the builder output untouched.
    #[test]
    fn test_collect_comparison_type_mismatch() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let column = tag_column();
        let literal = int64_lit(10);
        let outcome = builder.collect_comparison_expr(&column, &Operator::Lt, &literal);

        assert!(matches!(outcome, Err(Error::FieldTypeMismatch { .. })));
        assert!(builder.output.is_empty());
    }

    /// A comparison on the field column is collected as well; the predicate is expected
    /// under key `3` of the builder output.
    #[test]
    fn test_collect_comparison_field_column() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        builder
            .collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc"))
            .unwrap();

        let collected = builder.output.get(&3).unwrap();
        assert_eq!(collected.len(), 1);

        let expected = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: Some(Bound {
                    inclusive: false,
                    value: encoded_string("abc"),
                }),
            },
        });
        assert_eq!(collected[0], expected);
    }

    /// A comparison on a column that is not part of the region metadata must fail with
    /// `ColumnNotFound` and leave the builder output untouched.
    #[test]
    fn test_collect_comparison_nonexistent_column() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let column = nonexistent_column();
        let literal = string_lit("abc");
        let outcome = builder.collect_comparison_expr(&column, &Operator::Lt, &literal);

        assert!(matches!(outcome, Err(Error::ColumnNotFound { .. })));
        assert!(builder.output.is_empty());
    }
}