mito2/sst/index/inverted_index/applier/
builder.rs1mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::predicate::Predicate;
29use mito_codec::index::IndexValueCodec;
30use mito_codec::row_converter::SortField;
31use object_store::ObjectStore;
32use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::RegionMetadata;
35use store_api::region_request::PathType;
36use store_api::storage::ColumnId;
37
38use crate::cache::file_cache::FileCacheRef;
39use crate::cache::index::inverted_index::InvertedIndexCacheRef;
40use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
41use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
42use crate::sst::index::puffin_manager::PuffinManagerFactory;
43
44pub(crate) struct InvertedIndexApplierBuilder<'a> {
46 table_dir: String,
48
49 path_type: PathType,
51
52 object_store: ObjectStore,
54
55 file_cache: Option<FileCacheRef>,
57
58 metadata: &'a RegionMetadata,
60
61 indexed_column_ids: HashSet<ColumnId>,
63
64 output: BTreeMap<ColumnId, Vec<Predicate>>,
66
67 puffin_manager_factory: PuffinManagerFactory,
69
70 inverted_index_cache: Option<InvertedIndexCacheRef>,
72
73 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
75}
76
77impl<'a> InvertedIndexApplierBuilder<'a> {
78 pub fn new(
80 table_dir: String,
81 path_type: PathType,
82 object_store: ObjectStore,
83 metadata: &'a RegionMetadata,
84 indexed_column_ids: HashSet<ColumnId>,
85 puffin_manager_factory: PuffinManagerFactory,
86 ) -> Self {
87 Self {
88 table_dir,
89 path_type,
90 object_store,
91 metadata,
92 indexed_column_ids,
93 output: BTreeMap::default(),
94 puffin_manager_factory,
95 file_cache: None,
96 inverted_index_cache: None,
97 puffin_metadata_cache: None,
98 }
99 }
100
101 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
103 self.file_cache = file_cache;
104 self
105 }
106
107 pub fn with_puffin_metadata_cache(
109 mut self,
110 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
111 ) -> Self {
112 self.puffin_metadata_cache = puffin_metadata_cache;
113 self
114 }
115
116 pub fn with_inverted_index_cache(
118 mut self,
119 inverted_index_cache: Option<InvertedIndexCacheRef>,
120 ) -> Self {
121 self.inverted_index_cache = inverted_index_cache;
122 self
123 }
124
125 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
128 for expr in exprs {
129 self.traverse_and_collect(expr);
130 }
131
132 if self.output.is_empty() {
133 return Ok(None);
134 }
135
136 let expected_predicate_column_types = self.expected_predicate_column_types();
137
138 Ok(Some(
139 InvertedIndexApplier::new(
140 self.table_dir,
141 self.path_type,
142 self.object_store,
143 self.puffin_manager_factory,
144 self.output,
145 expected_predicate_column_types,
146 )?
147 .with_file_cache(self.file_cache)
148 .with_puffin_metadata_cache(self.puffin_metadata_cache)
149 .with_index_cache(self.inverted_index_cache),
150 ))
151 }
152
153 fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
159 self.output
160 .keys()
161 .filter_map(|col_id| {
162 let col = self.metadata.column_by_id(*col_id)?;
163 Some((*col_id, col.column_schema.data_type.clone()))
164 })
165 .collect()
166 }
167
168 fn traverse_and_collect(&mut self, expr: &Expr) {
171 let res = match expr {
172 Expr::Between(between) => self.collect_between(between),
173
174 Expr::InList(in_list) => self.collect_inlist(in_list),
175 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
176 Operator::And => {
177 self.traverse_and_collect(left);
178 self.traverse_and_collect(right);
179 Ok(())
180 }
181 Operator::Or => self.collect_or_eq_list(left, right),
182 Operator::Eq => self.collect_eq(left, right),
183 Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
184 self.collect_comparison_expr(left, op, right)
185 }
186 Operator::RegexMatch => self.collect_regex_match(left, right),
187 _ => Ok(()),
188 },
189
190 _ => Ok(()),
192 };
193
194 if let Err(err) = res {
195 warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
196 }
197 }
198
199 fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
201 self.output.entry(column_id).or_default().push(predicate);
202 }
203
204 fn column_id_and_type(
207 &self,
208 column_name: &str,
209 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
210 let column = self
211 .metadata
212 .column_by_name(column_name)
213 .context(ColumnNotFoundSnafu {
214 column: column_name,
215 })?;
216
217 if !self.indexed_column_ids.contains(&column.column_id) {
218 return Ok(None);
219 }
220
221 Ok(Some((
222 column.column_id,
223 column.column_schema.data_type.clone(),
224 )))
225 }
226
227 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
229 match expr {
230 Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
231 _ => None,
232 }
233 }
234
235 fn column_name(expr: &Expr) -> Option<&str> {
237 match expr {
238 Expr::Column(column) => Some(&column.name),
239 _ => None,
240 }
241 }
242
243 fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
245 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
246 let mut bytes = vec![];
247 let field = SortField::new(data_type);
248 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
249 .context(EncodeSnafu)?;
250 Ok(bytes)
251 }
252}
253
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{Between, Literal};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use index::inverted_index::search::predicate::{
        Bound, Range, RangePredicate, RegexMatchPredicate,
    };
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Builds region metadata with four non-nullable columns:
    /// tag `a` (string, id 1), tag `b` (int64, id 2), field `c` (string, id 3)
    /// and timestamp `d` (millisecond, id 4). The primary key is `[1, 2]`.
    pub(crate) fn test_region_metadata() -> RegionMetadata {
        let columns = [
            (
                "a",
                ConcreteDataType::string_datatype(),
                SemanticType::Tag,
                1,
            ),
            (
                "b",
                ConcreteDataType::int64_datatype(),
                SemanticType::Tag,
                2,
            ),
            (
                "c",
                ConcreteDataType::string_datatype(),
                SemanticType::Field,
                3,
            ),
            (
                "d",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                4,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for (name, data_type, semantic_type, column_id) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1, 2]);
        builder.build().unwrap()
    }

    /// Returns an object store built on the `Memory` service.
    pub(crate) fn test_object_store() -> ObjectStore {
        let memory = Memory::default();
        ObjectStore::new(memory).unwrap().finish()
    }

    /// Column expression for tag column `a`.
    pub(crate) fn tag_column() -> Expr {
        let column = Column::from_name("a");
        Expr::Column(column)
    }

    /// Column expression for tag column `b`.
    pub(crate) fn tag_column2() -> Expr {
        let column = Column::from_name("b");
        Expr::Column(column)
    }

    /// Column expression for field column `c`.
    pub(crate) fn field_column() -> Expr {
        let column = Column::from_name("c");
        Expr::Column(column)
    }

    /// Column expression for a name that is absent from the test metadata.
    pub(crate) fn nonexistent_column() -> Expr {
        let column = Column::from_name("nonexistence");
        Expr::Column(column)
    }

    /// Literal expression holding the given string.
    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
        let text: String = s.into();
        text.lit()
    }

    /// Literal expression holding the given 64-bit integer.
    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
        let number: i64 = i.into();
        number.lit()
    }

    /// Encodes `value` as a non-null index value of `data_type`, panicking on failure.
    fn encode_value(value: Value, data_type: ConcreteDataType) -> Vec<u8> {
        let field = SortField::new(data_type);
        let mut encoded = Vec::new();
        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut encoded)
            .unwrap();
        encoded
    }

    /// Index encoding of a string value.
    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
        encode_value(Value::from(s.into()), ConcreteDataType::string_datatype())
    }

    /// Index encoding of an int64 value.
    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
        encode_value(Value::from(s.into()), ConcreteDataType::int64_datatype())
    }

    /// `a ~ "bar" AND b BETWEEN 123 AND 456` must yield one regex predicate for
    /// column 1 and one inclusive range predicate for column 2.
    #[test]
    fn test_collect_and_basic() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            String::from("test"),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from([1, 2, 3]),
            factory,
        );

        let regex_side = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(tag_column()),
            op: Operator::RegexMatch,
            right: Box::new(string_lit("bar")),
        });
        let between_side = Expr::Between(Between {
            expr: Box::new(tag_column2()),
            negated: false,
            low: Box::new(int64_lit(123)),
            high: Box::new(int64_lit(456)),
        });
        let conjunction = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(regex_side),
            op: Operator::And,
            right: Box::new(between_side),
        });

        builder.traverse_and_collect(&conjunction);

        let expected_regex = Predicate::RegexMatch(RegexMatchPredicate {
            pattern: "bar".to_string(),
        });
        let regex_predicates = builder.output.get(&1).unwrap();
        assert_eq!(*regex_predicates, vec![expected_regex]);

        let expected_range = Predicate::Range(RangePredicate {
            range: Range {
                lower: Some(Bound {
                    inclusive: true,
                    value: encoded_int64(123),
                }),
                upper: Some(Bound {
                    inclusive: true,
                    value: encoded_int64(456),
                }),
            },
        });
        let range_predicates = builder.output.get(&2).unwrap();
        assert_eq!(*range_predicates, vec![expected_range]);
    }
}