mito2/sst/index/inverted_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::sync::Arc;
18
19use common_base::range_read::RangeReader;
20use common_telemetry::warn;
21use index::inverted_index::format::reader::InvertedIndexBlobReader;
22use index::inverted_index::search::index_apply::{
23    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
24};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30
31use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
34use crate::error::{
35    ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
36};
37use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
38use crate::sst::file::FileId;
39use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
40use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
41use crate::sst::index::TYPE_INVERTED_INDEX;
42
43/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
44/// and returning the relevant row group ids for further scan.
45pub(crate) struct InvertedIndexApplier {
46    /// The root directory of the region.
47    region_dir: String,
48
49    /// Region ID.
50    region_id: RegionId,
51
52    /// Store responsible for accessing remote index files.
53    store: ObjectStore,
54
55    /// The cache of index files.
56    file_cache: Option<FileCacheRef>,
57
58    /// Predefined index applier used to apply predicates to index files
59    /// and return the relevant row group ids for further scan.
60    index_applier: Box<dyn IndexApplier>,
61
62    /// The puffin manager factory.
63    puffin_manager_factory: PuffinManagerFactory,
64
65    /// In-memory cache for inverted index.
66    inverted_index_cache: Option<InvertedIndexCacheRef>,
67
68    /// Puffin metadata cache.
69    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
70}
71
72pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
73
74impl InvertedIndexApplier {
75    /// Creates a new `InvertedIndexApplier`.
76    pub fn new(
77        region_dir: String,
78        region_id: RegionId,
79        store: ObjectStore,
80        index_applier: Box<dyn IndexApplier>,
81        puffin_manager_factory: PuffinManagerFactory,
82    ) -> Self {
83        INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
84
85        Self {
86            region_dir,
87            region_id,
88            store,
89            file_cache: None,
90            index_applier,
91            puffin_manager_factory,
92            inverted_index_cache: None,
93            puffin_metadata_cache: None,
94        }
95    }
96
97    /// Sets the file cache.
98    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
99        self.file_cache = file_cache;
100        self
101    }
102
103    /// Sets the index cache.
104    pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
105        self.inverted_index_cache = index_cache;
106        self
107    }
108
109    /// Sets the puffin metadata cache.
110    pub fn with_puffin_metadata_cache(
111        mut self,
112        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
113    ) -> Self {
114        self.puffin_metadata_cache = puffin_metadata_cache;
115        self
116    }
117
118    /// Applies predicates to the provided SST file id and returns the relevant row group ids
119    pub async fn apply(&self, file_id: FileId, file_size_hint: Option<u64>) -> Result<ApplyOutput> {
120        let _timer = INDEX_APPLY_ELAPSED
121            .with_label_values(&[TYPE_INVERTED_INDEX])
122            .start_timer();
123
124        let context = SearchContext {
125            // Encountering a non-existing column indicates that it doesn't match predicates.
126            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
127        };
128
129        let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
130            Ok(Some(puffin_reader)) => puffin_reader,
131            other => {
132                if let Err(err) = other {
133                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
134                }
135                self.remote_blob_reader(file_id, file_size_hint).await?
136            }
137        };
138
139        if let Some(index_cache) = &self.inverted_index_cache {
140            let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
141            let mut index_reader = CachedInvertedIndexBlobReader::new(
142                file_id,
143                blob_size,
144                InvertedIndexBlobReader::new(blob),
145                index_cache.clone(),
146            );
147            self.index_applier
148                .apply(context, &mut index_reader)
149                .await
150                .context(ApplyInvertedIndexSnafu)
151        } else {
152            let mut index_reader = InvertedIndexBlobReader::new(blob);
153            self.index_applier
154                .apply(context, &mut index_reader)
155                .await
156                .context(ApplyInvertedIndexSnafu)
157        }
158    }
159
160    /// Creates a blob reader from the cached index file.
161    async fn cached_blob_reader(
162        &self,
163        file_id: FileId,
164        file_size_hint: Option<u64>,
165    ) -> Result<Option<BlobReader>> {
166        let Some(file_cache) = &self.file_cache else {
167            return Ok(None);
168        };
169
170        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
171        if file_cache.get(index_key).await.is_none() {
172            return Ok(None);
173        };
174
175        let puffin_manager = self.puffin_manager_factory.build(
176            file_cache.local_store(),
177            WriteCachePathProvider::new(self.region_id, file_cache.clone()),
178        );
179
180        // Adds file size hint to the puffin reader to avoid extra metadata read.
181        let reader = puffin_manager
182            .reader(&file_id)
183            .await
184            .context(PuffinBuildReaderSnafu)?
185            .with_file_size_hint(file_size_hint)
186            .blob(INDEX_BLOB_TYPE)
187            .await
188            .context(PuffinReadBlobSnafu)?
189            .reader()
190            .await
191            .context(PuffinBuildReaderSnafu)?;
192        Ok(Some(reader))
193    }
194
195    /// Creates a blob reader from the remote index file.
196    async fn remote_blob_reader(
197        &self,
198        file_id: FileId,
199        file_size_hint: Option<u64>,
200    ) -> Result<BlobReader> {
201        let puffin_manager = self
202            .puffin_manager_factory
203            .build(
204                self.store.clone(),
205                RegionFilePathFactory::new(self.region_dir.clone()),
206            )
207            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
208
209        puffin_manager
210            .reader(&file_id)
211            .await
212            .context(PuffinBuildReaderSnafu)?
213            .with_file_size_hint(file_size_hint)
214            .blob(INDEX_BLOB_TYPE)
215            .await
216            .context(PuffinReadBlobSnafu)?
217            .reader()
218            .await
219            .context(PuffinBuildReaderSnafu)
220    }
221}
222
223impl Drop for InvertedIndexApplier {
224    fn drop(&mut self) {
225        INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
226    }
227}
228
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use index::bitmap::Bitmap;
    use index::inverted_index::search::index_apply::MockIndexApplier;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;

    use super::*;

    /// Region directory shared by the tests.
    const REGION_DIR: &str = "region_dir";

    /// The output the mocked applier produces and the applier under test must pass through.
    fn sample_output() -> ApplyOutput {
        ApplyOutput {
            matched_segment_ids: Bitmap::new_bitvec(),
            total_row_count: 100,
            segment_row_count: 10,
        }
    }

    /// A mock applier that reports 100 bytes of memory usage and has no `apply` expectation yet.
    fn mock_applier() -> MockIndexApplier {
        let mut applier = MockIndexApplier::new();
        applier.expect_memory_usage().returning(|| 100);
        applier
    }

    /// Writes a puffin file for `file_id` that contains one empty blob stored under `blob_type`.
    async fn write_empty_blob(
        factory: &PuffinManagerFactory,
        store: &ObjectStore,
        file_id: FileId,
        blob_type: &str,
    ) {
        let puffin_manager = factory.build(
            store.clone(),
            RegionFilePathFactory::new(REGION_DIR.to_string()),
        );
        let mut writer = puffin_manager.writer(&file_id).await.unwrap();
        writer
            .put_blob(
                blob_type,
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();
    }

    /// With a valid index blob in the remote store, `apply` returns the applier's output.
    #[tokio::test]
    async fn test_index_applier_apply_basic() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();

        write_empty_blob(
            &puffin_manager_factory,
            &object_store,
            file_id,
            INDEX_BLOB_TYPE,
        )
        .await;

        let mut mock_index_applier = mock_applier();
        mock_index_applier
            .expect_apply()
            .returning(|_, _| Ok(sample_output()));

        let sst_index_applier = InvertedIndexApplier::new(
            REGION_DIR.to_string(),
            RegionId::new(0, 0),
            object_store,
            Box::new(mock_index_applier),
            puffin_manager_factory,
        );

        let output = sst_index_applier.apply(file_id, None).await.unwrap();
        assert_eq!(output, sample_output());
    }

    /// When the puffin file has no inverted index blob, `apply` fails without calling the applier.
    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = FileId::random();

        write_empty_blob(
            &puffin_manager_factory,
            &object_store,
            file_id,
            "invalid_blob_type",
        )
        .await;

        let mut mock_index_applier = mock_applier();
        mock_index_applier.expect_apply().never();

        let sst_index_applier = InvertedIndexApplier::new(
            REGION_DIR.to_string(),
            RegionId::new(0, 0),
            object_store,
            Box::new(mock_index_applier),
            puffin_manager_factory,
        );

        let err = sst_index_applier.apply(file_id, None).await.unwrap_err();
        let rendered = format!("{:?}", err);
        assert!(rendered.contains("Blob not found"));
    }
}