1use datafusion_expr::{Expr as DfExpr, Operator};
16use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate};
17use index::Bytes;
18
19use crate::error::Result;
20use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
21
22impl InvertedIndexApplierBuilder<'_> {
23 pub(crate) fn collect_comparison_expr(
27 &mut self,
28 left: &DfExpr,
29 op: &Operator,
30 right: &DfExpr,
31 ) -> Result<()> {
32 match op {
33 Operator::Lt => {
34 if matches!(right, DfExpr::Column(_)) {
35 self.collect_column_gt_lit(right, left)
36 } else {
37 self.collect_column_lt_lit(left, right)
38 }
39 }
40 Operator::LtEq => {
41 if matches!(right, DfExpr::Column(_)) {
42 self.collect_column_ge_lit(right, left)
43 } else {
44 self.collect_column_le_lit(left, right)
45 }
46 }
47 Operator::Gt => {
48 if matches!(right, DfExpr::Column(_)) {
49 self.collect_column_lt_lit(right, left)
50 } else {
51 self.collect_column_gt_lit(left, right)
52 }
53 }
54 Operator::GtEq => {
55 if matches!(right, DfExpr::Column(_)) {
56 self.collect_column_le_lit(right, left)
57 } else {
58 self.collect_column_ge_lit(left, right)
59 }
60 }
61 _ => Ok(()),
62 }
63 }
64
65 fn collect_column_lt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
66 self.collect_column_cmp_lit(left, right, |value| Range {
67 lower: None,
68 upper: Some(Bound {
69 inclusive: false,
70 value,
71 }),
72 })
73 }
74
75 fn collect_column_gt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
76 self.collect_column_cmp_lit(left, right, |value| Range {
77 lower: Some(Bound {
78 inclusive: false,
79 value,
80 }),
81 upper: None,
82 })
83 }
84
85 fn collect_column_le_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
86 self.collect_column_cmp_lit(left, right, |value| Range {
87 lower: None,
88 upper: Some(Bound {
89 inclusive: true,
90 value,
91 }),
92 })
93 }
94
95 fn collect_column_ge_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
96 self.collect_column_cmp_lit(left, right, |value| Range {
97 lower: Some(Bound {
98 inclusive: true,
99 value,
100 }),
101 upper: None,
102 })
103 }
104
105 fn collect_column_cmp_lit(
106 &mut self,
107 column: &DfExpr,
108 literal: &DfExpr,
109 range_builder: impl FnOnce(Bytes) -> Range,
110 ) -> Result<()> {
111 let Some(column_name) = Self::column_name(column) else {
112 return Ok(());
113 };
114 let Some(lit) = Self::nonnull_lit(literal) else {
115 return Ok(());
116 };
117 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
118 return Ok(());
119 };
120
121 let predicate = Predicate::Range(RangePredicate {
122 range: range_builder(Self::encode_lit(lit, data_type)?),
123 });
124
125 self.add_predicate(column_id, predicate);
126 Ok(())
127 }
128}
129
#[cfg(test)]
mod tests {

    use std::collections::HashSet;

    use store_api::region_request::PathType;

    use super::*;
    use crate::error::Error;
    use crate::sst::index::inverted_index::applier::builder::tests::{
        encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
        test_object_store, test_region_metadata,
    };
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Every supported operator, with the tag column on either side of the
    /// literal, must yield the matching one-sided range for column id 1.
    #[test]
    fn test_collect_comparison_basic() {
        // Range bounded only from above by "123".
        let up_to = |inclusive: bool| Range {
            lower: None,
            upper: Some(Bound {
                inclusive,
                value: encoded_string("123"),
            }),
        };
        // Range bounded only from below by "123".
        let from = |inclusive: bool| Range {
            lower: Some(Bound {
                inclusive,
                value: encoded_string("123"),
            }),
            upper: None,
        };

        let tag = tag_column();
        let lit = string_lit("123");

        // (left, operator, right, expected range)
        let cases = [
            (&tag, Operator::Lt, &lit, up_to(false)),
            (&lit, Operator::Lt, &tag, from(false)),
            (&tag, Operator::LtEq, &lit, up_to(true)),
            (&lit, Operator::LtEq, &tag, from(true)),
            (&tag, Operator::Gt, &lit, from(false)),
            (&lit, Operator::Gt, &tag, up_to(false)),
            (&tag, Operator::GtEq, &lit, from(true)),
            (&lit, Operator::GtEq, &tag, up_to(true)),
        ];

        // The first tuple element is bound to a named variable so it lives
        // until the end of the test.
        let (_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
        let region_metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &region_metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        for (left, op, right, _) in &cases {
            builder.collect_comparison_expr(left, op, right).unwrap();
        }

        let predicates = builder.output.get(&1).unwrap();
        assert_eq!(predicates.len(), cases.len());
        for ((_, _, _, range), actual) in cases.into_iter().zip(predicates) {
            assert_eq!(actual, &Predicate::Range(RangePredicate { range }));
        }
    }

    /// Comparing the tag column with an int64 literal must fail with
    /// `Error::Encode` and leave the builder output empty.
    #[test]
    fn test_collect_comparison_type_mismatch() {
        let (_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
        let region_metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &region_metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let outcome =
            builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
        assert!(matches!(outcome, Err(Error::Encode { .. })));
        assert!(builder.output.is_empty());
    }

    /// A comparison on the field column is collected under column id 3.
    #[test]
    fn test_collect_comparison_field_column() {
        let (_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
        let region_metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &region_metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        builder
            .collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc"))
            .unwrap();

        let expected = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: Some(Bound {
                    inclusive: false,
                    value: encoded_string("abc"),
                }),
            },
        });

        let predicates = builder.output.get(&3).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(predicates[0], expected);
    }

    /// A comparison on an unknown column must fail with
    /// `Error::ColumnNotFound` and leave the builder output empty.
    #[test]
    fn test_collect_comparison_nonexistent_column() {
        let (_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
        let region_metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &region_metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let outcome = builder.collect_comparison_expr(
            &nonexistent_column(),
            &Operator::Lt,
            &string_lit("abc"),
        );
        assert!(matches!(outcome, Err(Error::ColumnNotFound { .. })));
        assert!(builder.output.is_empty());
    }
}