Skip to main content

mito2/sst/index/inverted_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use datatypes::data_type::ConcreteDataType;
24use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
25use index::inverted_index::search::index_apply::{
26    ApplyOutput, IndexApplier, IndexNotFoundStrategy, PredicatesIndexApplier, SearchContext,
27};
28use index::inverted_index::search::predicate::Predicate;
29use index::target::IndexTarget;
30use object_store::ObjectStore;
31use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
32use puffin::puffin_manager::{PuffinManager, PuffinReader};
33use snafu::ResultExt;
34use store_api::metadata::RegionMetadataRef;
35use store_api::region_request::PathType;
36use store_api::storage::ColumnId;
37
38use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
39use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
40use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43    ApplyInvertedIndexSnafu, BuildIndexApplierSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
44    PuffinReadBlobSnafu, Result,
45};
46use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
49use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
50use crate::sst::index::{TYPE_INVERTED_INDEX, trigger_index_background_download};
51
/// Metrics for tracking inverted index apply operations.
///
/// One instance describes a single `InvertedIndexApplier::apply` call; several
/// instances can be accumulated into one with `merge_from`.
#[derive(Default, Clone)]
pub struct InvertedIndexApplyMetrics {
    /// Total time spent applying the index.
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses: 0 or 1 for a single apply call, and the sum of
    /// the individual counts once several metrics have been merged.
    pub blob_cache_miss: usize,
    /// Content length of the index blob(s) that were opened (in bytes), as reported
    /// by the blob metadata.
    pub blob_read_bytes: u64,
    /// Metrics for inverted index reads.
    pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
64
65impl std::fmt::Debug for InvertedIndexApplyMetrics {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        let Self {
68            apply_elapsed,
69            blob_cache_miss,
70            blob_read_bytes,
71            inverted_index_read_metrics,
72        } = self;
73
74        if self.is_empty() {
75            return write!(f, "{{}}");
76        }
77        write!(f, "{{")?;
78
79        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
80
81        if *blob_cache_miss > 0 {
82            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
83        }
84        if *blob_read_bytes > 0 {
85            write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
86        }
87        write!(
88            f,
89            ", \"inverted_index_read_metrics\":{:?}",
90            inverted_index_read_metrics
91        )?;
92
93        write!(f, "}}")
94    }
95}
96
97impl InvertedIndexApplyMetrics {
98    /// Returns true if the metrics are empty (contain no meaningful data).
99    pub fn is_empty(&self) -> bool {
100        self.apply_elapsed.is_zero()
101    }
102
103    /// Merges another metrics into this one.
104    pub fn merge_from(&mut self, other: &Self) {
105        self.apply_elapsed += other.apply_elapsed;
106        self.blob_cache_miss += other.blob_cache_miss;
107        self.blob_read_bytes += other.blob_read_bytes;
108        self.inverted_index_read_metrics
109            .merge_from(&other.inverted_index_read_metrics);
110    }
111}
112
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
///
/// The memory used by the default plan's index applier is added to
/// `INDEX_APPLY_MEMORY_USAGE` on construction and subtracted again on drop.
pub(crate) struct InvertedIndexApplier {
    /// The root directory of the table.
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// Store responsible for accessing remote index files.
    store: ObjectStore,

    /// The cache of index files. When present, index blobs are looked up here
    /// before falling back to the remote store.
    file_cache: Option<FileCacheRef>,

    /// The puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,

    /// In-memory cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,

    /// Puffin metadata cache. Only handed to the puffin manager that reads from
    /// the remote store.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// All collected predicates.
    predicates: BTreeMap<ColumnId, Vec<Predicate>>,

    /// Default apply plan built from all collected predicates. `plan_for_sst`
    /// returns a clone of it when the SST has no column type mismatch.
    default_plan: SstApplyPlan,

    /// Expected predicate column types from the latest region metadata.
    expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
}
146
/// Shared, reference-counted handle to an [`InvertedIndexApplier`].
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
148
/// The plan used to apply the inverted index to one SST file.
///
/// Both fields are derived from the same set of predicates, so they always
/// describe the same filter.
#[derive(Clone)]
pub(crate) struct SstApplyPlan {
    /// Key built from the predicates this plan was created from.
    pub predicate_key: PredicateKey,
    /// Index applier built from the predicates this plan was created from.
    pub index_applier: Arc<PredicatesIndexApplier>,
}
154
155impl InvertedIndexApplier {
156    /// Creates a new `InvertedIndexApplier`.
157    pub fn new(
158        table_dir: String,
159        path_type: PathType,
160        store: ObjectStore,
161        puffin_manager_factory: PuffinManagerFactory,
162        predicates: BTreeMap<ColumnId, Vec<Predicate>>,
163        expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
164    ) -> Result<Self> {
165        let default_plan = Self::build_apply_plan(&predicates)?;
166        INDEX_APPLY_MEMORY_USAGE.add(default_plan.index_applier.memory_usage() as i64);
167
168        Ok(Self {
169            table_dir,
170            path_type,
171            store,
172            file_cache: None,
173            puffin_manager_factory,
174            inverted_index_cache: None,
175            puffin_metadata_cache: None,
176            predicates,
177            default_plan,
178            expected_predicate_col_types,
179        })
180    }
181
    /// Sets the file cache.
    ///
    /// When a file cache is set, `apply` first tries to open the index blob from it
    /// and only reads from the remote store if the cache misses or fails.
    /// Passing `None` disables the cached path.
    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.file_cache = file_cache;
        self
    }
187
    /// Sets the index cache.
    ///
    /// When an index cache is set, `apply` reads the index through a
    /// `CachedInvertedIndexBlobReader` backed by it; with `None` the blob is read
    /// through a plain `InvertedIndexBlobReader`.
    pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
        self.inverted_index_cache = index_cache;
        self
    }
193
    /// Sets the puffin metadata cache.
    ///
    /// The cache is handed to the puffin manager that reads index files from the
    /// remote store; the manager reading from the local file cache does not use it.
    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.puffin_metadata_cache = puffin_metadata_cache;
        self
    }
202
203    /// Applies predicates to one SST file with the provided index applier.
204    ///
205    /// # Arguments
206    /// * `file_id` - The region file ID to apply predicates to
207    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
208    /// * `index_applier` - Inverted index applier produced by `plan_for_sst`.
209    /// * `metrics` - Optional mutable reference to collect metrics on demand
210    #[tracing::instrument(
211        skip_all,
212        fields(file_id = %file_id)
213    )]
214    pub async fn apply(
215        &self,
216        file_id: RegionIndexId,
217        file_size_hint: Option<u64>,
218        index_applier: &PredicatesIndexApplier,
219        mut metrics: Option<&mut InvertedIndexApplyMetrics>,
220    ) -> Result<ApplyOutput> {
221        let start = Instant::now();
222
223        let context = SearchContext {
224            // Encountering a non-existing column indicates that it doesn't match predicates.
225            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
226        };
227
228        let mut cache_miss = 0;
229        let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
230            Ok(Some(puffin_reader)) => puffin_reader,
231            other => {
232                cache_miss += 1;
233                if let Err(err) = other {
234                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
235                }
236                self.remote_blob_reader(file_id, file_size_hint).await?
237            }
238        };
239
240        let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
241
242        let result = if let Some(index_cache) = &self.inverted_index_cache {
243            let mut index_reader = CachedInvertedIndexBlobReader::new(
244                file_id.file_id(),
245                file_id.version,
246                blob_size,
247                InvertedIndexBlobReader::new(blob),
248                index_cache.clone(),
249            );
250            index_applier
251                .apply(
252                    context,
253                    &mut index_reader,
254                    metrics
255                        .as_deref_mut()
256                        .map(|m| &mut m.inverted_index_read_metrics),
257                )
258                .await
259                .context(ApplyInvertedIndexSnafu)
260        } else {
261            let mut index_reader = InvertedIndexBlobReader::new(blob);
262            index_applier
263                .apply(
264                    context,
265                    &mut index_reader,
266                    metrics
267                        .as_deref_mut()
268                        .map(|m| &mut m.inverted_index_read_metrics),
269                )
270                .await
271                .context(ApplyInvertedIndexSnafu)
272        };
273
274        // Record elapsed time to histogram and collect metrics if requested
275        let elapsed = start.elapsed();
276        INDEX_APPLY_ELAPSED
277            .with_label_values(&[TYPE_INVERTED_INDEX])
278            .observe(elapsed.as_secs_f64());
279
280        if let Some(metrics) = metrics {
281            metrics.apply_elapsed = elapsed;
282            metrics.blob_cache_miss = cache_miss;
283            metrics.blob_read_bytes = blob_size;
284        }
285
286        result
287    }
288
289    /// Creates a blob reader from the cached index file.
290    async fn cached_blob_reader(
291        &self,
292        file_id: RegionIndexId,
293        file_size_hint: Option<u64>,
294    ) -> Result<Option<BlobReader>> {
295        let Some(file_cache) = &self.file_cache else {
296            return Ok(None);
297        };
298
299        let index_key = IndexKey::new(
300            file_id.region_id(),
301            file_id.file_id(),
302            FileType::Puffin(file_id.version),
303        );
304        if file_cache.get(index_key).await.is_none() {
305            return Ok(None);
306        };
307
308        let puffin_manager = self.puffin_manager_factory.build(
309            file_cache.local_store(),
310            WriteCachePathProvider::new(file_cache.clone()),
311        );
312
313        // Adds file size hint to the puffin reader to avoid extra metadata read.
314        let reader = puffin_manager
315            .reader(&file_id)
316            .await
317            .context(PuffinBuildReaderSnafu)?
318            .with_file_size_hint(file_size_hint)
319            .blob(INDEX_BLOB_TYPE)
320            .await
321            .context(PuffinReadBlobSnafu)?
322            .reader()
323            .await
324            .context(PuffinBuildReaderSnafu)?;
325        Ok(Some(reader))
326    }
327
328    /// Creates a blob reader from the remote index file.
329    async fn remote_blob_reader(
330        &self,
331        file_id: RegionIndexId,
332        file_size_hint: Option<u64>,
333    ) -> Result<BlobReader> {
334        let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
335
336        // Trigger background download if file cache and file size are available
337        trigger_index_background_download(
338            self.file_cache.as_ref(),
339            &file_id,
340            file_size_hint,
341            &path_factory,
342            &self.store,
343        );
344
345        let puffin_manager = self
346            .puffin_manager_factory
347            .build(self.store.clone(), path_factory)
348            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
349
350        puffin_manager
351            .reader(&file_id)
352            .await
353            .context(PuffinBuildReaderSnafu)?
354            .with_file_size_hint(file_size_hint)
355            .blob(INDEX_BLOB_TYPE)
356            .await
357            .context(PuffinReadBlobSnafu)?
358            .reader()
359            .await
360            .context(PuffinBuildReaderSnafu)
361    }
362
363    /// Builds a per-SST apply plan.
364    ///
365    /// Returns `None` when no compatible predicate remains for this SST.
366    pub fn plan_for_sst(&self, sst_metadata: &RegionMetadataRef) -> Result<Option<SstApplyPlan>> {
367        let mut compatible_predicates = BTreeMap::new();
368        let mut has_type_mismatch = false;
369
370        for (col_id, expected) in &self.expected_predicate_col_types {
371            if let Some(sst_col) = sst_metadata.column_by_id(*col_id)
372                && sst_col.column_schema.data_type != *expected
373            {
374                has_type_mismatch = true;
375                continue;
376            }
377
378            if let Some(predicates) = self.predicates.get(col_id) {
379                compatible_predicates.insert(*col_id, predicates.clone());
380            }
381        }
382
383        if compatible_predicates.is_empty() {
384            return Ok(None);
385        }
386
387        if !has_type_mismatch {
388            return Ok(Some(self.default_plan.clone()));
389        }
390
391        let plan = Self::build_apply_plan(&compatible_predicates)?;
392        Ok(Some(plan))
393    }
394
395    fn build_apply_plan(
396        predicates_by_col: &BTreeMap<ColumnId, Vec<Predicate>>,
397    ) -> Result<SstApplyPlan> {
398        let predicates = predicates_by_col
399            .iter()
400            .map(|(col_id, preds)| (format!("{}", IndexTarget::ColumnId(*col_id)), preds.clone()))
401            .collect();
402
403        let index_applier =
404            PredicatesIndexApplier::try_from(predicates).context(BuildIndexApplierSnafu)?;
405
406        let predicate_key = PredicateKey::new_inverted(Arc::new(predicates_by_col.clone()));
407        Ok(SstApplyPlan {
408            predicate_key,
409            index_applier: Arc::new(index_applier),
410        })
411    }
412}
413
414impl Drop for InvertedIndexApplier {
415    fn drop(&mut self) {
416        INDEX_APPLY_MEMORY_USAGE.sub(self.default_plan.index_applier.memory_usage() as i64);
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use api::v1::SemanticType;
423    use datatypes::data_type::ConcreteDataType;
424    use datatypes::schema::ColumnSchema;
425    use futures::io::Cursor;
426    use index::inverted_index::search::predicate::RegexMatchPredicate;
427    use object_store::services::Memory;
428    use puffin::puffin_manager::PuffinWriter;
429    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
430    use store_api::storage::{FileId, RegionId};
431
432    use super::*;
433    use crate::sst::index::RegionFileId;
434
435    #[tokio::test]
436    async fn test_plan_for_sst() {
437        let (_d, puffin_manager_factory) =
438            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;
439        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
440        let table_dir = "table_dir".to_string();
441
442        let mut predicates = BTreeMap::new();
443        predicates.insert(
444            1,
445            vec![Predicate::RegexMatch(RegexMatchPredicate {
446                pattern: "foo".to_string(),
447            })],
448        );
449        let expected_predicate_col_types =
450            BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);
451
452        let sst_index_applier = InvertedIndexApplier::new(
453            table_dir,
454            PathType::Bare,
455            object_store,
456            puffin_manager_factory,
457            predicates,
458            expected_predicate_col_types,
459        )
460        .unwrap();
461        let plan = sst_index_applier
462            .plan_for_sst(&mock_region_metadata())
463            .unwrap();
464        assert!(plan.is_some());
465    }
466
467    #[tokio::test]
468    async fn test_plan_for_sst_type_mismatch() {
469        let (_d, puffin_manager_factory) =
470            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;
471        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
472        let table_dir = "table_dir".to_string();
473
474        let mut predicates = BTreeMap::new();
475        predicates.insert(
476            1,
477            vec![Predicate::RegexMatch(RegexMatchPredicate {
478                pattern: "foo".to_string(),
479            })],
480        );
481        // Column id 1 is String in `mock_region_metadata`, set expected type to Int64.
482        let expected_predicate_col_types =
483            BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]);
484
485        let sst_index_applier = InvertedIndexApplier::new(
486            table_dir,
487            PathType::Bare,
488            object_store,
489            puffin_manager_factory,
490            predicates,
491            expected_predicate_col_types,
492        )
493        .unwrap();
494        let plan = sst_index_applier
495            .plan_for_sst(&mock_region_metadata())
496            .unwrap();
497        assert!(plan.is_none());
498    }
499
500    #[tokio::test]
501    async fn test_index_applier_apply_invalid_blob_type() {
502        let (_d, puffin_manager_factory) =
503            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
504                .await;
505        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
506        let file_id = RegionFileId::new(0.into(), FileId::random());
507        let index_id = RegionIndexId::new(file_id, 0);
508        let table_dir = "table_dir".to_string();
509
510        let puffin_manager = puffin_manager_factory.build(
511            object_store.clone(),
512            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
513        );
514        let mut writer = puffin_manager.writer(&index_id).await.unwrap();
515        writer
516            .put_blob(
517                "invalid_blob_type",
518                Cursor::new(vec![]),
519                Default::default(),
520                Default::default(),
521            )
522            .await
523            .unwrap();
524        writer.finish().await.unwrap();
525
526        let mut predicates = BTreeMap::new();
527        predicates.insert(
528            1,
529            vec![Predicate::RegexMatch(RegexMatchPredicate {
530                pattern: "foo".to_string(),
531            })],
532        );
533        let expected_predicate_col_types =
534            BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);
535        let sst_index_applier = InvertedIndexApplier::new(
536            table_dir.clone(),
537            PathType::Bare,
538            object_store,
539            puffin_manager_factory,
540            predicates,
541            expected_predicate_col_types,
542        )
543        .unwrap();
544        let plan = sst_index_applier
545            .plan_for_sst(&mock_region_metadata())
546            .unwrap()
547            .unwrap();
548        let res = sst_index_applier
549            .apply(index_id, None, &plan.index_applier, None)
550            .await;
551        assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
552    }
553
554    fn mock_region_metadata() -> RegionMetadataRef {
555        let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
556        builder
557            .push_column_metadata(ColumnMetadata {
558                column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), false),
559                semantic_type: SemanticType::Tag,
560                column_id: 1,
561            })
562            .push_column_metadata(ColumnMetadata {
563                column_schema: ColumnSchema::new(
564                    "ts",
565                    ConcreteDataType::timestamp_millisecond_datatype(),
566                    false,
567                ),
568                semantic_type: SemanticType::Timestamp,
569                column_id: 2,
570            })
571            .primary_key(vec![1]);
572        Arc::new(builder.build().unwrap())
573    }
574}