mito2/sst/index/fulltext_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
23use index::bloom_filter::reader::BloomFilterReaderImpl;
24use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
25use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
26use index::fulltext_index::{Analyzer, Config};
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
30use snafu::ResultExt;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
35use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
36use crate::cache::index::bloom_filter_index::{
37    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
38};
39use crate::cache::index::result_cache::PredicateKey;
40use crate::error::{
41    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
42    PuffinReadBlobSnafu, Result,
43};
44use crate::metrics::INDEX_APPLY_ELAPSED;
45use crate::sst::file::RegionFileId;
46use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
47use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
48use crate::sst::index::puffin_manager::{
49    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
50};
51use crate::sst::index::TYPE_FULLTEXT_INDEX;
52
53pub mod builder;
54
55/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
56pub struct FulltextIndexApplier {
57    /// Requests to be applied.
58    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,
59
60    /// The source of the index.
61    index_source: IndexSource,
62
63    /// Cache for bloom filter index.
64    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
65
66    /// Predicate key. Used to identify the predicate and fetch result from cache.
67    predicate_key: PredicateKey,
68}
69
70pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
71
72impl FulltextIndexApplier {
73    /// Creates a new `FulltextIndexApplier`.
74    pub fn new(
75        table_dir: String,
76        path_type: PathType,
77        store: ObjectStore,
78        requests: BTreeMap<ColumnId, FulltextRequest>,
79        puffin_manager_factory: PuffinManagerFactory,
80    ) -> Self {
81        let requests = Arc::new(requests);
82        let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);
83
84        Self {
85            predicate_key: PredicateKey::new_fulltext(requests.clone()),
86            requests,
87            index_source,
88            bloom_filter_index_cache: None,
89        }
90    }
91
92    /// Sets the file cache.
93    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
94        self.index_source.set_file_cache(file_cache);
95        self
96    }
97
98    /// Sets the puffin metadata cache.
99    pub fn with_puffin_metadata_cache(
100        mut self,
101        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
102    ) -> Self {
103        self.index_source
104            .set_puffin_metadata_cache(puffin_metadata_cache);
105        self
106    }
107
108    /// Sets the bloom filter cache.
109    pub fn with_bloom_filter_cache(
110        mut self,
111        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
112    ) -> Self {
113        self.bloom_filter_index_cache = bloom_filter_index_cache;
114        self
115    }
116
117    /// Returns the predicate key.
118    pub fn predicate_key(&self) -> &PredicateKey {
119        &self.predicate_key
120    }
121}
122
123impl FulltextIndexApplier {
124    /// Applies fine-grained fulltext index to the specified SST file.
125    /// Returns the row ids that match the queries.
126    pub async fn apply_fine(
127        &self,
128        file_id: RegionFileId,
129        file_size_hint: Option<u64>,
130    ) -> Result<Option<BTreeSet<RowId>>> {
131        let timer = INDEX_APPLY_ELAPSED
132            .with_label_values(&[TYPE_FULLTEXT_INDEX])
133            .start_timer();
134
135        let mut row_ids: Option<BTreeSet<RowId>> = None;
136        for (column_id, request) in self.requests.iter() {
137            if request.queries.is_empty() && request.terms.is_empty() {
138                continue;
139            }
140
141            let Some(result) = self
142                .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
143                .await?
144            else {
145                continue;
146            };
147
148            if let Some(ids) = row_ids.as_mut() {
149                ids.retain(|id| result.contains(id));
150            } else {
151                row_ids = Some(result);
152            }
153
154            if let Some(ids) = row_ids.as_ref() {
155                if ids.is_empty() {
156                    break;
157                }
158            }
159        }
160
161        if row_ids.is_none() {
162            timer.stop_and_discard();
163        }
164        Ok(row_ids)
165    }
166
167    async fn apply_fine_one_column(
168        &self,
169        file_size_hint: Option<u64>,
170        file_id: RegionFileId,
171        column_id: ColumnId,
172        request: &FulltextRequest,
173    ) -> Result<Option<BTreeSet<RowId>>> {
174        let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
175        let dir = self
176            .index_source
177            .dir(file_id, &blob_key, file_size_hint)
178            .await?;
179
180        let dir = match &dir {
181            Some(dir) => dir,
182            None => {
183                return Ok(None);
184            }
185        };
186
187        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
188        let path = dir.path();
189
190        let searcher =
191            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
192        let mut row_ids: Option<BTreeSet<RowId>> = None;
193
194        // 1. Apply queries
195        for query in &request.queries {
196            let result = searcher
197                .search(&query.0)
198                .await
199                .context(ApplyFulltextIndexSnafu)?;
200
201            if let Some(ids) = row_ids.as_mut() {
202                ids.retain(|id| result.contains(id));
203            } else {
204                row_ids = Some(result);
205            }
206
207            if let Some(ids) = row_ids.as_ref() {
208                if ids.is_empty() {
209                    break;
210                }
211            }
212        }
213
214        // 2. Apply terms
215        let query = request.terms_as_query(config.case_sensitive);
216        if !query.0.is_empty() {
217            let result = searcher
218                .search(&query.0)
219                .await
220                .context(ApplyFulltextIndexSnafu)?;
221
222            if let Some(ids) = row_ids.as_mut() {
223                ids.retain(|id| result.contains(id));
224            } else {
225                row_ids = Some(result);
226            }
227        }
228
229        Ok(row_ids)
230    }
231}
232
233impl FulltextIndexApplier {
234    /// Applies coarse-grained fulltext index to the specified SST file.
235    /// Returns (row group id -> ranges) that match the queries.
236    ///
237    /// Row group id existing in the returned result means that the row group is searched.
238    /// Empty ranges means that the row group is searched but no rows are found.
239    pub async fn apply_coarse(
240        &self,
241        file_id: RegionFileId,
242        file_size_hint: Option<u64>,
243        row_groups: impl Iterator<Item = (usize, bool)>,
244    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
245        let timer = INDEX_APPLY_ELAPSED
246            .with_label_values(&[TYPE_FULLTEXT_INDEX])
247            .start_timer();
248
249        let (input, mut output) = Self::init_coarse_output(row_groups);
250        let mut applied = false;
251
252        for (column_id, request) in self.requests.iter() {
253            if request.terms.is_empty() {
254                // only apply terms
255                continue;
256            }
257
258            applied |= self
259                .apply_coarse_one_column(
260                    file_id,
261                    file_size_hint,
262                    *column_id,
263                    &request.terms,
264                    &mut output,
265                )
266                .await?;
267        }
268
269        if !applied {
270            timer.stop_and_discard();
271            return Ok(None);
272        }
273
274        Self::adjust_coarse_output(input, &mut output);
275        Ok(Some(output))
276    }
277
278    async fn apply_coarse_one_column(
279        &self,
280        file_id: RegionFileId,
281        file_size_hint: Option<u64>,
282        column_id: ColumnId,
283        terms: &[FulltextTerm],
284        output: &mut [(usize, Vec<Range<usize>>)],
285    ) -> Result<bool> {
286        let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
287        let Some(reader) = self
288            .index_source
289            .blob(file_id, &blob_key, file_size_hint)
290            .await?
291        else {
292            return Ok(false);
293        };
294        let config =
295            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
296
297        let predicates = Self::terms_to_predicates(terms, &config);
298        if predicates.is_empty() {
299            return Ok(false);
300        }
301
302        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
303        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
304            let blob_size = range_reader
305                .metadata()
306                .await
307                .context(MetadataSnafu)?
308                .content_length;
309            let reader = CachedBloomFilterIndexBlobReader::new(
310                file_id.file_id(),
311                column_id,
312                Tag::Fulltext,
313                blob_size,
314                BloomFilterReaderImpl::new(range_reader),
315                bloom_filter_cache.clone(),
316            );
317            Box::new(reader) as _
318        } else {
319            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
320        };
321
322        let mut applier = BloomFilterApplier::new(reader)
323            .await
324            .context(ApplyBloomFilterIndexSnafu)?;
325        for (_, row_group_output) in output.iter_mut() {
326            // All rows are filtered out, skip the search
327            if row_group_output.is_empty() {
328                continue;
329            }
330
331            *row_group_output = applier
332                .search(&predicates, row_group_output)
333                .await
334                .context(ApplyBloomFilterIndexSnafu)?;
335        }
336
337        Ok(true)
338    }
339
340    /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
341    ///
342    /// `row_groups` is a list of (row group length, whether to search).
343    ///
344    /// Returns (`input`, `output`):
345    /// * `input` is a list of (row group index to search, row group range based on start of the file).
346    /// * `output` is a list of (row group index to search, row group ranges based on start of the file).
347    #[allow(clippy::type_complexity)]
348    fn init_coarse_output(
349        row_groups: impl Iterator<Item = (usize, bool)>,
350    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
351        // Calculates row groups' ranges based on start of the file.
352        let mut input = Vec::with_capacity(row_groups.size_hint().0);
353        let mut start = 0;
354        for (i, (len, to_search)) in row_groups.enumerate() {
355            let end = start + len;
356            if to_search {
357                input.push((i, start..end));
358            }
359            start = end;
360        }
361
362        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
363        // so we need to adjust them later.
364        let output = input
365            .iter()
366            .map(|(i, range)| (*i, vec![range.clone()]))
367            .collect::<Vec<_>>();
368
369        (input, output)
370    }
371
372    /// Adjusts the coarse output. Makes the output ranges based on row group start.
373    fn adjust_coarse_output(
374        input: Vec<(usize, Range<usize>)>,
375        output: &mut [(usize, Vec<Range<usize>>)],
376    ) {
377        // adjust ranges to be based on row group
378        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
379            let start = input.start;
380            for range in output.iter_mut() {
381                range.start -= start;
382                range.end -= start;
383            }
384        }
385    }
386
387    /// Converts terms to predicates.
388    ///
389    /// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive.
390    /// Multiple terms are combined with AND semantics.
391    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
392        let mut probes = HashSet::new();
393        for term in terms {
394            if config.case_sensitive && term.col_lowered {
395                // lowercased terms are not indexed
396                continue;
397            }
398            probes.extend(Self::term_to_probes(&term.term, config));
399        }
400
401        probes
402            .into_iter()
403            .map(|p| InListPredicate {
404                list: iter::once(p).collect(),
405            })
406            .collect::<Vec<_>>()
407    }
408
409    fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
410        let tokens = match config.analyzer {
411            Analyzer::English => EnglishTokenizer {}.tokenize(term),
412            Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
413        };
414
415        tokens.into_iter().map(|t| {
416            if !config.case_sensitive {
417                t.to_lowercase()
418            } else {
419                t.to_string()
420            }
421            .into_bytes()
422        })
423    }
424}
425
426/// The source of the index.
427struct IndexSource {
428    table_dir: String,
429
430    /// Path type for generating file paths.
431    path_type: PathType,
432
433    /// The puffin manager factory.
434    puffin_manager_factory: PuffinManagerFactory,
435
436    /// Store responsible for accessing remote index files.
437    remote_store: ObjectStore,
438
439    /// Local file cache.
440    file_cache: Option<FileCacheRef>,
441
442    /// The puffin metadata cache.
443    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
444}
445
446impl IndexSource {
447    fn new(
448        table_dir: String,
449        path_type: PathType,
450        puffin_manager_factory: PuffinManagerFactory,
451        remote_store: ObjectStore,
452    ) -> Self {
453        Self {
454            table_dir,
455            path_type,
456            puffin_manager_factory,
457            remote_store,
458            file_cache: None,
459            puffin_metadata_cache: None,
460        }
461    }
462
463    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
464        self.file_cache = file_cache;
465    }
466
467    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
468        self.puffin_metadata_cache = puffin_metadata_cache;
469    }
470
471    /// Returns the blob with the specified key from local cache or remote store.
472    ///
473    /// Returns `None` if the blob is not found.
474    async fn blob(
475        &self,
476        file_id: RegionFileId,
477        key: &str,
478        file_size_hint: Option<u64>,
479    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
480        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
481        let res = reader.blob(key).await;
482        match res {
483            Ok(blob) => Ok(Some(blob)),
484            Err(err) if err.is_blob_not_found() => Ok(None),
485            Err(err) => {
486                if fallbacked {
487                    Err(err).context(PuffinReadBlobSnafu)
488                } else {
489                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
490                    let reader = self.build_remote(file_id, file_size_hint).await?;
491                    let res = reader.blob(key).await;
492                    match res {
493                        Ok(blob) => Ok(Some(blob)),
494                        Err(err) if err.is_blob_not_found() => Ok(None),
495                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
496                    }
497                }
498            }
499        }
500    }
501
502    /// Returns the directory with the specified key from local cache or remote store.
503    ///
504    /// Returns `None` if the directory is not found.
505    async fn dir(
506        &self,
507        file_id: RegionFileId,
508        key: &str,
509        file_size_hint: Option<u64>,
510    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
511        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
512        let res = reader.dir(key).await;
513        match res {
514            Ok(dir) => Ok(Some(dir)),
515            Err(err) if err.is_blob_not_found() => Ok(None),
516            Err(err) => {
517                if fallbacked {
518                    Err(err).context(PuffinReadBlobSnafu)
519                } else {
520                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
521                    let reader = self.build_remote(file_id, file_size_hint).await?;
522                    let res = reader.dir(key).await;
523                    match res {
524                        Ok(dir) => Ok(Some(dir)),
525                        Err(err) if err.is_blob_not_found() => Ok(None),
526                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
527                    }
528                }
529            }
530        }
531    }
532
533    /// Return reader and whether it is fallbacked to remote store.
534    async fn ensure_reader(
535        &self,
536        file_id: RegionFileId,
537        file_size_hint: Option<u64>,
538    ) -> Result<(SstPuffinReader, bool)> {
539        match self.build_local_cache(file_id, file_size_hint).await {
540            Ok(Some(r)) => Ok((r, false)),
541            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
542            Err(err) => Err(err),
543        }
544    }
545
546    async fn build_local_cache(
547        &self,
548        file_id: RegionFileId,
549        file_size_hint: Option<u64>,
550    ) -> Result<Option<SstPuffinReader>> {
551        let Some(file_cache) = &self.file_cache else {
552            return Ok(None);
553        };
554
555        let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
556        if file_cache.get(index_key).await.is_none() {
557            return Ok(None);
558        };
559
560        let puffin_manager = self
561            .puffin_manager_factory
562            .build(
563                file_cache.local_store(),
564                WriteCachePathProvider::new(file_cache.clone()),
565            )
566            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
567        let reader = puffin_manager
568            .reader(&file_id)
569            .await
570            .context(PuffinBuildReaderSnafu)?
571            .with_file_size_hint(file_size_hint);
572        Ok(Some(reader))
573    }
574
575    async fn build_remote(
576        &self,
577        file_id: RegionFileId,
578        file_size_hint: Option<u64>,
579    ) -> Result<SstPuffinReader> {
580        let puffin_manager = self
581            .puffin_manager_factory
582            .build(
583                self.remote_store.clone(),
584                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
585            )
586            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
587
588        let reader = puffin_manager
589            .reader(&file_id)
590            .await
591            .context(PuffinBuildReaderSnafu)?
592            .with_file_size_hint(file_size_hint);
593
594        Ok(reader)
595    }
596}