// mito2/sst/index/fulltext_index/applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
23use index::bloom_filter::reader::BloomFilterReaderImpl;
24use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
25use index::fulltext_index::Config;
26use object_store::ObjectStore;
27use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
28use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
29use snafu::ResultExt;
30use store_api::storage::{ColumnId, RegionId};
31
32use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
33use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
34use crate::cache::index::bloom_filter_index::{
35    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::error::{
38    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
39    PuffinReadBlobSnafu, Result,
40};
41use crate::metrics::INDEX_APPLY_ELAPSED;
42use crate::sst::file::FileId;
43use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
44use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
45use crate::sst::index::puffin_manager::{
46    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
47};
48use crate::sst::index::TYPE_FULLTEXT_INDEX;
49
50pub mod builder;
51
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files.
pub struct FulltextIndexApplier {
    /// Requests to be applied, keyed by the column id of the indexed column.
    requests: HashMap<ColumnId, FulltextRequest>,

    /// The source of the index, reading from the local file cache when available
    /// and falling back to the remote object store otherwise.
    index_source: IndexSource,

    /// Cache for bloom filter index. `None` disables caching for the
    /// coarse-grained (bloom filter based) reads.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
}
63
/// Shared, reference-counted handle to a [`FulltextIndexApplier`].
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
65
66impl FulltextIndexApplier {
67    /// Creates a new `FulltextIndexApplier`.
68    pub fn new(
69        region_dir: String,
70        region_id: RegionId,
71        store: ObjectStore,
72        requests: HashMap<ColumnId, FulltextRequest>,
73        puffin_manager_factory: PuffinManagerFactory,
74    ) -> Self {
75        let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);
76
77        Self {
78            requests,
79            index_source,
80            bloom_filter_index_cache: None,
81        }
82    }
83
84    /// Sets the file cache.
85    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
86        self.index_source.set_file_cache(file_cache);
87        self
88    }
89
90    /// Sets the puffin metadata cache.
91    pub fn with_puffin_metadata_cache(
92        mut self,
93        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
94    ) -> Self {
95        self.index_source
96            .set_puffin_metadata_cache(puffin_metadata_cache);
97        self
98    }
99
100    /// Sets the bloom filter cache.
101    pub fn with_bloom_filter_cache(
102        mut self,
103        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
104    ) -> Self {
105        self.bloom_filter_index_cache = bloom_filter_index_cache;
106        self
107    }
108}
109
110impl FulltextIndexApplier {
111    /// Applies fine-grained fulltext index to the specified SST file.
112    /// Returns the row ids that match the queries.
113    pub async fn apply_fine(
114        &self,
115        file_id: FileId,
116        file_size_hint: Option<u64>,
117    ) -> Result<Option<BTreeSet<RowId>>> {
118        let timer = INDEX_APPLY_ELAPSED
119            .with_label_values(&[TYPE_FULLTEXT_INDEX])
120            .start_timer();
121
122        let mut row_ids: Option<BTreeSet<RowId>> = None;
123        for (column_id, request) in &self.requests {
124            if request.queries.is_empty() && request.terms.is_empty() {
125                continue;
126            }
127
128            let Some(result) = self
129                .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
130                .await?
131            else {
132                continue;
133            };
134
135            if let Some(ids) = row_ids.as_mut() {
136                ids.retain(|id| result.contains(id));
137            } else {
138                row_ids = Some(result);
139            }
140
141            if let Some(ids) = row_ids.as_ref() {
142                if ids.is_empty() {
143                    break;
144                }
145            }
146        }
147
148        if row_ids.is_none() {
149            timer.stop_and_discard();
150        }
151        Ok(row_ids)
152    }
153
154    async fn apply_fine_one_column(
155        &self,
156        file_size_hint: Option<u64>,
157        file_id: FileId,
158        column_id: ColumnId,
159        request: &FulltextRequest,
160    ) -> Result<Option<BTreeSet<RowId>>> {
161        let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
162        let dir = self
163            .index_source
164            .dir(file_id, &blob_key, file_size_hint)
165            .await?;
166
167        let dir = match &dir {
168            Some(dir) => dir,
169            None => {
170                return Ok(None);
171            }
172        };
173
174        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
175        let path = dir.path();
176
177        let searcher =
178            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
179        let mut row_ids: Option<BTreeSet<RowId>> = None;
180
181        // 1. Apply queries
182        for query in &request.queries {
183            let result = searcher
184                .search(&query.0)
185                .await
186                .context(ApplyFulltextIndexSnafu)?;
187
188            if let Some(ids) = row_ids.as_mut() {
189                ids.retain(|id| result.contains(id));
190            } else {
191                row_ids = Some(result);
192            }
193
194            if let Some(ids) = row_ids.as_ref() {
195                if ids.is_empty() {
196                    break;
197                }
198            }
199        }
200
201        // 2. Apply terms
202        let query = request.terms_as_query(config.case_sensitive);
203        if !query.0.is_empty() {
204            let result = searcher
205                .search(&query.0)
206                .await
207                .context(ApplyFulltextIndexSnafu)?;
208
209            if let Some(ids) = row_ids.as_mut() {
210                ids.retain(|id| result.contains(id));
211            } else {
212                row_ids = Some(result);
213            }
214        }
215
216        Ok(row_ids)
217    }
218}
219
impl FulltextIndexApplier {
    /// Applies coarse-grained fulltext index to the specified SST file.
    /// Returns (row group id -> ranges) that match the queries.
    ///
    /// `row_groups` yields (row group length, whether to search) pairs.
    /// Returns `None` when no column's bloom filter index could be applied.
    pub async fn apply_coarse(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let (input, mut output) = Self::init_coarse_output(row_groups);
        let mut applied = false;

        for (column_id, request) in &self.requests {
            if request.terms.is_empty() {
                // only apply terms; queries need the fine-grained tantivy index
                continue;
            }

            applied |= self
                .apply_coarse_one_column(
                    file_id,
                    file_size_hint,
                    *column_id,
                    &request.terms,
                    &mut output,
                )
                .await?;
        }

        if !applied {
            // Nothing was applied; don't record a misleading elapsed time.
            timer.stop_and_discard();
            return Ok(None);
        }

        // Convert ranges from file-based offsets to row-group-based offsets.
        Self::adjust_coarse_output(input, &mut output);
        Ok(Some(output))
    }

    /// Applies one column's bloom filter blob, narrowing each row group's
    /// ranges in `output` to those that may contain all the terms.
    ///
    /// Returns `false` (leaving `output` untouched) when the column has no
    /// bloom filter blob or yields no usable predicates.
    async fn apply_coarse_one_column(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        column_id: ColumnId,
        terms: &[FulltextTerm],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> Result<bool> {
        let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
        let Some(reader) = self
            .index_source
            .blob(file_id, &blob_key, file_size_hint)
            .await?
        else {
            return Ok(false);
        };
        let config =
            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;

        let predicates = Self::terms_to_predicates(terms, &config);
        if predicates.is_empty() {
            return Ok(false);
        }

        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
        // Wrap the blob reader with the bloom filter cache when one is configured.
        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
            let blob_size = range_reader
                .metadata()
                .await
                .context(MetadataSnafu)?
                .content_length;
            let reader = CachedBloomFilterIndexBlobReader::new(
                file_id,
                column_id,
                Tag::Fulltext,
                blob_size,
                BloomFilterReaderImpl::new(range_reader),
                bloom_filter_cache.clone(),
            );
            Box::new(reader) as _
        } else {
            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
        };

        let mut applier = BloomFilterApplier::new(reader)
            .await
            .context(ApplyBloomFilterIndexSnafu)?;
        for (_, row_group_output) in output.iter_mut() {
            // All rows are filtered out, skip the search
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier
                .search(&predicates, row_group_output)
                .await
                .context(ApplyBloomFilterIndexSnafu)?;
        }

        Ok(true)
    }

    /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
    ///
    /// `row_groups` is a list of (row group length, whether to search).
    ///
    /// Returns (`input`, `output`):
    /// * `input` is a list of (row group index to search, row group range based on start of the file).
    /// * `output` is a list of (row group index to search, row group ranges based on start of the file).
    #[allow(clippy::type_complexity)]
    fn init_coarse_output(
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
        // Calculates row groups' ranges based on start of the file.
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }

        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
        // so we need to adjust them later.
        let output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        (input, output)
    }

    /// Adjusts the coarse output. Makes the output ranges based on row group start.
    /// Row groups whose ranges were fully filtered out are dropped from `output`.
    fn adjust_coarse_output(
        input: Vec<(usize, Range<usize>)>,
        output: &mut Vec<(usize, Vec<Range<usize>>)>,
    ) {
        // adjust ranges to be based on row group
        // NOTE: `input` and `output` are index-aligned by construction in `init_coarse_output`.
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }
        output.retain(|(_, ranges)| !ranges.is_empty());
    }

    /// Converts terms to predicates.
    ///
    /// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive.
    /// Multiple terms are combined with AND semantics.
    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
        // Deduplicate probe tokens across all terms.
        let mut probes = HashSet::new();
        for term in terms {
            if config.case_sensitive && term.col_lowered {
                // lowercased terms are not indexed
                continue;
            }

            let ts = term
                .term
                .split(|c: char| !c.is_alphanumeric())
                .filter(|&t| !t.is_empty())
                .map(|t| {
                    if !config.case_sensitive {
                        t.to_lowercase()
                    } else {
                        t.to_string()
                    }
                    .into_bytes()
                });

            probes.extend(ts);
        }

        // One single-element `InListPredicate` per probe: predicates are ANDed,
        // so every probe must hit the bloom filter for a range to survive.
        probes
            .into_iter()
            .map(|p| InListPredicate {
                list: iter::once(p).collect(),
            })
            .collect::<Vec<_>>()
    }
}
408
/// The source of the index.
///
/// Reads puffin index files from the local file cache when present,
/// falling back to the remote object store.
struct IndexSource {
    /// Directory of the region's SST files on the remote store.
    region_dir: String,
    /// Region this source serves; used to build file cache keys.
    region_id: RegionId,

    /// The puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,

    /// Store responsible for accessing remote index files.
    remote_store: ObjectStore,

    /// Local file cache. `None` disables the local read path.
    file_cache: Option<FileCacheRef>,

    /// The puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
426
impl IndexSource {
    /// Creates a new `IndexSource` with caches disabled.
    fn new(
        region_dir: String,
        region_id: RegionId,
        puffin_manager_factory: PuffinManagerFactory,
        remote_store: ObjectStore,
    ) -> Self {
        Self {
            region_dir,
            region_id,
            puffin_manager_factory,
            remote_store,
            file_cache: None,
            puffin_metadata_cache: None,
        }
    }

    /// Sets the local file cache (`None` disables the local read path).
    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
        self.file_cache = file_cache;
    }

    /// Sets the puffin metadata cache.
    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
        self.puffin_metadata_cache = puffin_metadata_cache;
    }

    /// Returns the blob with the specified key from local cache or remote store.
    ///
    /// Returns `None` if the blob is not found.
    ///
    /// On an unexpected error from the local cache (anything other than
    /// "blob not found"), logs a warning and retries once against the
    /// remote store before giving up.
    #[allow(unused)]
    async fn blob(
        &self,
        file_id: FileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.blob(key).await;
        match res {
            Ok(blob) => Ok(Some(blob)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    // Already reading from the remote store; nothing left to try.
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.blob(key).await;
                    match res {
                        Ok(blob) => Ok(Some(blob)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

    /// Returns the directory with the specified key from local cache or remote store.
    ///
    /// Returns `None` if the directory is not found.
    ///
    /// Mirrors [`Self::blob`]: unexpected errors from the local cache trigger
    /// a single retry against the remote store.
    async fn dir(
        &self,
        file_id: FileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.dir(key).await;
        match res {
            Ok(dir) => Ok(Some(dir)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    // Already reading from the remote store; nothing left to try.
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.dir(key).await;
                    match res {
                        Ok(dir) => Ok(Some(dir)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

    /// Return reader and whether it is fallbacked to remote store.
    ///
    /// `true` means the returned reader targets the remote store (either the
    /// file cache is disabled or the file is not cached locally).
    async fn ensure_reader(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<(SstPuffinReader, bool)> {
        match self.build_local_cache(file_id, file_size_hint).await {
            Ok(Some(r)) => Ok((r, false)),
            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
            Err(err) => Err(err),
        }
    }

    /// Builds a reader over the locally cached puffin file.
    ///
    /// Returns `Ok(None)` when no file cache is configured or the file is
    /// not present in the cache.
    async fn build_local_cache(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<SstPuffinReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        // Existence check only; the actual read goes through the puffin manager below.
        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self
            .puffin_manager_factory
            .build(
                file_cache.local_store(),
                WriteCachePathProvider::new(self.region_id, file_cache.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);
        Ok(Some(reader))
    }

    /// Builds a reader over the puffin file in the remote object store.
    async fn build_remote(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<SstPuffinReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.remote_store.clone(),
                RegionFilePathFactory::new(self.region_dir.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);

        Ok(reader)
    }
}