mito2/sst/index/inverted_index/applier/
builder.rs1mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use index::target::IndexTarget;
31use mito_codec::index::IndexValueCodec;
32use mito_codec::row_converter::SortField;
33use object_store::ObjectStore;
34use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadata;
37use store_api::region_request::PathType;
38use store_api::storage::ColumnId;
39
40use crate::cache::file_cache::FileCacheRef;
41use crate::cache::index::inverted_index::InvertedIndexCacheRef;
42use crate::error::{
43 BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
44};
45use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
46use crate::sst::index::puffin_manager::PuffinManagerFactory;
47
/// Collects inverted-index predicates from filter expressions and assembles an
/// [`InvertedIndexApplier`] from them.
pub(crate) struct InvertedIndexApplierBuilder<'a> {
    /// Table directory, forwarded unchanged to the built applier.
    table_dir: String,

    /// Path type, forwarded unchanged to the built applier.
    path_type: PathType,

    /// Object store, forwarded unchanged to the built applier.
    object_store: ObjectStore,

    /// Optional file cache (set via `with_file_cache`), forwarded to the applier.
    file_cache: Option<FileCacheRef>,

    /// Region metadata used to resolve column names to column ids and data types.
    metadata: &'a RegionMetadata,

    /// Ids of the columns that have an inverted index; predicates on any other
    /// column are not collected.
    indexed_column_ids: HashSet<ColumnId>,

    /// Predicates collected so far, grouped by column id. A `BTreeMap` keeps the
    /// iteration order deterministic (ascending column id).
    output: BTreeMap<ColumnId, Vec<Predicate>>,

    /// Factory for puffin managers, forwarded unchanged to the built applier.
    puffin_manager_factory: PuffinManagerFactory,

    /// Optional inverted index cache (set via `with_inverted_index_cache`).
    inverted_index_cache: Option<InvertedIndexCacheRef>,

    /// Optional puffin metadata cache (set via `with_puffin_metadata_cache`).
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
80
81impl<'a> InvertedIndexApplierBuilder<'a> {
82 pub fn new(
84 table_dir: String,
85 path_type: PathType,
86 object_store: ObjectStore,
87 metadata: &'a RegionMetadata,
88 indexed_column_ids: HashSet<ColumnId>,
89 puffin_manager_factory: PuffinManagerFactory,
90 ) -> Self {
91 Self {
92 table_dir,
93 path_type,
94 object_store,
95 metadata,
96 indexed_column_ids,
97 output: BTreeMap::default(),
98 puffin_manager_factory,
99 file_cache: None,
100 inverted_index_cache: None,
101 puffin_metadata_cache: None,
102 }
103 }
104
105 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
107 self.file_cache = file_cache;
108 self
109 }
110
111 pub fn with_puffin_metadata_cache(
113 mut self,
114 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
115 ) -> Self {
116 self.puffin_metadata_cache = puffin_metadata_cache;
117 self
118 }
119
120 pub fn with_inverted_index_cache(
122 mut self,
123 inverted_index_cache: Option<InvertedIndexCacheRef>,
124 ) -> Self {
125 self.inverted_index_cache = inverted_index_cache;
126 self
127 }
128
129 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
132 for expr in exprs {
133 self.traverse_and_collect(expr);
134 }
135
136 if self.output.is_empty() {
137 return Ok(None);
138 }
139
140 let predicates = self
141 .output
142 .iter()
143 .map(|(column_id, predicates)| {
144 (
145 format!("{}", IndexTarget::ColumnId(*column_id)),
146 predicates.clone(),
147 )
148 })
149 .collect::<Vec<_>>();
150 let applier = PredicatesIndexApplier::try_from(predicates);
151
152 Ok(Some(
153 InvertedIndexApplier::new(
154 self.table_dir,
155 self.path_type,
156 self.object_store,
157 Box::new(applier.context(BuildIndexApplierSnafu)?),
158 self.puffin_manager_factory,
159 self.output,
160 )
161 .with_file_cache(self.file_cache)
162 .with_puffin_metadata_cache(self.puffin_metadata_cache)
163 .with_index_cache(self.inverted_index_cache),
164 ))
165 }
166
167 fn traverse_and_collect(&mut self, expr: &Expr) {
170 let res = match expr {
171 Expr::Between(between) => self.collect_between(between),
172
173 Expr::InList(in_list) => self.collect_inlist(in_list),
174 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
175 Operator::And => {
176 self.traverse_and_collect(left);
177 self.traverse_and_collect(right);
178 Ok(())
179 }
180 Operator::Or => self.collect_or_eq_list(left, right),
181 Operator::Eq => self.collect_eq(left, right),
182 Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
183 self.collect_comparison_expr(left, op, right)
184 }
185 Operator::RegexMatch => self.collect_regex_match(left, right),
186 _ => Ok(()),
187 },
188
189 _ => Ok(()),
191 };
192
193 if let Err(err) = res {
194 warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
195 }
196 }
197
198 fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
200 self.output.entry(column_id).or_default().push(predicate);
201 }
202
203 fn column_id_and_type(
206 &self,
207 column_name: &str,
208 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
209 let column = self
210 .metadata
211 .column_by_name(column_name)
212 .context(ColumnNotFoundSnafu {
213 column: column_name,
214 })?;
215
216 if !self.indexed_column_ids.contains(&column.column_id) {
217 return Ok(None);
218 }
219
220 Ok(Some((
221 column.column_id,
222 column.column_schema.data_type.clone(),
223 )))
224 }
225
226 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
228 match expr {
229 Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
230 _ => None,
231 }
232 }
233
234 fn column_name(expr: &Expr) -> Option<&str> {
236 match expr {
237 Expr::Column(column) => Some(&column.name),
238 _ => None,
239 }
240 }
241
242 fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
244 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
245 let mut bytes = vec![];
246 let field = SortField::new(data_type);
247 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
248 .context(EncodeSnafu)?;
249 Ok(bytes)
250 }
251}
252
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{Between, Literal};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use index::inverted_index::search::predicate::{
        Bound, Range, RangePredicate, RegexMatchPredicate,
    };
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with tag columns `a` (string, id 1) and `b` (int64, id 2), field
    /// `c` (string, id 3) and millisecond timestamp `d` (id 4); primary key is
    /// `[1, 2]`.
    pub(crate) fn test_region_metadata() -> RegionMetadata {
        let columns = [
            ("a", ConcreteDataType::string_datatype(), SemanticType::Tag, 1),
            ("b", ConcreteDataType::int64_datatype(), SemanticType::Tag, 2),
            ("c", ConcreteDataType::string_datatype(), SemanticType::Field, 3),
            (
                "d",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                4,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for (name, data_type, semantic_type, column_id) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1, 2]);
        builder.build().unwrap()
    }

    /// In-memory object store for tests.
    pub(crate) fn test_object_store() -> ObjectStore {
        let memory = Memory::default();
        ObjectStore::new(memory).unwrap().finish()
    }

    /// Reference to the string tag column `a`.
    pub(crate) fn tag_column() -> Expr {
        Expr::Column(Column::from_name("a"))
    }

    /// Reference to the int64 tag column `b`.
    pub(crate) fn tag_column2() -> Expr {
        Expr::Column(Column::from_name("b"))
    }

    /// Reference to the string field column `c`.
    pub(crate) fn field_column() -> Expr {
        Expr::Column(Column::from_name("c"))
    }

    /// Reference to a column absent from the test region.
    pub(crate) fn nonexistent_column() -> Expr {
        Expr::Column(Column::from_name("nonexistence"))
    }

    /// String literal expression.
    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
        let owned: String = s.into();
        owned.lit()
    }

    /// Int64 literal expression.
    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
        let value: i64 = i.into();
        value.lit()
    }

    /// Encodes `value` as a non-null index value of `data_type`, panicking on failure.
    fn encode_for_test(value: Value, data_type: ConcreteDataType) -> Vec<u8> {
        let field = SortField::new(data_type);
        let mut buf = Vec::new();
        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut buf).unwrap();
        buf
    }

    /// Index encoding of a string value.
    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
        encode_for_test(Value::from(s.into()), ConcreteDataType::string_datatype())
    }

    /// Index encoding of an int64 value.
    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
        encode_for_test(Value::from(s.into()), ConcreteDataType::int64_datatype())
    }

    #[test]
    fn test_collect_and_basic() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // `a ~ 'bar' AND b BETWEEN 123 AND 456`
        let regex_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(tag_column()),
            op: Operator::RegexMatch,
            right: Box::new(string_lit("bar")),
        });
        let between_expr = Expr::Between(Between {
            expr: Box::new(tag_column2()),
            negated: false,
            low: Box::new(int64_lit(123)),
            high: Box::new(int64_lit(456)),
        });
        let conjunction = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(regex_expr),
            op: Operator::And,
            right: Box::new(between_expr),
        });

        builder.traverse_and_collect(&conjunction);

        let tag_a_predicates = builder.output.get(&1).unwrap();
        assert_eq!(
            *tag_a_predicates,
            vec![Predicate::RegexMatch(RegexMatchPredicate {
                pattern: "bar".to_string()
            })]
        );

        let tag_b_predicates = builder.output.get(&2).unwrap();
        let expected_range = Range {
            lower: Some(Bound {
                inclusive: true,
                value: encoded_int64(123),
            }),
            upper: Some(Bound {
                inclusive: true,
                value: encoded_int64(456),
            }),
        };
        assert_eq!(
            *tag_b_predicates,
            vec![Predicate::Range(RangePredicate {
                range: expected_range
            })]
        );
    }
}