mito2/sst/index/fulltext_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
23use index::bloom_filter::reader::BloomFilterReaderImpl;
24use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
25use index::fulltext_index::Config;
26use object_store::ObjectStore;
27use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
28use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
29use snafu::ResultExt;
30use store_api::storage::{ColumnId, RegionId};
31
32use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
33use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
34use crate::cache::index::bloom_filter_index::{
35    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::cache::index::result_cache::PredicateKey;
38use crate::error::{
39    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
40    PuffinReadBlobSnafu, Result,
41};
42use crate::metrics::INDEX_APPLY_ELAPSED;
43use crate::sst::file::FileId;
44use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
45use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
46use crate::sst::index::puffin_manager::{
47    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
48};
49use crate::sst::index::TYPE_FULLTEXT_INDEX;
50
51pub mod builder;
52
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files.
///
/// It exposes a fine-grained path (`apply_fine`) that returns matching row ids and a
/// coarse-grained path (`apply_coarse`) that returns candidate row ranges per row group.
pub struct FulltextIndexApplier {
    /// Requests to be applied, keyed by the id of the column they target.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// The source of the index. Resolves index blobs and directories for a file.
    index_source: IndexSource,

    /// Cache for bloom filter index. When `None`, the coarse path reads bloom filter
    /// blobs through an uncached reader.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    predicate_key: PredicateKey,
}
67
/// Shared, reference-counted handle to a [`FulltextIndexApplier`].
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
69
70impl FulltextIndexApplier {
71    /// Creates a new `FulltextIndexApplier`.
72    pub fn new(
73        region_dir: String,
74        region_id: RegionId,
75        store: ObjectStore,
76        requests: BTreeMap<ColumnId, FulltextRequest>,
77        puffin_manager_factory: PuffinManagerFactory,
78    ) -> Self {
79        let requests = Arc::new(requests);
80        let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);
81
82        Self {
83            predicate_key: PredicateKey::new_fulltext(requests.clone()),
84            requests,
85            index_source,
86            bloom_filter_index_cache: None,
87        }
88    }
89
90    /// Sets the file cache.
91    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
92        self.index_source.set_file_cache(file_cache);
93        self
94    }
95
96    /// Sets the puffin metadata cache.
97    pub fn with_puffin_metadata_cache(
98        mut self,
99        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
100    ) -> Self {
101        self.index_source
102            .set_puffin_metadata_cache(puffin_metadata_cache);
103        self
104    }
105
106    /// Sets the bloom filter cache.
107    pub fn with_bloom_filter_cache(
108        mut self,
109        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
110    ) -> Self {
111        self.bloom_filter_index_cache = bloom_filter_index_cache;
112        self
113    }
114
115    /// Returns the predicate key.
116    pub fn predicate_key(&self) -> &PredicateKey {
117        &self.predicate_key
118    }
119}
120
121impl FulltextIndexApplier {
122    /// Applies fine-grained fulltext index to the specified SST file.
123    /// Returns the row ids that match the queries.
124    pub async fn apply_fine(
125        &self,
126        file_id: FileId,
127        file_size_hint: Option<u64>,
128    ) -> Result<Option<BTreeSet<RowId>>> {
129        let timer = INDEX_APPLY_ELAPSED
130            .with_label_values(&[TYPE_FULLTEXT_INDEX])
131            .start_timer();
132
133        let mut row_ids: Option<BTreeSet<RowId>> = None;
134        for (column_id, request) in self.requests.iter() {
135            if request.queries.is_empty() && request.terms.is_empty() {
136                continue;
137            }
138
139            let Some(result) = self
140                .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
141                .await?
142            else {
143                continue;
144            };
145
146            if let Some(ids) = row_ids.as_mut() {
147                ids.retain(|id| result.contains(id));
148            } else {
149                row_ids = Some(result);
150            }
151
152            if let Some(ids) = row_ids.as_ref() {
153                if ids.is_empty() {
154                    break;
155                }
156            }
157        }
158
159        if row_ids.is_none() {
160            timer.stop_and_discard();
161        }
162        Ok(row_ids)
163    }
164
165    async fn apply_fine_one_column(
166        &self,
167        file_size_hint: Option<u64>,
168        file_id: FileId,
169        column_id: ColumnId,
170        request: &FulltextRequest,
171    ) -> Result<Option<BTreeSet<RowId>>> {
172        let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
173        let dir = self
174            .index_source
175            .dir(file_id, &blob_key, file_size_hint)
176            .await?;
177
178        let dir = match &dir {
179            Some(dir) => dir,
180            None => {
181                return Ok(None);
182            }
183        };
184
185        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
186        let path = dir.path();
187
188        let searcher =
189            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
190        let mut row_ids: Option<BTreeSet<RowId>> = None;
191
192        // 1. Apply queries
193        for query in &request.queries {
194            let result = searcher
195                .search(&query.0)
196                .await
197                .context(ApplyFulltextIndexSnafu)?;
198
199            if let Some(ids) = row_ids.as_mut() {
200                ids.retain(|id| result.contains(id));
201            } else {
202                row_ids = Some(result);
203            }
204
205            if let Some(ids) = row_ids.as_ref() {
206                if ids.is_empty() {
207                    break;
208                }
209            }
210        }
211
212        // 2. Apply terms
213        let query = request.terms_as_query(config.case_sensitive);
214        if !query.0.is_empty() {
215            let result = searcher
216                .search(&query.0)
217                .await
218                .context(ApplyFulltextIndexSnafu)?;
219
220            if let Some(ids) = row_ids.as_mut() {
221                ids.retain(|id| result.contains(id));
222            } else {
223                row_ids = Some(result);
224            }
225        }
226
227        Ok(row_ids)
228    }
229}
230
231impl FulltextIndexApplier {
232    /// Applies coarse-grained fulltext index to the specified SST file.
233    /// Returns (row group id -> ranges) that match the queries.
234    ///
235    /// Row group id existing in the returned result means that the row group is searched.
236    /// Empty ranges means that the row group is searched but no rows are found.
237    pub async fn apply_coarse(
238        &self,
239        file_id: FileId,
240        file_size_hint: Option<u64>,
241        row_groups: impl Iterator<Item = (usize, bool)>,
242    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
243        let timer = INDEX_APPLY_ELAPSED
244            .with_label_values(&[TYPE_FULLTEXT_INDEX])
245            .start_timer();
246
247        let (input, mut output) = Self::init_coarse_output(row_groups);
248        let mut applied = false;
249
250        for (column_id, request) in self.requests.iter() {
251            if request.terms.is_empty() {
252                // only apply terms
253                continue;
254            }
255
256            applied |= self
257                .apply_coarse_one_column(
258                    file_id,
259                    file_size_hint,
260                    *column_id,
261                    &request.terms,
262                    &mut output,
263                )
264                .await?;
265        }
266
267        if !applied {
268            timer.stop_and_discard();
269            return Ok(None);
270        }
271
272        Self::adjust_coarse_output(input, &mut output);
273        Ok(Some(output))
274    }
275
276    async fn apply_coarse_one_column(
277        &self,
278        file_id: FileId,
279        file_size_hint: Option<u64>,
280        column_id: ColumnId,
281        terms: &[FulltextTerm],
282        output: &mut [(usize, Vec<Range<usize>>)],
283    ) -> Result<bool> {
284        let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
285        let Some(reader) = self
286            .index_source
287            .blob(file_id, &blob_key, file_size_hint)
288            .await?
289        else {
290            return Ok(false);
291        };
292        let config =
293            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
294
295        let predicates = Self::terms_to_predicates(terms, &config);
296        if predicates.is_empty() {
297            return Ok(false);
298        }
299
300        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
301        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
302            let blob_size = range_reader
303                .metadata()
304                .await
305                .context(MetadataSnafu)?
306                .content_length;
307            let reader = CachedBloomFilterIndexBlobReader::new(
308                file_id,
309                column_id,
310                Tag::Fulltext,
311                blob_size,
312                BloomFilterReaderImpl::new(range_reader),
313                bloom_filter_cache.clone(),
314            );
315            Box::new(reader) as _
316        } else {
317            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
318        };
319
320        let mut applier = BloomFilterApplier::new(reader)
321            .await
322            .context(ApplyBloomFilterIndexSnafu)?;
323        for (_, row_group_output) in output.iter_mut() {
324            // All rows are filtered out, skip the search
325            if row_group_output.is_empty() {
326                continue;
327            }
328
329            *row_group_output = applier
330                .search(&predicates, row_group_output)
331                .await
332                .context(ApplyBloomFilterIndexSnafu)?;
333        }
334
335        Ok(true)
336    }
337
338    /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
339    ///
340    /// `row_groups` is a list of (row group length, whether to search).
341    ///
342    /// Returns (`input`, `output`):
343    /// * `input` is a list of (row group index to search, row group range based on start of the file).
344    /// * `output` is a list of (row group index to search, row group ranges based on start of the file).
345    #[allow(clippy::type_complexity)]
346    fn init_coarse_output(
347        row_groups: impl Iterator<Item = (usize, bool)>,
348    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
349        // Calculates row groups' ranges based on start of the file.
350        let mut input = Vec::with_capacity(row_groups.size_hint().0);
351        let mut start = 0;
352        for (i, (len, to_search)) in row_groups.enumerate() {
353            let end = start + len;
354            if to_search {
355                input.push((i, start..end));
356            }
357            start = end;
358        }
359
360        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
361        // so we need to adjust them later.
362        let output = input
363            .iter()
364            .map(|(i, range)| (*i, vec![range.clone()]))
365            .collect::<Vec<_>>();
366
367        (input, output)
368    }
369
370    /// Adjusts the coarse output. Makes the output ranges based on row group start.
371    fn adjust_coarse_output(
372        input: Vec<(usize, Range<usize>)>,
373        output: &mut [(usize, Vec<Range<usize>>)],
374    ) {
375        // adjust ranges to be based on row group
376        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
377            let start = input.start;
378            for range in output.iter_mut() {
379                range.start -= start;
380                range.end -= start;
381            }
382        }
383    }
384
385    /// Converts terms to predicates.
386    ///
387    /// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive.
388    /// Multiple terms are combined with AND semantics.
389    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
390        let mut probes = HashSet::new();
391        for term in terms {
392            if config.case_sensitive && term.col_lowered {
393                // lowercased terms are not indexed
394                continue;
395            }
396
397            let ts = term
398                .term
399                .split(|c: char| !c.is_alphanumeric())
400                .filter(|&t| !t.is_empty())
401                .map(|t| {
402                    if !config.case_sensitive {
403                        t.to_lowercase()
404                    } else {
405                        t.to_string()
406                    }
407                    .into_bytes()
408                });
409
410            probes.extend(ts);
411        }
412
413        probes
414            .into_iter()
415            .map(|p| InListPredicate {
416                list: iter::once(p).collect(),
417            })
418            .collect::<Vec<_>>()
419    }
420}
421
/// The source of the index.
///
/// Builds puffin readers over either the local file cache or the remote object store and
/// resolves index blobs and directories through them.
struct IndexSource {
    /// Directory of the region; used to build the path factory for remote index files.
    region_dir: String,
    /// Id of the region; used to build file cache keys and the cache path provider.
    region_id: RegionId,

    /// The puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,

    /// Store responsible for accessing remote index files.
    remote_store: ObjectStore,

    /// Local file cache. When `None`, readers are always built over the remote store.
    file_cache: Option<FileCacheRef>,

    /// The puffin metadata cache, handed to every puffin manager built by this source.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
439
440impl IndexSource {
441    fn new(
442        region_dir: String,
443        region_id: RegionId,
444        puffin_manager_factory: PuffinManagerFactory,
445        remote_store: ObjectStore,
446    ) -> Self {
447        Self {
448            region_dir,
449            region_id,
450            puffin_manager_factory,
451            remote_store,
452            file_cache: None,
453            puffin_metadata_cache: None,
454        }
455    }
456
457    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
458        self.file_cache = file_cache;
459    }
460
461    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
462        self.puffin_metadata_cache = puffin_metadata_cache;
463    }
464
465    /// Returns the blob with the specified key from local cache or remote store.
466    ///
467    /// Returns `None` if the blob is not found.
468    async fn blob(
469        &self,
470        file_id: FileId,
471        key: &str,
472        file_size_hint: Option<u64>,
473    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
474        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
475        let res = reader.blob(key).await;
476        match res {
477            Ok(blob) => Ok(Some(blob)),
478            Err(err) if err.is_blob_not_found() => Ok(None),
479            Err(err) => {
480                if fallbacked {
481                    Err(err).context(PuffinReadBlobSnafu)
482                } else {
483                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
484                    let reader = self.build_remote(file_id, file_size_hint).await?;
485                    let res = reader.blob(key).await;
486                    match res {
487                        Ok(blob) => Ok(Some(blob)),
488                        Err(err) if err.is_blob_not_found() => Ok(None),
489                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
490                    }
491                }
492            }
493        }
494    }
495
496    /// Returns the directory with the specified key from local cache or remote store.
497    ///
498    /// Returns `None` if the directory is not found.
499    async fn dir(
500        &self,
501        file_id: FileId,
502        key: &str,
503        file_size_hint: Option<u64>,
504    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
505        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
506        let res = reader.dir(key).await;
507        match res {
508            Ok(dir) => Ok(Some(dir)),
509            Err(err) if err.is_blob_not_found() => Ok(None),
510            Err(err) => {
511                if fallbacked {
512                    Err(err).context(PuffinReadBlobSnafu)
513                } else {
514                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
515                    let reader = self.build_remote(file_id, file_size_hint).await?;
516                    let res = reader.dir(key).await;
517                    match res {
518                        Ok(dir) => Ok(Some(dir)),
519                        Err(err) if err.is_blob_not_found() => Ok(None),
520                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
521                    }
522                }
523            }
524        }
525    }
526
527    /// Return reader and whether it is fallbacked to remote store.
528    async fn ensure_reader(
529        &self,
530        file_id: FileId,
531        file_size_hint: Option<u64>,
532    ) -> Result<(SstPuffinReader, bool)> {
533        match self.build_local_cache(file_id, file_size_hint).await {
534            Ok(Some(r)) => Ok((r, false)),
535            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
536            Err(err) => Err(err),
537        }
538    }
539
540    async fn build_local_cache(
541        &self,
542        file_id: FileId,
543        file_size_hint: Option<u64>,
544    ) -> Result<Option<SstPuffinReader>> {
545        let Some(file_cache) = &self.file_cache else {
546            return Ok(None);
547        };
548
549        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
550        if file_cache.get(index_key).await.is_none() {
551            return Ok(None);
552        };
553
554        let puffin_manager = self
555            .puffin_manager_factory
556            .build(
557                file_cache.local_store(),
558                WriteCachePathProvider::new(self.region_id, file_cache.clone()),
559            )
560            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
561        let reader = puffin_manager
562            .reader(&file_id)
563            .await
564            .context(PuffinBuildReaderSnafu)?
565            .with_file_size_hint(file_size_hint);
566        Ok(Some(reader))
567    }
568
569    async fn build_remote(
570        &self,
571        file_id: FileId,
572        file_size_hint: Option<u64>,
573    ) -> Result<SstPuffinReader> {
574        let puffin_manager = self
575            .puffin_manager_factory
576            .build(
577                self.remote_store.clone(),
578                RegionFilePathFactory::new(self.region_dir.clone()),
579            )
580            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
581
582        let reader = puffin_manager
583            .reader(&file_id)
584            .await
585            .context(PuffinBuildReaderSnafu)?
586            .with_file_size_hint(file_size_hint);
587
588        Ok(reader)
589    }
590}