mito2/sst/index/inverted_index/applier/
builder.rs1mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use object_store::ObjectStore;
31use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
32use snafu::{OptionExt, ResultExt};
33use store_api::metadata::RegionMetadata;
34use store_api::storage::ColumnId;
35
36use crate::cache::file_cache::FileCacheRef;
37use crate::cache::index::inverted_index::InvertedIndexCacheRef;
38use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
39use crate::row_converter::SortField;
40use crate::sst::index::codec::IndexValueCodec;
41use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
42use crate::sst::index::puffin_manager::PuffinManagerFactory;
43
44pub(crate) struct InvertedIndexApplierBuilder<'a> {
46 region_dir: String,
48
49 object_store: ObjectStore,
51
52 file_cache: Option<FileCacheRef>,
54
55 metadata: &'a RegionMetadata,
57
58 indexed_column_ids: HashSet<ColumnId>,
60
61 output: BTreeMap<ColumnId, Vec<Predicate>>,
63
64 puffin_manager_factory: PuffinManagerFactory,
66
67 inverted_index_cache: Option<InvertedIndexCacheRef>,
69
70 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
72}
73
74impl<'a> InvertedIndexApplierBuilder<'a> {
75 pub fn new(
77 region_dir: String,
78 object_store: ObjectStore,
79 metadata: &'a RegionMetadata,
80 indexed_column_ids: HashSet<ColumnId>,
81 puffin_manager_factory: PuffinManagerFactory,
82 ) -> Self {
83 Self {
84 region_dir,
85 object_store,
86 metadata,
87 indexed_column_ids,
88 output: BTreeMap::default(),
89 puffin_manager_factory,
90 file_cache: None,
91 inverted_index_cache: None,
92 puffin_metadata_cache: None,
93 }
94 }
95
96 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
98 self.file_cache = file_cache;
99 self
100 }
101
102 pub fn with_puffin_metadata_cache(
104 mut self,
105 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
106 ) -> Self {
107 self.puffin_metadata_cache = puffin_metadata_cache;
108 self
109 }
110
111 pub fn with_inverted_index_cache(
113 mut self,
114 inverted_index_cache: Option<InvertedIndexCacheRef>,
115 ) -> Self {
116 self.inverted_index_cache = inverted_index_cache;
117 self
118 }
119
120 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
123 for expr in exprs {
124 self.traverse_and_collect(expr);
125 }
126
127 if self.output.is_empty() {
128 return Ok(None);
129 }
130
131 let predicates = self
132 .output
133 .iter()
134 .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
135 .collect();
136 let applier = PredicatesIndexApplier::try_from(predicates);
137
138 Ok(Some(
139 InvertedIndexApplier::new(
140 self.region_dir,
141 self.metadata.region_id,
142 self.object_store,
143 Box::new(applier.context(BuildIndexApplierSnafu)?),
144 self.puffin_manager_factory,
145 self.output,
146 )
147 .with_file_cache(self.file_cache)
148 .with_puffin_metadata_cache(self.puffin_metadata_cache)
149 .with_index_cache(self.inverted_index_cache),
150 ))
151 }
152
153 fn traverse_and_collect(&mut self, expr: &Expr) {
156 let res = match expr {
157 Expr::Between(between) => self.collect_between(between),
158
159 Expr::InList(in_list) => self.collect_inlist(in_list),
160 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
161 Operator::And => {
162 self.traverse_and_collect(left);
163 self.traverse_and_collect(right);
164 Ok(())
165 }
166 Operator::Or => self.collect_or_eq_list(left, right),
167 Operator::Eq => self.collect_eq(left, right),
168 Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
169 self.collect_comparison_expr(left, op, right)
170 }
171 Operator::RegexMatch => self.collect_regex_match(left, right),
172 _ => Ok(()),
173 },
174
175 _ => Ok(()),
177 };
178
179 if let Err(err) = res {
180 warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
181 }
182 }
183
184 fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
186 self.output.entry(column_id).or_default().push(predicate);
187 }
188
189 fn column_id_and_type(
192 &self,
193 column_name: &str,
194 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
195 let column = self
196 .metadata
197 .column_by_name(column_name)
198 .context(ColumnNotFoundSnafu {
199 column: column_name,
200 })?;
201
202 if !self.indexed_column_ids.contains(&column.column_id) {
203 return Ok(None);
204 }
205
206 Ok(Some((
207 column.column_id,
208 column.column_schema.data_type.clone(),
209 )))
210 }
211
212 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
214 match expr {
215 Expr::Literal(lit) if !lit.is_null() => Some(lit),
216 _ => None,
217 }
218 }
219
220 fn column_name(expr: &Expr) -> Option<&str> {
222 match expr {
223 Expr::Column(column) => Some(&column.name),
224 _ => None,
225 }
226 }
227
228 fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
230 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
231 let mut bytes = vec![];
232 let field = SortField::new(data_type);
233 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
234 Ok(bytes)
235 }
236}
237
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::Between;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use index::inverted_index::search::predicate::{
        Bound, Range, RangePredicate, RegexMatchPredicate,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Builds region metadata with two tag columns (`a`: string, `b`: int64),
    /// one string field column (`c`) and a millisecond timestamp column (`d`).
    /// The primary key is made of column ids 1 and 2.
    pub(crate) fn test_region_metadata() -> RegionMetadata {
        let tag_a = ColumnMetadata {
            column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        };
        let tag_b = ColumnMetadata {
            column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 2,
        };
        let field_c = ColumnMetadata {
            column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 3,
        };
        let timestamp_d = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "d",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 4,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(tag_a)
            .push_column_metadata(tag_b)
            .push_column_metadata(field_c)
            .push_column_metadata(timestamp_d)
            .primary_key(vec![1, 2]);
        builder.build().unwrap()
    }

    /// Creates an object store built from the default `Memory` service.
    pub(crate) fn test_object_store() -> ObjectStore {
        let store_builder = ObjectStore::new(Memory::default()).unwrap();
        store_builder.finish()
    }

    /// Column expression for the string tag column `a`.
    pub(crate) fn tag_column() -> Expr {
        Expr::Column(Column::from_name("a"))
    }

    /// Column expression for the int64 tag column `b`.
    pub(crate) fn tag_column2() -> Expr {
        Expr::Column(Column::from_name("b"))
    }

    /// Column expression for the field column `c`.
    pub(crate) fn field_column() -> Expr {
        Expr::Column(Column::from_name("c"))
    }

    /// Column expression naming a column absent from the test metadata.
    pub(crate) fn nonexistent_column() -> Expr {
        Expr::Column(Column::from_name("nonexistence"))
    }

    /// Non-null UTF-8 literal expression.
    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
        let text: String = s.into();
        Expr::Literal(ScalarValue::Utf8(Some(text)))
    }

    /// Non-null int64 literal expression.
    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
        let number: i64 = i.into();
        Expr::Literal(ScalarValue::Int64(Some(number)))
    }

    /// Encodes a string the way the index codec encodes a string column value.
    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
        let field = SortField::new(ConcreteDataType::string_datatype());
        let value = Value::from(s.into());
        let mut encoded = Vec::new();
        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut encoded)
            .unwrap();
        encoded
    }

    /// Encodes an integer the way the index codec encodes an int64 column value.
    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
        let field = SortField::new(ConcreteDataType::int64_datatype());
        let value = Value::from(s.into());
        let mut encoded = Vec::new();
        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut encoded)
            .unwrap();
        encoded
    }

    /// `a =~ "bar" AND b BETWEEN 123 AND 456` must yield one regex predicate on
    /// column 1 and one inclusive range predicate on column 2.
    #[test]
    fn test_collect_and_basic() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let regex_on_a = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(tag_column()),
            op: Operator::RegexMatch,
            right: Box::new(string_lit("bar")),
        });
        let between_on_b = Expr::Between(Between {
            expr: Box::new(tag_column2()),
            negated: false,
            low: Box::new(int64_lit(123)),
            high: Box::new(int64_lit(456)),
        });
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(regex_on_a),
            op: Operator::And,
            right: Box::new(between_on_b),
        });

        builder.traverse_and_collect(&expr);

        let expected_regex = Predicate::RegexMatch(RegexMatchPredicate {
            pattern: "bar".to_string(),
        });
        let on_column_a = builder.output.get(&1).unwrap();
        assert_eq!(on_column_a.len(), 1);
        assert_eq!(on_column_a[0], expected_regex);

        let expected_range = Predicate::Range(RangePredicate {
            range: Range {
                lower: Some(Bound {
                    inclusive: true,
                    value: encoded_int64(123),
                }),
                upper: Some(Bound {
                    inclusive: true,
                    value: encoded_int64(456),
                }),
            },
        });
        let on_column_b = builder.output.get(&2).unwrap();
        assert_eq!(on_column_b.len(), 1);
        assert_eq!(on_column_b[0], expected_range);
    }
}