1use datafusion_expr::{Expr as DfExpr, Operator};
16use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate};
17use index::Bytes;
18
19use crate::error::Result;
20use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
21
22impl InvertedIndexApplierBuilder<'_> {
23 pub(crate) fn collect_comparison_expr(
27 &mut self,
28 left: &DfExpr,
29 op: &Operator,
30 right: &DfExpr,
31 ) -> Result<()> {
32 match op {
33 Operator::Lt => {
34 if matches!(right, DfExpr::Column(_)) {
35 self.collect_column_gt_lit(right, left)
36 } else {
37 self.collect_column_lt_lit(left, right)
38 }
39 }
40 Operator::LtEq => {
41 if matches!(right, DfExpr::Column(_)) {
42 self.collect_column_ge_lit(right, left)
43 } else {
44 self.collect_column_le_lit(left, right)
45 }
46 }
47 Operator::Gt => {
48 if matches!(right, DfExpr::Column(_)) {
49 self.collect_column_lt_lit(right, left)
50 } else {
51 self.collect_column_gt_lit(left, right)
52 }
53 }
54 Operator::GtEq => {
55 if matches!(right, DfExpr::Column(_)) {
56 self.collect_column_le_lit(right, left)
57 } else {
58 self.collect_column_ge_lit(left, right)
59 }
60 }
61 _ => Ok(()),
62 }
63 }
64
65 fn collect_column_lt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
66 self.collect_column_cmp_lit(left, right, |value| Range {
67 lower: None,
68 upper: Some(Bound {
69 inclusive: false,
70 value,
71 }),
72 })
73 }
74
75 fn collect_column_gt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
76 self.collect_column_cmp_lit(left, right, |value| Range {
77 lower: Some(Bound {
78 inclusive: false,
79 value,
80 }),
81 upper: None,
82 })
83 }
84
85 fn collect_column_le_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
86 self.collect_column_cmp_lit(left, right, |value| Range {
87 lower: None,
88 upper: Some(Bound {
89 inclusive: true,
90 value,
91 }),
92 })
93 }
94
95 fn collect_column_ge_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
96 self.collect_column_cmp_lit(left, right, |value| Range {
97 lower: Some(Bound {
98 inclusive: true,
99 value,
100 }),
101 upper: None,
102 })
103 }
104
105 fn collect_column_cmp_lit(
106 &mut self,
107 column: &DfExpr,
108 literal: &DfExpr,
109 range_builder: impl FnOnce(Bytes) -> Range,
110 ) -> Result<()> {
111 let Some(column_name) = Self::column_name(column) else {
112 return Ok(());
113 };
114 let Some(lit) = Self::nonnull_lit(literal) else {
115 return Ok(());
116 };
117 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
118 return Ok(());
119 };
120
121 let predicate = Predicate::Range(RangePredicate {
122 range: range_builder(Self::encode_lit(lit, data_type)?),
123 });
124
125 self.add_predicate(column_id, predicate);
126 Ok(())
127 }
128}
129
130#[cfg(test)]
131mod tests {
132
133 use std::collections::HashSet;
134
135 use super::*;
136 use crate::error::Error;
137 use crate::sst::index::inverted_index::applier::builder::tests::{
138 encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
139 test_object_store, test_region_metadata,
140 };
141 use crate::sst::index::puffin_manager::PuffinManagerFactory;
142
143 #[test]
144 fn test_collect_comparison_basic() {
145 let cases = [
146 (
147 (&tag_column(), &Operator::Lt, &string_lit("123")),
148 Range {
149 lower: None,
150 upper: Some(Bound {
151 inclusive: false,
152 value: encoded_string("123"),
153 }),
154 },
155 ),
156 (
157 (&string_lit("123"), &Operator::Lt, &tag_column()),
158 Range {
159 lower: Some(Bound {
160 inclusive: false,
161 value: encoded_string("123"),
162 }),
163 upper: None,
164 },
165 ),
166 (
167 (&tag_column(), &Operator::LtEq, &string_lit("123")),
168 Range {
169 lower: None,
170 upper: Some(Bound {
171 inclusive: true,
172 value: encoded_string("123"),
173 }),
174 },
175 ),
176 (
177 (&string_lit("123"), &Operator::LtEq, &tag_column()),
178 Range {
179 lower: Some(Bound {
180 inclusive: true,
181 value: encoded_string("123"),
182 }),
183 upper: None,
184 },
185 ),
186 (
187 (&tag_column(), &Operator::Gt, &string_lit("123")),
188 Range {
189 lower: Some(Bound {
190 inclusive: false,
191 value: encoded_string("123"),
192 }),
193 upper: None,
194 },
195 ),
196 (
197 (&string_lit("123"), &Operator::Gt, &tag_column()),
198 Range {
199 lower: None,
200 upper: Some(Bound {
201 inclusive: false,
202 value: encoded_string("123"),
203 }),
204 },
205 ),
206 (
207 (&tag_column(), &Operator::GtEq, &string_lit("123")),
208 Range {
209 lower: Some(Bound {
210 inclusive: true,
211 value: encoded_string("123"),
212 }),
213 upper: None,
214 },
215 ),
216 (
217 (&string_lit("123"), &Operator::GtEq, &tag_column()),
218 Range {
219 lower: None,
220 upper: Some(Bound {
221 inclusive: true,
222 value: encoded_string("123"),
223 }),
224 },
225 ),
226 ];
227
228 let (_d, facotry) =
229 PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
230 let metadata = test_region_metadata();
231 let mut builder = InvertedIndexApplierBuilder::new(
232 "test".to_string(),
233 test_object_store(),
234 &metadata,
235 HashSet::from_iter([1, 2, 3]),
236 facotry,
237 );
238
239 for ((left, op, right), _) in &cases {
240 builder.collect_comparison_expr(left, op, right).unwrap();
241 }
242
243 let predicates = builder.output.get(&1).unwrap();
244 assert_eq!(predicates.len(), cases.len());
245 for ((_, expected), actual) in cases.into_iter().zip(predicates) {
246 assert_eq!(
247 actual,
248 &Predicate::Range(RangePredicate { range: expected })
249 );
250 }
251 }
252
253 #[test]
254 fn test_collect_comparison_type_mismatch() {
255 let (_d, facotry) =
256 PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
257 let metadata = test_region_metadata();
258 let mut builder = InvertedIndexApplierBuilder::new(
259 "test".to_string(),
260 test_object_store(),
261 &metadata,
262 HashSet::from_iter([1, 2, 3]),
263 facotry,
264 );
265
266 let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
267 assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
268 assert!(builder.output.is_empty());
269 }
270
271 #[test]
272 fn test_collect_comparison_field_column() {
273 let (_d, facotry) =
274 PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
275 let metadata = test_region_metadata();
276 let mut builder = InvertedIndexApplierBuilder::new(
277 "test".to_string(),
278 test_object_store(),
279 &metadata,
280 HashSet::from_iter([1, 2, 3]),
281 facotry,
282 );
283
284 builder
285 .collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc"))
286 .unwrap();
287
288 let predicates = builder.output.get(&3).unwrap();
289 assert_eq!(predicates.len(), 1);
290 assert_eq!(
291 predicates[0],
292 Predicate::Range(RangePredicate {
293 range: Range {
294 lower: None,
295 upper: Some(Bound {
296 inclusive: false,
297 value: encoded_string("abc"),
298 }),
299 }
300 })
301 );
302 }
303
304 #[test]
305 fn test_collect_comparison_nonexistent_column() {
306 let (_d, facotry) =
307 PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
308 let metadata = test_region_metadata();
309 let mut builder = InvertedIndexApplierBuilder::new(
310 "test".to_string(),
311 test_object_store(),
312 &metadata,
313 HashSet::from_iter([1, 2, 3]),
314 facotry,
315 );
316
317 let res = builder.collect_comparison_expr(
318 &nonexistent_column(),
319 &Operator::Lt,
320 &string_lit("abc"),
321 );
322 assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
323 assert!(builder.output.is_empty());
324 }
325}