mito2/sst/index/fulltext_index/applier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter;
use std::ops::Range;
use std::sync::Arc;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::BloomFilterReaderImpl;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;

use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
    PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionFileId;
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
use crate::sst::index::puffin_manager::{
    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
};

pub mod builder;

/// `FulltextIndexApplier` is responsible for applying the fulltext index to the provided SST files.
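///
/// # Example
///
/// A minimal usage sketch; the bindings `table_dir`, `path_type`, `store`, `requests`,
/// `factory`, `file_cache`, `file_id`, and `file_size` are assumed to be built elsewhere:
///
/// ```ignore
/// let applier = FulltextIndexApplier::new(table_dir, path_type, store, requests, factory)
///     .with_file_cache(Some(file_cache));
///
/// // Fine-grained search: row ids within the SST file that match the predicates.
/// let row_ids = applier.apply_fine(file_id, Some(file_size)).await?;
/// ```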
pub struct FulltextIndexApplier {
    /// Requests to be applied.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// The source of the index.
    index_source: IndexSource,

    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Predicate key. Used to identify the predicate and fetch results from the cache.
    predicate_key: PredicateKey,
}

pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;

impl FulltextIndexApplier {
    /// Creates a new `FulltextIndexApplier`.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        store: ObjectStore,
        requests: BTreeMap<ColumnId, FulltextRequest>,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> Self {
        let requests = Arc::new(requests);
        let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);

        Self {
            predicate_key: PredicateKey::new_fulltext(requests.clone()),
            requests,
            index_source,
            bloom_filter_index_cache: None,
        }
    }

    /// Sets the file cache.
    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.index_source.set_file_cache(file_cache);
        self
    }

    /// Sets the puffin metadata cache.
    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.index_source
            .set_puffin_metadata_cache(puffin_metadata_cache);
        self
    }

    /// Sets the bloom filter cache.
    pub fn with_bloom_filter_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

    /// Returns the predicate key.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
}

impl FulltextIndexApplier {
    /// Applies the fine-grained (Tantivy) fulltext index to the specified SST file.
    /// Returns the row ids that match the queries, or `None` if no index could be applied.
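    ///
    /// Results from multiple queries and terms, and from different columns, are
    /// intersected, i.e. the predicates are combined with AND semantics.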
    pub async fn apply_fine(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let mut row_ids: Option<BTreeSet<RowId>> = None;
        for (column_id, request) in self.requests.iter() {
            if request.queries.is_empty() && request.terms.is_empty() {
                continue;
            }

            let Some(result) = self
                .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
                .await?
            else {
                continue;
            };

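            // Intersect with the results from previously applied columns:
            // predicates on different columns are combined with AND semantics.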
            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref()
                && ids.is_empty()
            {
                break;
            }
        }

        if row_ids.is_none() {
            timer.stop_and_discard();
        }
        Ok(row_ids)
    }

    async fn apply_fine_one_column(
        &self,
        file_size_hint: Option<u64>,
        file_id: RegionFileId,
        column_id: ColumnId,
        request: &FulltextRequest,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let blob_key = format!(
            "{INDEX_BLOB_TYPE_TANTIVY}-{}",
            IndexTarget::ColumnId(column_id)
        );
        let dir = self
            .index_source
            .dir(file_id, &blob_key, file_size_hint)
            .await?;

        let dir = match &dir {
            Some(dir) => dir,
            None => {
                return Ok(None);
            }
        };

        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
        let path = dir.path();

        let searcher =
            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
        let mut row_ids: Option<BTreeSet<RowId>> = None;

        // 1. Apply queries
        for query in &request.queries {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref()
                && ids.is_empty()
            {
                break;
            }
        }

        // 2. Apply terms
        let query = request.terms_as_query(config.case_sensitive);
        if !query.0.is_empty() {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }
        }

        Ok(row_ids)
    }
}

impl FulltextIndexApplier {
    /// Applies the coarse-grained (bloom filter) fulltext index to the specified SST file.
    /// Returns (row group id -> ranges) that match the terms.
    ///
    /// A row group id present in the returned result means that the row group was searched.
    /// Empty ranges mean that the row group was searched but no rows matched.
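    ///
    /// For example (illustrative values only), given `row_groups = [(100, true), (200, false), (50, true)]`,
    /// a possible result is `[(0, vec![10..20]), (2, vec![])]`: row groups 0 and 2 were searched,
    /// rows 10..20 of row group 0 may match, and nothing in row group 2 matches.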
    pub async fn apply_coarse(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let (input, mut output) = Self::init_coarse_output(row_groups);
        let mut applied = false;

        for (column_id, request) in self.requests.iter() {
            if request.terms.is_empty() {
                // The coarse (bloom filter) index only supports term predicates,
                // so skip columns that have no terms to apply.
                continue;
            }

            applied |= self
                .apply_coarse_one_column(
                    file_id,
                    file_size_hint,
                    *column_id,
                    &request.terms,
                    &mut output,
                )
                .await?;
        }

        if !applied {
            timer.stop_and_discard();
            return Ok(None);
        }

        Self::adjust_coarse_output(input, &mut output);
        Ok(Some(output))
    }

    async fn apply_coarse_one_column(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
        column_id: ColumnId,
        terms: &[FulltextTerm],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> Result<bool> {
        let blob_key = format!(
            "{INDEX_BLOB_TYPE_BLOOM}-{}",
            IndexTarget::ColumnId(column_id)
        );
        let Some(reader) = self
            .index_source
            .blob(file_id, &blob_key, file_size_hint)
            .await?
        else {
            return Ok(false);
        };
        let config =
            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;

        let predicates = Self::terms_to_predicates(terms, &config);
        if predicates.is_empty() {
            return Ok(false);
        }

        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
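        // Wrap the bloom filter reader with the index cache when one is configured,
        // so repeated probes against the same blob can be served from memory.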
        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
            let blob_size = range_reader
                .metadata()
                .await
                .context(MetadataSnafu)?
                .content_length;
            let reader = CachedBloomFilterIndexBlobReader::new(
                file_id.file_id(),
                column_id,
                Tag::Fulltext,
                blob_size,
                BloomFilterReaderImpl::new(range_reader),
                bloom_filter_cache.clone(),
            );
            Box::new(reader) as _
        } else {
            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
        };

        let mut applier = BloomFilterApplier::new(reader)
            .await
            .context(ApplyBloomFilterIndexSnafu)?;
        for (_, row_group_output) in output.iter_mut() {
            // All rows are filtered out, skip the search
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier
                .search(&predicates, row_group_output)
                .await
                .context(ApplyBloomFilterIndexSnafu)?;
        }

        Ok(true)
    }

    /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
    ///
    /// `row_groups` is a list of (row group length, whether to search).
    ///
    /// Returns (`input`, `output`):
    /// * `input` is a list of (row group index to search, row group range based on start of the file).
    /// * `output` is a list of (row group index to search, row group ranges based on start of the file).
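    ///
    /// For example, `row_groups = [(100, true), (50, false), (80, true)]` yields
    /// `input = [(0, 0..100), (2, 150..230)]` and `output = [(0, vec![0..100]), (2, vec![150..230])]`.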
    #[allow(clippy::type_complexity)]
    fn init_coarse_output(
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
        // Calculates row groups' ranges based on start of the file.
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }

        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
        // so we need to adjust them later.
        let output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        (input, output)
    }

    /// Adjusts the coarse output: makes the output ranges relative to the start of each row group.
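    ///
    /// Continuing the example above, an output entry `(2, vec![160..170])` (file-based)
    /// becomes `(2, vec![10..20])` (row-group-based), since row group 2 starts at row 150.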
    fn adjust_coarse_output(
        input: Vec<(usize, Range<usize>)>,
        output: &mut [(usize, Vec<Range<usize>>)],
    ) {
        // adjust ranges to be based on row group
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }
    }

    /// Converts terms to bloom filter predicates.
    ///
    /// Each term is tokenized with the configured analyzer, and the tokens are lowercased
    /// when the index is case-insensitive. The resulting predicates are combined with AND semantics.
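    ///
    /// For example, with a case-insensitive English analyzer, the term `"Hello World"` is
    /// expected to produce the probes `"hello"` and `"world"`, each wrapped in its own
    /// `InListPredicate` so that both tokens must be present.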
    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
        let mut probes = HashSet::new();
        for term in terms {
            if config.case_sensitive && term.col_lowered {
                // The predicate lowercases the column but the index is case-sensitive,
                // so the index cannot serve this term.
                continue;
            }
            probes.extend(Self::term_to_probes(&term.term, config));
        }

        probes
            .into_iter()
            .map(|p| InListPredicate {
                list: iter::once(p).collect(),
            })
            .collect::<Vec<_>>()
    }

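    /// Tokenizes `term` with the analyzer configured for the index and returns the byte
    /// probes used against the bloom filter. Tokens are lowercased when the index is
    /// case-insensitive.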
    fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
        let tokens = match config.analyzer {
            Analyzer::English => EnglishTokenizer {}.tokenize(term),
            Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
        };

        tokens.into_iter().map(|t| {
            if !config.case_sensitive {
                t.to_lowercase()
            } else {
                t.to_string()
            }
            .into_bytes()
        })
    }
}

/// The source of the index.
struct IndexSource {
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// The puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,

    /// Store responsible for accessing remote index files.
    remote_store: ObjectStore,

    /// Local file cache.
    file_cache: Option<FileCacheRef>,

    /// The puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

impl IndexSource {
    fn new(
        table_dir: String,
        path_type: PathType,
        puffin_manager_factory: PuffinManagerFactory,
        remote_store: ObjectStore,
    ) -> Self {
        Self {
            table_dir,
            path_type,
            puffin_manager_factory,
            remote_store,
            file_cache: None,
            puffin_metadata_cache: None,
        }
    }

    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
        self.file_cache = file_cache;
    }

    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
        self.puffin_metadata_cache = puffin_metadata_cache;
    }

    /// Returns the blob with the specified key from local cache or remote store.
    ///
    /// Returns `None` if the blob is not found.
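    ///
    /// The blob is read from the local file cache when it is available; on an unexpected
    /// cache read error the read falls back to the remote store (see `ensure_reader`).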
    async fn blob(
        &self,
        file_id: RegionFileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.blob(key).await;
        match res {
            Ok(blob) => Ok(Some(blob)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Falling back to the remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.blob(key).await;
                    match res {
                        Ok(blob) => Ok(Some(blob)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

    /// Returns the directory with the specified key from local cache or remote store.
    ///
    /// Returns `None` if the directory is not found.
    async fn dir(
        &self,
        file_id: RegionFileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.dir(key).await;
        match res {
            Ok(dir) => Ok(Some(dir)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Falling back to the remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.dir(key).await;
                    match res {
                        Ok(dir) => Ok(Some(dir)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

    /// Returns the reader and whether it fell back to the remote store.
    async fn ensure_reader(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<(SstPuffinReader, bool)> {
        match self.build_local_cache(file_id, file_size_hint).await {
            Ok(Some(r)) => Ok((r, false)),
            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
            Err(err) => Err(err),
        }
    }

    async fn build_local_cache(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<SstPuffinReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self
            .puffin_manager_factory
            .build(
                file_cache.local_store(),
                WriteCachePathProvider::new(file_cache.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);
        Ok(Some(reader))
    }

    async fn build_remote(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<SstPuffinReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.remote_store.clone(),
                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);

        Ok(reader)
    }
}