mito2/sst/index/inverted_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::inverted_index::format::reader::InvertedIndexBlobReader;
23use index::inverted_index::search::index_apply::{
24    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
25};
26use index::inverted_index::search::predicate::Predicate;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use puffin::puffin_manager::{PuffinManager, PuffinReader};
30use snafu::ResultExt;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
35use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
36use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
37use crate::cache::index::result_cache::PredicateKey;
38use crate::error::{
39    ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
40};
41use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
42use crate::sst::file::RegionFileId;
43use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
45use crate::sst::index::TYPE_INVERTED_INDEX;
46
47/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
48/// and returning the relevant row group ids for further scan.
49pub(crate) struct InvertedIndexApplier {
50    /// The root directory of the table.
51    table_dir: String,
52
53    /// Path type for generating file paths.
54    path_type: PathType,
55
56    /// Store responsible for accessing remote index files.
57    store: ObjectStore,
58
59    /// The cache of index files.
60    file_cache: Option<FileCacheRef>,
61
62    /// Predefined index applier used to apply predicates to index files
63    /// and return the relevant row group ids for further scan.
64    index_applier: Box<dyn IndexApplier>,
65
66    /// The puffin manager factory.
67    puffin_manager_factory: PuffinManagerFactory,
68
69    /// In-memory cache for inverted index.
70    inverted_index_cache: Option<InvertedIndexCacheRef>,
71
72    /// Puffin metadata cache.
73    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
74
75    /// Predicate key. Used to identify the predicate and fetch result from cache.
76    predicate_key: PredicateKey,
77}
78
79pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
80
81impl InvertedIndexApplier {
82    /// Creates a new `InvertedIndexApplier`.
83    pub fn new(
84        table_dir: String,
85        path_type: PathType,
86        store: ObjectStore,
87        index_applier: Box<dyn IndexApplier>,
88        puffin_manager_factory: PuffinManagerFactory,
89        predicates: BTreeMap<ColumnId, Vec<Predicate>>,
90    ) -> Self {
91        INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
92
93        Self {
94            table_dir,
95            path_type,
96            store,
97            file_cache: None,
98            index_applier,
99            puffin_manager_factory,
100            inverted_index_cache: None,
101            puffin_metadata_cache: None,
102            predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
103        }
104    }
105
106    /// Sets the file cache.
107    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
108        self.file_cache = file_cache;
109        self
110    }
111
112    /// Sets the index cache.
113    pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
114        self.inverted_index_cache = index_cache;
115        self
116    }
117
118    /// Sets the puffin metadata cache.
119    pub fn with_puffin_metadata_cache(
120        mut self,
121        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
122    ) -> Self {
123        self.puffin_metadata_cache = puffin_metadata_cache;
124        self
125    }
126
127    /// Applies predicates to the provided SST file id and returns the relevant row group ids
128    pub async fn apply(
129        &self,
130        file_id: RegionFileId,
131        file_size_hint: Option<u64>,
132    ) -> Result<ApplyOutput> {
133        let _timer = INDEX_APPLY_ELAPSED
134            .with_label_values(&[TYPE_INVERTED_INDEX])
135            .start_timer();
136
137        let context = SearchContext {
138            // Encountering a non-existing column indicates that it doesn't match predicates.
139            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
140        };
141
142        let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
143            Ok(Some(puffin_reader)) => puffin_reader,
144            other => {
145                if let Err(err) = other {
146                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
147                }
148                self.remote_blob_reader(file_id, file_size_hint).await?
149            }
150        };
151
152        if let Some(index_cache) = &self.inverted_index_cache {
153            let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
154            let mut index_reader = CachedInvertedIndexBlobReader::new(
155                file_id.file_id(),
156                blob_size,
157                InvertedIndexBlobReader::new(blob),
158                index_cache.clone(),
159            );
160            self.index_applier
161                .apply(context, &mut index_reader)
162                .await
163                .context(ApplyInvertedIndexSnafu)
164        } else {
165            let mut index_reader = InvertedIndexBlobReader::new(blob);
166            self.index_applier
167                .apply(context, &mut index_reader)
168                .await
169                .context(ApplyInvertedIndexSnafu)
170        }
171    }
172
173    /// Creates a blob reader from the cached index file.
174    async fn cached_blob_reader(
175        &self,
176        file_id: RegionFileId,
177        file_size_hint: Option<u64>,
178    ) -> Result<Option<BlobReader>> {
179        let Some(file_cache) = &self.file_cache else {
180            return Ok(None);
181        };
182
183        let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
184        if file_cache.get(index_key).await.is_none() {
185            return Ok(None);
186        };
187
188        let puffin_manager = self.puffin_manager_factory.build(
189            file_cache.local_store(),
190            WriteCachePathProvider::new(file_cache.clone()),
191        );
192
193        // Adds file size hint to the puffin reader to avoid extra metadata read.
194        let reader = puffin_manager
195            .reader(&file_id)
196            .await
197            .context(PuffinBuildReaderSnafu)?
198            .with_file_size_hint(file_size_hint)
199            .blob(INDEX_BLOB_TYPE)
200            .await
201            .context(PuffinReadBlobSnafu)?
202            .reader()
203            .await
204            .context(PuffinBuildReaderSnafu)?;
205        Ok(Some(reader))
206    }
207
208    /// Creates a blob reader from the remote index file.
209    async fn remote_blob_reader(
210        &self,
211        file_id: RegionFileId,
212        file_size_hint: Option<u64>,
213    ) -> Result<BlobReader> {
214        let puffin_manager = self
215            .puffin_manager_factory
216            .build(
217                self.store.clone(),
218                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
219            )
220            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
221
222        puffin_manager
223            .reader(&file_id)
224            .await
225            .context(PuffinBuildReaderSnafu)?
226            .with_file_size_hint(file_size_hint)
227            .blob(INDEX_BLOB_TYPE)
228            .await
229            .context(PuffinReadBlobSnafu)?
230            .reader()
231            .await
232            .context(PuffinBuildReaderSnafu)
233    }
234
235    /// Returns the predicate key.
236    pub fn predicate_key(&self) -> &PredicateKey {
237        &self.predicate_key
238    }
239}
240
241impl Drop for InvertedIndexApplier {
242    fn drop(&mut self) {
243        INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
244    }
245}
246
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;

    use super::*;
    use crate::sst::file::FileId;

    /// Table directory shared by the writer side and the applier under test.
    const TABLE_DIR: &str = "table_dir";

    /// The output the mocked applier produces and the test expects to get back.
    fn sample_output() -> ApplyOutput {
        ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        }
    }

    /// Writes a puffin file for `file_id` that contains one empty blob keyed by `blob_type`.
    async fn write_empty_blob(
        puffin_manager_factory: &PuffinManagerFactory,
        object_store: &ObjectStore,
        file_id: RegionFileId,
        blob_type: &str,
    ) {
        let puffin_manager = puffin_manager_factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(TABLE_DIR.to_string(), PathType::Bare),
        );
        let mut writer = puffin_manager.writer(&file_id).await.unwrap();
        writer
            .put_blob(
                blob_type,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// Builds the applier under test around `mock_index_applier`, without any cache.
    fn new_applier(
        object_store: ObjectStore,
        mock_index_applier: MockIndexApplier,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> InvertedIndexApplier {
        InvertedIndexApplier::new(
            TABLE_DIR.to_string(),
            PathType::Bare,
            object_store,
            Box::new(mock_index_applier),
            puffin_manager_factory,
            Default::default(),
        )
    }

    /// With an index blob present, the output of the wrapped applier is returned as is.
    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = RegionFileId::new(0.into(), FileId::random());

        write_empty_blob(
            &puffin_manager_factory,
            &object_store,
            file_id,
            INDEX_BLOB_TYPE,
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier
            .expect_apply()
            .returning(|_, _| Ok(sample_output()));

        let sst_index_applier =
            new_applier(object_store, mock_index_applier, puffin_manager_factory);
        let output = sst_index_applier.apply(file_id, None).await.unwrap();
        assert_eq!(output, sample_output());
    }

    /// Without a blob of the inverted-index type, `apply` fails before the wrapped
    /// applier is ever invoked.
    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = RegionFileId::new(0.into(), FileId::random());

        write_empty_blob(
            &puffin_manager_factory,
            &object_store,
            file_id,
            "invalid_blob_type",
        )
        .await;

        let mut mock_index_applier = MockIndexApplier::new();
        mock_index_applier.expect_memory_usage().returning(|| 100);
        mock_index_applier.expect_apply().never();

        let sst_index_applier =
            new_applier(object_store, mock_index_applier, puffin_manager_factory);
        let res = sst_index_applier.apply(file_id, None).await;
        assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
    }
}