mito2/sst/index/inverted_index/applier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
24use index::inverted_index::search::index_apply::{
25    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
26};
27use index::inverted_index::search::predicate::Predicate;
28use object_store::ObjectStore;
29use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
30use puffin::puffin_manager::{PuffinManager, PuffinReader};
31use snafu::ResultExt;
32use store_api::region_request::PathType;
33use store_api::storage::ColumnId;
34
35use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
36use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
37use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
38use crate::cache::index::result_cache::PredicateKey;
39use crate::error::{
40    ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
41};
42use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
43use crate::sst::file::RegionIndexId;
44use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
45use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
46use crate::sst::index::{TYPE_INVERTED_INDEX, trigger_index_background_download};
47
48/// Metrics for tracking inverted index apply operations.
49#[derive(Default, Clone)]
50pub struct InvertedIndexApplyMetrics {
51    /// Total time spent applying the index.
52    pub apply_elapsed: std::time::Duration,
53    /// Number of blob cache misses (0 or 1).
54    pub blob_cache_miss: usize,
55    /// Total size of blobs read (in bytes).
56    pub blob_read_bytes: u64,
57    /// Metrics for inverted index reads.
58    pub inverted_index_read_metrics: InvertedIndexReadMetrics,
59}
60
61impl std::fmt::Debug for InvertedIndexApplyMetrics {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        let Self {
64            apply_elapsed,
65            blob_cache_miss,
66            blob_read_bytes,
67            inverted_index_read_metrics,
68        } = self;
69
70        if self.is_empty() {
71            return write!(f, "{{}}");
72        }
73        write!(f, "{{")?;
74
75        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
76
77        if *blob_cache_miss > 0 {
78            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
79        }
80        if *blob_read_bytes > 0 {
81            write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
82        }
83        write!(
84            f,
85            ", \"inverted_index_read_metrics\":{:?}",
86            inverted_index_read_metrics
87        )?;
88
89        write!(f, "}}")
90    }
91}
92
93impl InvertedIndexApplyMetrics {
94    /// Returns true if the metrics are empty (contain no meaningful data).
95    pub fn is_empty(&self) -> bool {
96        self.apply_elapsed.is_zero()
97    }
98
99    /// Merges another metrics into this one.
100    pub fn merge_from(&mut self, other: &Self) {
101        self.apply_elapsed += other.apply_elapsed;
102        self.blob_cache_miss += other.blob_cache_miss;
103        self.blob_read_bytes += other.blob_read_bytes;
104        self.inverted_index_read_metrics
105            .merge_from(&other.inverted_index_read_metrics);
106    }
107}
108
109/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
110/// and returning the relevant row group ids for further scan.
111pub(crate) struct InvertedIndexApplier {
112    /// The root directory of the table.
113    table_dir: String,
114
115    /// Path type for generating file paths.
116    path_type: PathType,
117
118    /// Store responsible for accessing remote index files.
119    store: ObjectStore,
120
121    /// The cache of index files.
122    file_cache: Option<FileCacheRef>,
123
124    /// Predefined index applier used to apply predicates to index files
125    /// and return the relevant row group ids for further scan.
126    index_applier: Box<dyn IndexApplier>,
127
128    /// The puffin manager factory.
129    puffin_manager_factory: PuffinManagerFactory,
130
131    /// In-memory cache for inverted index.
132    inverted_index_cache: Option<InvertedIndexCacheRef>,
133
134    /// Puffin metadata cache.
135    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
136
137    /// Predicate key. Used to identify the predicate and fetch result from cache.
138    predicate_key: PredicateKey,
139}
140
141pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
142
143impl InvertedIndexApplier {
144    /// Creates a new `InvertedIndexApplier`.
145    pub fn new(
146        table_dir: String,
147        path_type: PathType,
148        store: ObjectStore,
149        index_applier: Box<dyn IndexApplier>,
150        puffin_manager_factory: PuffinManagerFactory,
151        predicates: BTreeMap<ColumnId, Vec<Predicate>>,
152    ) -> Self {
153        INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
154
155        Self {
156            table_dir,
157            path_type,
158            store,
159            file_cache: None,
160            index_applier,
161            puffin_manager_factory,
162            inverted_index_cache: None,
163            puffin_metadata_cache: None,
164            predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
165        }
166    }
167
168    /// Sets the file cache.
169    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
170        self.file_cache = file_cache;
171        self
172    }
173
174    /// Sets the index cache.
175    pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
176        self.inverted_index_cache = index_cache;
177        self
178    }
179
180    /// Sets the puffin metadata cache.
181    pub fn with_puffin_metadata_cache(
182        mut self,
183        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
184    ) -> Self {
185        self.puffin_metadata_cache = puffin_metadata_cache;
186        self
187    }
188
189    /// Applies predicates to the provided SST file id and returns the relevant row group ids.
190    ///
191    /// # Arguments
192    /// * `file_id` - The region file ID to apply predicates to
193    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
194    /// * `metrics` - Optional mutable reference to collect metrics on demand
195    #[tracing::instrument(
196        skip_all,
197        fields(file_id = %file_id)
198    )]
199    pub async fn apply(
200        &self,
201        file_id: RegionIndexId,
202        file_size_hint: Option<u64>,
203        mut metrics: Option<&mut InvertedIndexApplyMetrics>,
204    ) -> Result<ApplyOutput> {
205        let start = Instant::now();
206
207        let context = SearchContext {
208            // Encountering a non-existing column indicates that it doesn't match predicates.
209            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
210        };
211
212        let mut cache_miss = 0;
213        let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
214            Ok(Some(puffin_reader)) => puffin_reader,
215            other => {
216                cache_miss += 1;
217                if let Err(err) = other {
218                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
219                }
220                self.remote_blob_reader(file_id, file_size_hint).await?
221            }
222        };
223
224        let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
225
226        let result = if let Some(index_cache) = &self.inverted_index_cache {
227            let mut index_reader = CachedInvertedIndexBlobReader::new(
228                file_id.file_id(),
229                file_id.version,
230                blob_size,
231                InvertedIndexBlobReader::new(blob),
232                index_cache.clone(),
233            );
234            self.index_applier
235                .apply(
236                    context,
237                    &mut index_reader,
238                    metrics
239                        .as_deref_mut()
240                        .map(|m| &mut m.inverted_index_read_metrics),
241                )
242                .await
243                .context(ApplyInvertedIndexSnafu)
244        } else {
245            let mut index_reader = InvertedIndexBlobReader::new(blob);
246            self.index_applier
247                .apply(
248                    context,
249                    &mut index_reader,
250                    metrics
251                        .as_deref_mut()
252                        .map(|m| &mut m.inverted_index_read_metrics),
253                )
254                .await
255                .context(ApplyInvertedIndexSnafu)
256        };
257
258        // Record elapsed time to histogram and collect metrics if requested
259        let elapsed = start.elapsed();
260        INDEX_APPLY_ELAPSED
261            .with_label_values(&[TYPE_INVERTED_INDEX])
262            .observe(elapsed.as_secs_f64());
263
264        if let Some(metrics) = metrics {
265            metrics.apply_elapsed = elapsed;
266            metrics.blob_cache_miss = cache_miss;
267            metrics.blob_read_bytes = blob_size;
268        }
269
270        result
271    }
272
273    /// Creates a blob reader from the cached index file.
274    async fn cached_blob_reader(
275        &self,
276        file_id: RegionIndexId,
277        file_size_hint: Option<u64>,
278    ) -> Result<Option<BlobReader>> {
279        let Some(file_cache) = &self.file_cache else {
280            return Ok(None);
281        };
282
283        let index_key = IndexKey::new(
284            file_id.region_id(),
285            file_id.file_id(),
286            FileType::Puffin(file_id.version),
287        );
288        if file_cache.get(index_key).await.is_none() {
289            return Ok(None);
290        };
291
292        let puffin_manager = self.puffin_manager_factory.build(
293            file_cache.local_store(),
294            WriteCachePathProvider::new(file_cache.clone()),
295        );
296
297        // Adds file size hint to the puffin reader to avoid extra metadata read.
298        let reader = puffin_manager
299            .reader(&file_id)
300            .await
301            .context(PuffinBuildReaderSnafu)?
302            .with_file_size_hint(file_size_hint)
303            .blob(INDEX_BLOB_TYPE)
304            .await
305            .context(PuffinReadBlobSnafu)?
306            .reader()
307            .await
308            .context(PuffinBuildReaderSnafu)?;
309        Ok(Some(reader))
310    }
311
312    /// Creates a blob reader from the remote index file.
313    async fn remote_blob_reader(
314        &self,
315        file_id: RegionIndexId,
316        file_size_hint: Option<u64>,
317    ) -> Result<BlobReader> {
318        let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
319
320        // Trigger background download if file cache and file size are available
321        trigger_index_background_download(
322            self.file_cache.as_ref(),
323            &file_id,
324            file_size_hint,
325            &path_factory,
326            &self.store,
327        );
328
329        let puffin_manager = self
330            .puffin_manager_factory
331            .build(self.store.clone(), path_factory)
332            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
333
334        puffin_manager
335            .reader(&file_id)
336            .await
337            .context(PuffinBuildReaderSnafu)?
338            .with_file_size_hint(file_size_hint)
339            .blob(INDEX_BLOB_TYPE)
340            .await
341            .context(PuffinReadBlobSnafu)?
342            .reader()
343            .await
344            .context(PuffinBuildReaderSnafu)
345    }
346
347    /// Returns the predicate key.
348    pub fn predicate_key(&self) -> &PredicateKey {
349        &self.predicate_key
350    }
351}
352
353impl Drop for InvertedIndexApplier {
354    fn drop(&mut self) {
355        INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
356    }
357}
358
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::index::RegionFileId;

    /// Table directory shared by the tests.
    const TABLE_DIR: &str = "table_dir";

    /// Writes a puffin file for `index_id` under `TABLE_DIR` that holds one empty blob of
    /// type `blob_type`.
    async fn write_puffin_with_empty_blob(
        puffin_manager_factory: &PuffinManagerFactory,
        object_store: &ObjectStore,
        index_id: &RegionIndexId,
        blob_type: &str,
    ) {
        let puffin_manager = puffin_manager_factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(TABLE_DIR.to_string(), PathType::Bare),
        );
        let mut writer = puffin_manager.writer(index_id).await.unwrap();
        writer
            .put_blob(
                blob_type,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// Builds an applier over `TABLE_DIR` with no caches and no predicates.
    fn new_applier(
        object_store: ObjectStore,
        index_applier: MockIndexApplier,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> InvertedIndexApplier {
        InvertedIndexApplier::new(
            TABLE_DIR.to_string(),
            PathType::Bare,
            object_store,
            Box::new(index_applier),
            puffin_manager_factory,
            Default::default(),
        )
    }

    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let index_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0);

        write_puffin_with_empty_blob(
            &puffin_manager_factory,
            &object_store,
            &index_id,
            INDEX_BLOB_TYPE,
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier.expect_apply().returning(|_, _, _| {
            Ok(ApplyOutput {
                matched_segment_ids: Bitmap::new_bitvec(),
                total_row_count: 100,
                segment_row_count: 10,
            })
        });

        let applier = new_applier(object_store, mock_index_applier, puffin_manager_factory);
        let output = applier.apply(index_id, None, None).await.unwrap();

        let expected = ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        };
        assert_eq!(output, expected);
    }

    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let index_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0);

        write_puffin_with_empty_blob(
            &puffin_manager_factory,
            &object_store,
            &index_id,
            "invalid_blob_type",
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        // The index blob is missing, so the predicate evaluator must never be reached.
        mock_index_applier.expect_apply().never();

        let applier = new_applier(object_store, mock_index_applier, puffin_manager_factory);
        let err = applier.apply(index_id, None, None).await.unwrap_err();
        assert!(format!("{:?}", err).contains("Blob not found"));
    }
}