mito2/sst/index/inverted_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
24use index::inverted_index::search::index_apply::{
25    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
26};
27use index::inverted_index::search::predicate::Predicate;
28use object_store::ObjectStore;
29use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
30use puffin::puffin_manager::{PuffinManager, PuffinReader};
31use snafu::ResultExt;
32use store_api::region_request::PathType;
33use store_api::storage::ColumnId;
34
35use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
36use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
37use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
38use crate::cache::index::result_cache::PredicateKey;
39use crate::error::{
40    ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
41};
42use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
43use crate::sst::file::RegionFileId;
44use crate::sst::index::TYPE_INVERTED_INDEX;
45use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
46use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
47
/// Metrics for tracking inverted index apply operations.
///
/// `InvertedIndexApplier::apply` fills a value in when the caller passes a metrics
/// reference; several values can be accumulated with `merge_from`.
#[derive(Default, Clone)]
pub struct InvertedIndexApplyMetrics {
    /// Total time spent applying the index.
    ///
    /// `is_empty` treats a zero duration here as "nothing recorded".
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses: 0 or 1 after a single `apply` call, summed when
    /// metrics are merged. `apply` counts a miss whenever it falls back to the remote
    /// index file (no cached reader available, or the cached read failed).
    pub blob_cache_miss: usize,
    /// Total size of blobs read (in bytes). `apply` stores the blob's content length
    /// as reported by the blob metadata.
    pub blob_read_bytes: u64,
    /// Metrics for inverted index reads.
    pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
60
61impl std::fmt::Debug for InvertedIndexApplyMetrics {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        let Self {
64            apply_elapsed,
65            blob_cache_miss,
66            blob_read_bytes,
67            inverted_index_read_metrics,
68        } = self;
69
70        if self.is_empty() {
71            return write!(f, "{{}}");
72        }
73        write!(f, "{{")?;
74
75        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
76
77        if *blob_cache_miss > 0 {
78            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
79        }
80        if *blob_read_bytes > 0 {
81            write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
82        }
83        write!(
84            f,
85            ", \"inverted_index_read_metrics\":{:?}",
86            inverted_index_read_metrics
87        )?;
88
89        write!(f, "}}")
90    }
91}
92
93impl InvertedIndexApplyMetrics {
94    /// Returns true if the metrics are empty (contain no meaningful data).
95    pub fn is_empty(&self) -> bool {
96        self.apply_elapsed.is_zero()
97    }
98
99    /// Merges another metrics into this one.
100    pub fn merge_from(&mut self, other: &Self) {
101        self.apply_elapsed += other.apply_elapsed;
102        self.blob_cache_miss += other.blob_cache_miss;
103        self.blob_read_bytes += other.blob_read_bytes;
104        self.inverted_index_read_metrics
105            .merge_from(&other.inverted_index_read_metrics);
106    }
107}
108
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
///
/// The index blob is read from the local file cache when one is configured and holds
/// the file, and from the remote object store otherwise (see `apply`).
pub(crate) struct InvertedIndexApplier {
    /// The root directory of the table.
    /// Combined with `path_type` to locate index files in the remote store.
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// Store responsible for accessing remote index files.
    store: ObjectStore,

    /// The cache of index files.
    /// When `None`, the index blob is always read from `store`.
    file_cache: Option<FileCacheRef>,

    /// Predefined index applier used to apply predicates to index files
    /// and return the relevant row group ids for further scan.
    /// Its `memory_usage()` is added to `INDEX_APPLY_MEMORY_USAGE` in `new` and
    /// subtracted again on drop.
    index_applier: Box<dyn IndexApplier>,

    /// The puffin manager factory.
    /// Builds the puffin managers used for both the cached and the remote read path.
    puffin_manager_factory: PuffinManagerFactory,

    /// In-memory cache for inverted index.
    /// When set, index reads go through a `CachedInvertedIndexBlobReader`.
    inverted_index_cache: Option<InvertedIndexCacheRef>,

    /// Puffin metadata cache.
    /// Only handed to the puffin manager built for the remote read path.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    /// Built in `new` from the predicates passed by the caller.
    predicate_key: PredicateKey,
}
140
/// Shared, reference-counted handle to an [`InvertedIndexApplier`].
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
142
143impl InvertedIndexApplier {
144    /// Creates a new `InvertedIndexApplier`.
145    pub fn new(
146        table_dir: String,
147        path_type: PathType,
148        store: ObjectStore,
149        index_applier: Box<dyn IndexApplier>,
150        puffin_manager_factory: PuffinManagerFactory,
151        predicates: BTreeMap<ColumnId, Vec<Predicate>>,
152    ) -> Self {
153        INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
154
155        Self {
156            table_dir,
157            path_type,
158            store,
159            file_cache: None,
160            index_applier,
161            puffin_manager_factory,
162            inverted_index_cache: None,
163            puffin_metadata_cache: None,
164            predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
165        }
166    }
167
168    /// Sets the file cache.
169    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
170        self.file_cache = file_cache;
171        self
172    }
173
174    /// Sets the index cache.
175    pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
176        self.inverted_index_cache = index_cache;
177        self
178    }
179
180    /// Sets the puffin metadata cache.
181    pub fn with_puffin_metadata_cache(
182        mut self,
183        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
184    ) -> Self {
185        self.puffin_metadata_cache = puffin_metadata_cache;
186        self
187    }
188
189    /// Applies predicates to the provided SST file id and returns the relevant row group ids.
190    ///
191    /// # Arguments
192    /// * `file_id` - The region file ID to apply predicates to
193    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
194    /// * `metrics` - Optional mutable reference to collect metrics on demand
195    pub async fn apply(
196        &self,
197        file_id: RegionFileId,
198        file_size_hint: Option<u64>,
199        mut metrics: Option<&mut InvertedIndexApplyMetrics>,
200    ) -> Result<ApplyOutput> {
201        let start = Instant::now();
202
203        let context = SearchContext {
204            // Encountering a non-existing column indicates that it doesn't match predicates.
205            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
206        };
207
208        let mut cache_miss = 0;
209        let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
210            Ok(Some(puffin_reader)) => puffin_reader,
211            other => {
212                cache_miss += 1;
213                if let Err(err) = other {
214                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
215                }
216                self.remote_blob_reader(file_id, file_size_hint).await?
217            }
218        };
219
220        let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
221
222        let result = if let Some(index_cache) = &self.inverted_index_cache {
223            let mut index_reader = CachedInvertedIndexBlobReader::new(
224                file_id.file_id(),
225                blob_size,
226                InvertedIndexBlobReader::new(blob),
227                index_cache.clone(),
228            );
229            self.index_applier
230                .apply(
231                    context,
232                    &mut index_reader,
233                    metrics
234                        .as_deref_mut()
235                        .map(|m| &mut m.inverted_index_read_metrics),
236                )
237                .await
238                .context(ApplyInvertedIndexSnafu)
239        } else {
240            let mut index_reader = InvertedIndexBlobReader::new(blob);
241            self.index_applier
242                .apply(
243                    context,
244                    &mut index_reader,
245                    metrics
246                        .as_deref_mut()
247                        .map(|m| &mut m.inverted_index_read_metrics),
248                )
249                .await
250                .context(ApplyInvertedIndexSnafu)
251        };
252
253        // Record elapsed time to histogram and collect metrics if requested
254        let elapsed = start.elapsed();
255        INDEX_APPLY_ELAPSED
256            .with_label_values(&[TYPE_INVERTED_INDEX])
257            .observe(elapsed.as_secs_f64());
258
259        if let Some(metrics) = metrics {
260            metrics.apply_elapsed = elapsed;
261            metrics.blob_cache_miss = cache_miss;
262            metrics.blob_read_bytes = blob_size;
263        }
264
265        result
266    }
267
268    /// Creates a blob reader from the cached index file.
269    async fn cached_blob_reader(
270        &self,
271        file_id: RegionFileId,
272        file_size_hint: Option<u64>,
273    ) -> Result<Option<BlobReader>> {
274        let Some(file_cache) = &self.file_cache else {
275            return Ok(None);
276        };
277
278        let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
279        if file_cache.get(index_key).await.is_none() {
280            return Ok(None);
281        };
282
283        let puffin_manager = self.puffin_manager_factory.build(
284            file_cache.local_store(),
285            WriteCachePathProvider::new(file_cache.clone()),
286        );
287
288        // Adds file size hint to the puffin reader to avoid extra metadata read.
289        let reader = puffin_manager
290            .reader(&file_id)
291            .await
292            .context(PuffinBuildReaderSnafu)?
293            .with_file_size_hint(file_size_hint)
294            .blob(INDEX_BLOB_TYPE)
295            .await
296            .context(PuffinReadBlobSnafu)?
297            .reader()
298            .await
299            .context(PuffinBuildReaderSnafu)?;
300        Ok(Some(reader))
301    }
302
303    /// Creates a blob reader from the remote index file.
304    async fn remote_blob_reader(
305        &self,
306        file_id: RegionFileId,
307        file_size_hint: Option<u64>,
308    ) -> Result<BlobReader> {
309        let puffin_manager = self
310            .puffin_manager_factory
311            .build(
312                self.store.clone(),
313                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
314            )
315            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
316
317        puffin_manager
318            .reader(&file_id)
319            .await
320            .context(PuffinBuildReaderSnafu)?
321            .with_file_size_hint(file_size_hint)
322            .blob(INDEX_BLOB_TYPE)
323            .await
324            .context(PuffinReadBlobSnafu)?
325            .reader()
326            .await
327            .context(PuffinBuildReaderSnafu)
328    }
329
330    /// Returns the predicate key.
331    pub fn predicate_key(&self) -> &PredicateKey {
332        &self.predicate_key
333    }
334}
335
336impl Drop for InvertedIndexApplier {
337    fn drop(&mut self) {
338        INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
339    }
340}
341
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::storage::FileId;

    use super::*;

    /// Table directory shared by the writer and the applier under test.
    const TABLE_DIR: &str = "table_dir";

    /// Writes a puffin file for `file_id` that holds a single empty blob stored
    /// under `blob_type`.
    async fn write_empty_blob(
        factory: &PuffinManagerFactory,
        store: &ObjectStore,
        file_id: RegionFileId,
        blob_type: &'static str,
    ) {
        let manager = factory.build(
            store.clone(),
            RegionFilePathFactory::new(TABLE_DIR.to_string(), PathType::Bare),
        );
        let mut writer = manager.writer(&file_id).await.unwrap();
        writer
            .put_blob(
                blob_type,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// Wraps `mock` in an `InvertedIndexApplier` without caches or predicates.
    fn new_applier(
        mock: MockIndexApplier,
        store: ObjectStore,
        factory: PuffinManagerFactory,
    ) -> InvertedIndexApplier {
        InvertedIndexApplier::new(
            TABLE_DIR.to_string(),
            PathType::Bare,
            store,
            Box::new(mock),
            factory,
            Default::default(),
        )
    }

    /// The applier forwards the output of the underlying index applier when the
    /// expected blob exists.
    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = RegionFileId::new(0.into(), FileId::random());

        write_empty_blob(&factory, &store, file_id, INDEX_BLOB_TYPE).await;

        let mut mock = MockIndexApplier::new();
        mock.expect_memory_usage().returning(|| 100);
        mock.expect_apply().returning(|_, _, _| {
            Ok(ApplyOutput {
                matched_segment_ids: Bitmap::new_bitvec(),
                total_row_count: 100,
                segment_row_count: 10,
            })
        });

        let applier = new_applier(mock, store, factory);
        let output = applier.apply(file_id, None, None).await.unwrap();

        let expected = ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        };
        assert_eq!(output, expected);
    }

    /// When the puffin file lacks the inverted index blob, `apply` fails with a
    /// "Blob not found" error and never reaches the underlying index applier.
    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = RegionFileId::new(0.into(), FileId::random());

        write_empty_blob(&factory, &store, file_id, "invalid_blob_type").await;

        let mut mock = MockIndexApplier::new();
        mock.expect_memory_usage().returning(|| 100);
        mock.expect_apply().never();

        let applier = new_applier(mock, store, factory);
        let res = applier.apply(file_id, None, None).await;

        let rendered = format!("{:?}", res.unwrap_err());
        assert!(rendered.contains("Blob not found"));
    }
}
446}