// mito2/sst/index/inverted_index/applier/builder.rs
mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use mito_codec::index::IndexValueCodec;
31use mito_codec::row_converter::SortField;
32use object_store::ObjectStore;
33use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::RegionMetadata;
36use store_api::storage::ColumnId;
37
38use crate::cache::file_cache::FileCacheRef;
39use crate::cache::index::inverted_index::InvertedIndexCacheRef;
40use crate::error::{
41 BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
42};
43use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
44use crate::sst::index::puffin_manager::PuffinManagerFactory;
45
/// Builder that turns filter expressions into an [`InvertedIndexApplier`].
///
/// Predicates found while traversing expressions are accumulated in `output`,
/// keyed by column id, and handed over to the applier by `build`.
pub(crate) struct InvertedIndexApplierBuilder<'a> {
    /// Region directory; passed through to the applier unchanged.
    region_dir: String,

    /// Object store handle; passed through to the applier unchanged.
    object_store: ObjectStore,

    /// Optional file cache; passed through to the applier.
    file_cache: Option<FileCacheRef>,

    /// Region metadata used to resolve a column name to its id and data type.
    metadata: &'a RegionMetadata,

    /// Ids of the indexed columns. `column_id_and_type` yields `None` for any
    /// column whose id is not in this set.
    indexed_column_ids: HashSet<ColumnId>,

    /// Predicates collected so far, grouped by column id.
    output: BTreeMap<ColumnId, Vec<Predicate>>,

    /// Puffin manager factory; passed through to the applier unchanged.
    puffin_manager_factory: PuffinManagerFactory,

    /// Optional inverted index cache; passed through to the applier.
    inverted_index_cache: Option<InvertedIndexCacheRef>,

    /// Optional puffin metadata cache; passed through to the applier.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
75
76impl<'a> InvertedIndexApplierBuilder<'a> {
77 pub fn new(
79 region_dir: String,
80 object_store: ObjectStore,
81 metadata: &'a RegionMetadata,
82 indexed_column_ids: HashSet<ColumnId>,
83 puffin_manager_factory: PuffinManagerFactory,
84 ) -> Self {
85 Self {
86 region_dir,
87 object_store,
88 metadata,
89 indexed_column_ids,
90 output: BTreeMap::default(),
91 puffin_manager_factory,
92 file_cache: None,
93 inverted_index_cache: None,
94 puffin_metadata_cache: None,
95 }
96 }
97
98 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
100 self.file_cache = file_cache;
101 self
102 }
103
104 pub fn with_puffin_metadata_cache(
106 mut self,
107 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
108 ) -> Self {
109 self.puffin_metadata_cache = puffin_metadata_cache;
110 self
111 }
112
113 pub fn with_inverted_index_cache(
115 mut self,
116 inverted_index_cache: Option<InvertedIndexCacheRef>,
117 ) -> Self {
118 self.inverted_index_cache = inverted_index_cache;
119 self
120 }
121
122 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
125 for expr in exprs {
126 self.traverse_and_collect(expr);
127 }
128
129 if self.output.is_empty() {
130 return Ok(None);
131 }
132
133 let predicates = self
134 .output
135 .iter()
136 .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
137 .collect();
138 let applier = PredicatesIndexApplier::try_from(predicates);
139
140 Ok(Some(
141 InvertedIndexApplier::new(
142 self.region_dir,
143 self.metadata.region_id,
144 self.object_store,
145 Box::new(applier.context(BuildIndexApplierSnafu)?),
146 self.puffin_manager_factory,
147 self.output,
148 )
149 .with_file_cache(self.file_cache)
150 .with_puffin_metadata_cache(self.puffin_metadata_cache)
151 .with_index_cache(self.inverted_index_cache),
152 ))
153 }
154
155 fn traverse_and_collect(&mut self, expr: &Expr) {
158 let res = match expr {
159 Expr::Between(between) => self.collect_between(between),
160
161 Expr::InList(in_list) => self.collect_inlist(in_list),
162 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
163 Operator::And => {
164 self.traverse_and_collect(left);
165 self.traverse_and_collect(right);
166 Ok(())
167 }
168 Operator::Or => self.collect_or_eq_list(left, right),
169 Operator::Eq => self.collect_eq(left, right),
170 Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
171 self.collect_comparison_expr(left, op, right)
172 }
173 Operator::RegexMatch => self.collect_regex_match(left, right),
174 _ => Ok(()),
175 },
176
177 _ => Ok(()),
179 };
180
181 if let Err(err) = res {
182 warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
183 }
184 }
185
186 fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
188 self.output.entry(column_id).or_default().push(predicate);
189 }
190
191 fn column_id_and_type(
194 &self,
195 column_name: &str,
196 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
197 let column = self
198 .metadata
199 .column_by_name(column_name)
200 .context(ColumnNotFoundSnafu {
201 column: column_name,
202 })?;
203
204 if !self.indexed_column_ids.contains(&column.column_id) {
205 return Ok(None);
206 }
207
208 Ok(Some((
209 column.column_id,
210 column.column_schema.data_type.clone(),
211 )))
212 }
213
214 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
216 match expr {
217 Expr::Literal(lit) if !lit.is_null() => Some(lit),
218 _ => None,
219 }
220 }
221
222 fn column_name(expr: &Expr) -> Option<&str> {
224 match expr {
225 Expr::Column(column) => Some(&column.name),
226 _ => None,
227 }
228 }
229
230 fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
232 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
233 let mut bytes = vec![];
234 let field = SortField::new(data_type);
235 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
236 .context(EncodeSnafu)?;
237 Ok(bytes)
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use api::v1::SemanticType;
244 use datafusion_common::Column;
245 use datafusion_expr::Between;
246 use datatypes::data_type::ConcreteDataType;
247 use datatypes::schema::ColumnSchema;
248 use index::inverted_index::search::predicate::{
249 Bound, Range, RangePredicate, RegexMatchPredicate,
250 };
251 use object_store::services::Memory;
252 use object_store::ObjectStore;
253 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
254 use store_api::storage::RegionId;
255
256 use super::*;
257
258 pub(crate) fn test_region_metadata() -> RegionMetadata {
259 let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
260 builder
261 .push_column_metadata(ColumnMetadata {
262 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
263 semantic_type: SemanticType::Tag,
264 column_id: 1,
265 })
266 .push_column_metadata(ColumnMetadata {
267 column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
268 semantic_type: SemanticType::Tag,
269 column_id: 2,
270 })
271 .push_column_metadata(ColumnMetadata {
272 column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
273 semantic_type: SemanticType::Field,
274 column_id: 3,
275 })
276 .push_column_metadata(ColumnMetadata {
277 column_schema: ColumnSchema::new(
278 "d",
279 ConcreteDataType::timestamp_millisecond_datatype(),
280 false,
281 ),
282 semantic_type: SemanticType::Timestamp,
283 column_id: 4,
284 })
285 .primary_key(vec![1, 2]);
286 builder.build().unwrap()
287 }
288
289 pub(crate) fn test_object_store() -> ObjectStore {
290 ObjectStore::new(Memory::default()).unwrap().finish()
291 }
292
293 pub(crate) fn tag_column() -> Expr {
294 Expr::Column(Column::from_name("a"))
295 }
296
297 pub(crate) fn tag_column2() -> Expr {
298 Expr::Column(Column::from_name("b"))
299 }
300
301 pub(crate) fn field_column() -> Expr {
302 Expr::Column(Column::from_name("c"))
303 }
304
305 pub(crate) fn nonexistent_column() -> Expr {
306 Expr::Column(Column::from_name("nonexistence"))
307 }
308
309 pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
310 Expr::Literal(ScalarValue::Utf8(Some(s.into())))
311 }
312
313 pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
314 Expr::Literal(ScalarValue::Int64(Some(i.into())))
315 }
316
317 pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
318 let mut bytes = vec![];
319 IndexValueCodec::encode_nonnull_value(
320 Value::from(s.into()).as_value_ref(),
321 &SortField::new(ConcreteDataType::string_datatype()),
322 &mut bytes,
323 )
324 .unwrap();
325 bytes
326 }
327
328 pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
329 let mut bytes = vec![];
330 IndexValueCodec::encode_nonnull_value(
331 Value::from(s.into()).as_value_ref(),
332 &SortField::new(ConcreteDataType::int64_datatype()),
333 &mut bytes,
334 )
335 .unwrap();
336 bytes
337 }
338
/// An `AND` of a regex match on column `a` and a `BETWEEN` on column `b` must
/// yield exactly one predicate for each of the two columns.
#[test]
fn test_collect_and_basic() {
    // `_dir` is kept bound for the whole test rather than dropped immediately.
    let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

    let metadata = test_region_metadata();
    let mut builder = InvertedIndexApplierBuilder::new(
        "test".to_string(),
        test_object_store(),
        &metadata,
        HashSet::from_iter([1, 2, 3]),
        factory,
    );

    let regex_on_a = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(tag_column()),
        op: Operator::RegexMatch,
        right: Box::new(string_lit("bar")),
    });
    let between_on_b = Expr::Between(Between {
        expr: Box::new(tag_column2()),
        negated: false,
        low: Box::new(int64_lit(123)),
        high: Box::new(int64_lit(456)),
    });
    let expr = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(regex_on_a),
        op: Operator::And,
        right: Box::new(between_on_b),
    });

    builder.traverse_and_collect(&expr);

    let expected_regex = Predicate::RegexMatch(RegexMatchPredicate {
        pattern: "bar".to_string(),
    });
    let predicates_a = builder.output.get(&1).unwrap();
    assert_eq!(predicates_a.len(), 1);
    assert_eq!(predicates_a[0], expected_regex);

    let expected_range = Predicate::Range(RangePredicate {
        range: Range {
            lower: Some(Bound {
                inclusive: true,
                value: encoded_int64(123),
            }),
            upper: Some(Bound {
                inclusive: true,
                value: encoded_int64(456),
            }),
        },
    });
    let predicates_b = builder.output.get(&2).unwrap();
    assert_eq!(predicates_b.len(), 1);
    assert_eq!(predicates_b[0], expected_range);
}
394}