mito2/sst/index/inverted_index/applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::inverted_index::format::reader::InvertedIndexBlobReader;
23use index::inverted_index::search::index_apply::{
24    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
25};
26use index::inverted_index::search::predicate::Predicate;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use puffin::puffin_manager::{PuffinManager, PuffinReader};
30use snafu::ResultExt;
31use store_api::storage::{ColumnId, RegionId};
32
33use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
34use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
35use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
36use crate::cache::index::result_cache::PredicateKey;
37use crate::error::{
38    ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
39};
40use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
41use crate::sst::file::FileId;
42use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
43use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
44use crate::sst::index::TYPE_INVERTED_INDEX;
45
46/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
47/// and returning the relevant row group ids for further scan.
48pub(crate) struct InvertedIndexApplier {
49    /// The root directory of the region.
50    region_dir: String,
51
52    /// Region ID.
53    region_id: RegionId,
54
55    /// Store responsible for accessing remote index files.
56    store: ObjectStore,
57
58    /// The cache of index files.
59    file_cache: Option<FileCacheRef>,
60
61    /// Predefined index applier used to apply predicates to index files
62    /// and return the relevant row group ids for further scan.
63    index_applier: Box<dyn IndexApplier>,
64
65    /// The puffin manager factory.
66    puffin_manager_factory: PuffinManagerFactory,
67
68    /// In-memory cache for inverted index.
69    inverted_index_cache: Option<InvertedIndexCacheRef>,
70
71    /// Puffin metadata cache.
72    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
73
74    /// Predicate key. Used to identify the predicate and fetch result from cache.
75    predicate_key: PredicateKey,
76}
77
78pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
79
80impl InvertedIndexApplier {
81    /// Creates a new `InvertedIndexApplier`.
82    pub fn new(
83        region_dir: String,
84        region_id: RegionId,
85        store: ObjectStore,
86        index_applier: Box<dyn IndexApplier>,
87        puffin_manager_factory: PuffinManagerFactory,
88        predicates: BTreeMap<ColumnId, Vec<Predicate>>,
89    ) -> Self {
90        INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
91
92        Self {
93            region_dir,
94            region_id,
95            store,
96            file_cache: None,
97            index_applier,
98            puffin_manager_factory,
99            inverted_index_cache: None,
100            puffin_metadata_cache: None,
101            predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
102        }
103    }
104
105    /// Sets the file cache.
106    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
107        self.file_cache = file_cache;
108        self
109    }
110
111    /// Sets the index cache.
112    pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
113        self.inverted_index_cache = index_cache;
114        self
115    }
116
117    /// Sets the puffin metadata cache.
118    pub fn with_puffin_metadata_cache(
119        mut self,
120        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
121    ) -> Self {
122        self.puffin_metadata_cache = puffin_metadata_cache;
123        self
124    }
125
126    /// Applies predicates to the provided SST file id and returns the relevant row group ids
127    pub async fn apply(&self, file_id: FileId, file_size_hint: Option<u64>) -> Result<ApplyOutput> {
128        let _timer = INDEX_APPLY_ELAPSED
129            .with_label_values(&[TYPE_INVERTED_INDEX])
130            .start_timer();
131
132        let context = SearchContext {
133            // Encountering a non-existing column indicates that it doesn't match predicates.
134            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
135        };
136
137        let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
138            Ok(Some(puffin_reader)) => puffin_reader,
139            other => {
140                if let Err(err) = other {
141                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
142                }
143                self.remote_blob_reader(file_id, file_size_hint).await?
144            }
145        };
146
147        if let Some(index_cache) = &self.inverted_index_cache {
148            let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
149            let mut index_reader = CachedInvertedIndexBlobReader::new(
150                file_id,
151                blob_size,
152                InvertedIndexBlobReader::new(blob),
153                index_cache.clone(),
154            );
155            self.index_applier
156                .apply(context, &mut index_reader)
157                .await
158                .context(ApplyInvertedIndexSnafu)
159        } else {
160            let mut index_reader = InvertedIndexBlobReader::new(blob);
161            self.index_applier
162                .apply(context, &mut index_reader)
163                .await
164                .context(ApplyInvertedIndexSnafu)
165        }
166    }
167
168    /// Creates a blob reader from the cached index file.
169    async fn cached_blob_reader(
170        &self,
171        file_id: FileId,
172        file_size_hint: Option<u64>,
173    ) -> Result<Option<BlobReader>> {
174        let Some(file_cache) = &self.file_cache else {
175            return Ok(None);
176        };
177
178        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
179        if file_cache.get(index_key).await.is_none() {
180            return Ok(None);
181        };
182
183        let puffin_manager = self.puffin_manager_factory.build(
184            file_cache.local_store(),
185            WriteCachePathProvider::new(self.region_id, file_cache.clone()),
186        );
187
188        // Adds file size hint to the puffin reader to avoid extra metadata read.
189        let reader = puffin_manager
190            .reader(&file_id)
191            .await
192            .context(PuffinBuildReaderSnafu)?
193            .with_file_size_hint(file_size_hint)
194            .blob(INDEX_BLOB_TYPE)
195            .await
196            .context(PuffinReadBlobSnafu)?
197            .reader()
198            .await
199            .context(PuffinBuildReaderSnafu)?;
200        Ok(Some(reader))
201    }
202
203    /// Creates a blob reader from the remote index file.
204    async fn remote_blob_reader(
205        &self,
206        file_id: FileId,
207        file_size_hint: Option<u64>,
208    ) -> Result<BlobReader> {
209        let puffin_manager = self
210            .puffin_manager_factory
211            .build(
212                self.store.clone(),
213                RegionFilePathFactory::new(self.region_dir.clone()),
214            )
215            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
216
217        puffin_manager
218            .reader(&file_id)
219            .await
220            .context(PuffinBuildReaderSnafu)?
221            .with_file_size_hint(file_size_hint)
222            .blob(INDEX_BLOB_TYPE)
223            .await
224            .context(PuffinReadBlobSnafu)?
225            .reader()
226            .await
227            .context(PuffinBuildReaderSnafu)
228    }
229
230    /// Returns the predicate key.
231    pub fn predicate_key(&self) -> &PredicateKey {
232        &self.predicate_key
233    }
234}
235
236impl Drop for InvertedIndexApplier {
237    fn drop(&mut self) {
238        INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
239    }
240}
241
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;

    use super::*;

    /// Writes a puffin file for `file_id` under `region_dir` holding one empty blob stored
    /// under `blob_key`.
    async fn write_single_blob_puffin(
        factory: &PuffinManagerFactory,
        store: &ObjectStore,
        region_dir: &str,
        file_id: FileId,
        blob_key: &str,
    ) {
        let manager = factory.build(
            store.clone(),
            RegionFilePathFactory::new(region_dir.to_string()),
        );
        let mut writer = manager.writer(&file_id).await.unwrap();
        writer
            .put_blob(
                blob_key,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// Returns a mock applier that reports a fixed memory usage of 100.
    fn mock_applier_with_memory() -> MockIndexApplier {
        let mut applier = MockIndexApplier::new();
        applier.expect_memory_usage().returning(|| 100);
        applier
    }

    /// The output the mock applier produces in the basic test.
    fn sample_output() -> ApplyOutput {
        ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        }
    }

    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        write_single_blob_puffin(&factory, &store, &region_dir, file_id, INDEX_BLOB_TYPE).await;

        let mut applier = mock_applier_with_memory();
        applier
            .expect_apply()
            .returning(|_, _| Ok(sample_output()));

        let sst_index_applier = InvertedIndexApplier::new(
            region_dir,
            RegionId::new(0, 0),
            store,
            Box::new(applier),
            factory,
            Default::default(),
        );
        let output = sst_index_applier.apply(file_id, None).await.unwrap();
        assert_eq!(output, sample_output());
    }

    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        // Store the blob under a key the applier does not look for.
        write_single_blob_puffin(&factory, &store, &region_dir, file_id, "invalid_blob_type")
            .await;

        let mut applier = mock_applier_with_memory();
        applier.expect_apply().never();

        let sst_index_applier = InvertedIndexApplier::new(
            region_dir,
            RegionId::new(0, 0),
            store,
            Box::new(applier),
            factory,
            Default::default(),
        );
        let err = sst_index_applier.apply(file_id, None).await.unwrap_err();
        assert!(format!("{err:?}").contains("Blob not found"));
    }
}