mito2/sst/index/inverted_index/applier/
builder.rs1mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use mito_codec::index::IndexValueCodec;
31use mito_codec::row_converter::SortField;
32use object_store::ObjectStore;
33use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::RegionMetadata;
36use store_api::region_request::PathType;
37use store_api::storage::ColumnId;
38
39use crate::cache::file_cache::FileCacheRef;
40use crate::cache::index::inverted_index::InvertedIndexCacheRef;
41use crate::error::{
42 BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
43};
44use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
45use crate::sst::index::puffin_manager::PuffinManagerFactory;
46
47pub(crate) struct InvertedIndexApplierBuilder<'a> {
49 table_dir: String,
51
52 path_type: PathType,
54
55 object_store: ObjectStore,
57
58 file_cache: Option<FileCacheRef>,
60
61 metadata: &'a RegionMetadata,
63
64 indexed_column_ids: HashSet<ColumnId>,
66
67 output: BTreeMap<ColumnId, Vec<Predicate>>,
69
70 puffin_manager_factory: PuffinManagerFactory,
72
73 inverted_index_cache: Option<InvertedIndexCacheRef>,
75
76 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
78}
79
80impl<'a> InvertedIndexApplierBuilder<'a> {
81 pub fn new(
83 table_dir: String,
84 path_type: PathType,
85 object_store: ObjectStore,
86 metadata: &'a RegionMetadata,
87 indexed_column_ids: HashSet<ColumnId>,
88 puffin_manager_factory: PuffinManagerFactory,
89 ) -> Self {
90 Self {
91 table_dir,
92 path_type,
93 object_store,
94 metadata,
95 indexed_column_ids,
96 output: BTreeMap::default(),
97 puffin_manager_factory,
98 file_cache: None,
99 inverted_index_cache: None,
100 puffin_metadata_cache: None,
101 }
102 }
103
104 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
106 self.file_cache = file_cache;
107 self
108 }
109
110 pub fn with_puffin_metadata_cache(
112 mut self,
113 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
114 ) -> Self {
115 self.puffin_metadata_cache = puffin_metadata_cache;
116 self
117 }
118
119 pub fn with_inverted_index_cache(
121 mut self,
122 inverted_index_cache: Option<InvertedIndexCacheRef>,
123 ) -> Self {
124 self.inverted_index_cache = inverted_index_cache;
125 self
126 }
127
128 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
131 for expr in exprs {
132 self.traverse_and_collect(expr);
133 }
134
135 if self.output.is_empty() {
136 return Ok(None);
137 }
138
139 let predicates = self
140 .output
141 .iter()
142 .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
143 .collect();
144 let applier = PredicatesIndexApplier::try_from(predicates);
145
146 Ok(Some(
147 InvertedIndexApplier::new(
148 self.table_dir,
149 self.path_type,
150 self.object_store,
151 Box::new(applier.context(BuildIndexApplierSnafu)?),
152 self.puffin_manager_factory,
153 self.output,
154 )
155 .with_file_cache(self.file_cache)
156 .with_puffin_metadata_cache(self.puffin_metadata_cache)
157 .with_index_cache(self.inverted_index_cache),
158 ))
159 }
160
161 fn traverse_and_collect(&mut self, expr: &Expr) {
164 let res = match expr {
165 Expr::Between(between) => self.collect_between(between),
166
167 Expr::InList(in_list) => self.collect_inlist(in_list),
168 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
169 Operator::And => {
170 self.traverse_and_collect(left);
171 self.traverse_and_collect(right);
172 Ok(())
173 }
174 Operator::Or => self.collect_or_eq_list(left, right),
175 Operator::Eq => self.collect_eq(left, right),
176 Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
177 self.collect_comparison_expr(left, op, right)
178 }
179 Operator::RegexMatch => self.collect_regex_match(left, right),
180 _ => Ok(()),
181 },
182
183 _ => Ok(()),
185 };
186
187 if let Err(err) = res {
188 warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
189 }
190 }
191
192 fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
194 self.output.entry(column_id).or_default().push(predicate);
195 }
196
197 fn column_id_and_type(
200 &self,
201 column_name: &str,
202 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
203 let column = self
204 .metadata
205 .column_by_name(column_name)
206 .context(ColumnNotFoundSnafu {
207 column: column_name,
208 })?;
209
210 if !self.indexed_column_ids.contains(&column.column_id) {
211 return Ok(None);
212 }
213
214 Ok(Some((
215 column.column_id,
216 column.column_schema.data_type.clone(),
217 )))
218 }
219
220 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
222 match expr {
223 Expr::Literal(lit) if !lit.is_null() => Some(lit),
224 _ => None,
225 }
226 }
227
228 fn column_name(expr: &Expr) -> Option<&str> {
230 match expr {
231 Expr::Column(column) => Some(&column.name),
232 _ => None,
233 }
234 }
235
236 fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
238 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
239 let mut bytes = vec![];
240 let field = SortField::new(data_type);
241 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
242 .context(EncodeSnafu)?;
243 Ok(bytes)
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use api::v1::SemanticType;
250 use datafusion_common::Column;
251 use datafusion_expr::Between;
252 use datatypes::data_type::ConcreteDataType;
253 use datatypes::schema::ColumnSchema;
254 use index::inverted_index::search::predicate::{
255 Bound, Range, RangePredicate, RegexMatchPredicate,
256 };
257 use object_store::services::Memory;
258 use object_store::ObjectStore;
259 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
260 use store_api::storage::RegionId;
261
262 use super::*;
263
264 pub(crate) fn test_region_metadata() -> RegionMetadata {
265 let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
266 builder
267 .push_column_metadata(ColumnMetadata {
268 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
269 semantic_type: SemanticType::Tag,
270 column_id: 1,
271 })
272 .push_column_metadata(ColumnMetadata {
273 column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
274 semantic_type: SemanticType::Tag,
275 column_id: 2,
276 })
277 .push_column_metadata(ColumnMetadata {
278 column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
279 semantic_type: SemanticType::Field,
280 column_id: 3,
281 })
282 .push_column_metadata(ColumnMetadata {
283 column_schema: ColumnSchema::new(
284 "d",
285 ConcreteDataType::timestamp_millisecond_datatype(),
286 false,
287 ),
288 semantic_type: SemanticType::Timestamp,
289 column_id: 4,
290 })
291 .primary_key(vec![1, 2]);
292 builder.build().unwrap()
293 }
294
295 pub(crate) fn test_object_store() -> ObjectStore {
296 ObjectStore::new(Memory::default()).unwrap().finish()
297 }
298
299 pub(crate) fn tag_column() -> Expr {
300 Expr::Column(Column::from_name("a"))
301 }
302
303 pub(crate) fn tag_column2() -> Expr {
304 Expr::Column(Column::from_name("b"))
305 }
306
307 pub(crate) fn field_column() -> Expr {
308 Expr::Column(Column::from_name("c"))
309 }
310
311 pub(crate) fn nonexistent_column() -> Expr {
312 Expr::Column(Column::from_name("nonexistence"))
313 }
314
315 pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
316 Expr::Literal(ScalarValue::Utf8(Some(s.into())))
317 }
318
319 pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
320 Expr::Literal(ScalarValue::Int64(Some(i.into())))
321 }
322
323 pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
324 let mut bytes = vec![];
325 IndexValueCodec::encode_nonnull_value(
326 Value::from(s.into()).as_value_ref(),
327 &SortField::new(ConcreteDataType::string_datatype()),
328 &mut bytes,
329 )
330 .unwrap();
331 bytes
332 }
333
334 pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
335 let mut bytes = vec![];
336 IndexValueCodec::encode_nonnull_value(
337 Value::from(s.into()).as_value_ref(),
338 &SortField::new(ConcreteDataType::int64_datatype()),
339 &mut bytes,
340 )
341 .unwrap();
342 bytes
343 }
344
/// Traversing `a ~ "bar" AND b BETWEEN 123 AND 456` must yield exactly one
/// regex predicate for column 1 and one inclusive range predicate for column 2.
#[test]
fn test_collect_and_basic() {
    let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

    let metadata = test_region_metadata();
    let indexed_column_ids = HashSet::from_iter([1, 2, 3]);
    let mut builder = InvertedIndexApplierBuilder::new(
        "test".to_string(),
        PathType::Bare,
        test_object_store(),
        &metadata,
        indexed_column_ids,
        factory,
    );

    // Left side of the conjunction: regex match on column `a`.
    let regex_on_a = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(tag_column()),
        op: Operator::RegexMatch,
        right: Box::new(string_lit("bar")),
    });
    // Right side of the conjunction: BETWEEN on column `b`.
    let between_on_b = Expr::Between(Between {
        expr: Box::new(tag_column2()),
        negated: false,
        low: Box::new(int64_lit(123)),
        high: Box::new(int64_lit(456)),
    });
    let conjunction = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(regex_on_a),
        op: Operator::And,
        right: Box::new(between_on_b),
    });

    builder.traverse_and_collect(&conjunction);

    let expected_regex = Predicate::RegexMatch(RegexMatchPredicate {
        pattern: "bar".to_string(),
    });
    let collected_for_a = builder.output.get(&1).unwrap();
    assert_eq!(collected_for_a.len(), 1);
    assert_eq!(collected_for_a[0], expected_regex);

    let expected_range = Predicate::Range(RangePredicate {
        range: Range {
            lower: Some(Bound {
                inclusive: true,
                value: encoded_int64(123),
            }),
            upper: Some(Bound {
                inclusive: true,
                value: encoded_int64(456),
            }),
        },
    });
    let collected_for_b = builder.output.get(&2).unwrap();
    assert_eq!(collected_for_b.len(), 1);
    assert_eq!(collected_for_b[0], expected_range);
}
401}