mito2/sst/index/fulltext_index/applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
23use index::bloom_filter::reader::BloomFilterReaderImpl;
24use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
25use index::fulltext_index::Config;
26use object_store::ObjectStore;
27use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
28use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
29use snafu::ResultExt;
30use store_api::storage::{ColumnId, RegionId};
31
32use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
33use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
34use crate::cache::index::bloom_filter_index::{
35    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::cache::index::result_cache::PredicateKey;
38use crate::error::{
39    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
40    PuffinReadBlobSnafu, Result,
41};
42use crate::metrics::INDEX_APPLY_ELAPSED;
43use crate::sst::file::FileId;
44use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
45use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
46use crate::sst::index::puffin_manager::{
47    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
48};
49use crate::sst::index::TYPE_FULLTEXT_INDEX;
50
51pub mod builder;
52
53/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
54pub struct FulltextIndexApplier {
55    /// Requests to be applied.
56    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,
57
58    /// The source of the index.
59    index_source: IndexSource,
60
61    /// Cache for bloom filter index.
62    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
63
64    /// Predicate key. Used to identify the predicate and fetch result from cache.
65    predicate_key: PredicateKey,
66}
67
68pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
69
70impl FulltextIndexApplier {
71    /// Creates a new `FulltextIndexApplier`.
72    pub fn new(
73        region_dir: String,
74        region_id: RegionId,
75        store: ObjectStore,
76        requests: BTreeMap<ColumnId, FulltextRequest>,
77        puffin_manager_factory: PuffinManagerFactory,
78    ) -> Self {
79        let requests = Arc::new(requests);
80        let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);
81
82        Self {
83            predicate_key: PredicateKey::new_fulltext(requests.clone()),
84            requests,
85            index_source,
86            bloom_filter_index_cache: None,
87        }
88    }
89
90    /// Sets the file cache.
91    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
92        self.index_source.set_file_cache(file_cache);
93        self
94    }
95
96    /// Sets the puffin metadata cache.
97    pub fn with_puffin_metadata_cache(
98        mut self,
99        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
100    ) -> Self {
101        self.index_source
102            .set_puffin_metadata_cache(puffin_metadata_cache);
103        self
104    }
105
106    /// Sets the bloom filter cache.
107    pub fn with_bloom_filter_cache(
108        mut self,
109        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
110    ) -> Self {
111        self.bloom_filter_index_cache = bloom_filter_index_cache;
112        self
113    }
114
115    /// Returns the predicate key.
116    pub fn predicate_key(&self) -> &PredicateKey {
117        &self.predicate_key
118    }
119}
120
121impl FulltextIndexApplier {
122    /// Applies fine-grained fulltext index to the specified SST file.
123    /// Returns the row ids that match the queries.
124    pub async fn apply_fine(
125        &self,
126        file_id: FileId,
127        file_size_hint: Option<u64>,
128    ) -> Result<Option<BTreeSet<RowId>>> {
129        let timer = INDEX_APPLY_ELAPSED
130            .with_label_values(&[TYPE_FULLTEXT_INDEX])
131            .start_timer();
132
133        let mut row_ids: Option<BTreeSet<RowId>> = None;
134        for (column_id, request) in self.requests.iter() {
135            if request.queries.is_empty() && request.terms.is_empty() {
136                continue;
137            }
138
139            let Some(result) = self
140                .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
141                .await?
142            else {
143                continue;
144            };
145
146            if let Some(ids) = row_ids.as_mut() {
147                ids.retain(|id| result.contains(id));
148            } else {
149                row_ids = Some(result);
150            }
151
152            if let Some(ids) = row_ids.as_ref() {
153                if ids.is_empty() {
154                    break;
155                }
156            }
157        }
158
159        if row_ids.is_none() {
160            timer.stop_and_discard();
161        }
162        Ok(row_ids)
163    }
164
165    async fn apply_fine_one_column(
166        &self,
167        file_size_hint: Option<u64>,
168        file_id: FileId,
169        column_id: ColumnId,
170        request: &FulltextRequest,
171    ) -> Result<Option<BTreeSet<RowId>>> {
172        let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
173        let dir = self
174            .index_source
175            .dir(file_id, &blob_key, file_size_hint)
176            .await?;
177
178        let dir = match &dir {
179            Some(dir) => dir,
180            None => {
181                return Ok(None);
182            }
183        };
184
185        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
186        let path = dir.path();
187
188        let searcher =
189            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
190        let mut row_ids: Option<BTreeSet<RowId>> = None;
191
192        // 1. Apply queries
193        for query in &request.queries {
194            let result = searcher
195                .search(&query.0)
196                .await
197                .context(ApplyFulltextIndexSnafu)?;
198
199            if let Some(ids) = row_ids.as_mut() {
200                ids.retain(|id| result.contains(id));
201            } else {
202                row_ids = Some(result);
203            }
204
205            if let Some(ids) = row_ids.as_ref() {
206                if ids.is_empty() {
207                    break;
208                }
209            }
210        }
211
212        // 2. Apply terms
213        let query = request.terms_as_query(config.case_sensitive);
214        if !query.0.is_empty() {
215            let result = searcher
216                .search(&query.0)
217                .await
218                .context(ApplyFulltextIndexSnafu)?;
219
220            if let Some(ids) = row_ids.as_mut() {
221                ids.retain(|id| result.contains(id));
222            } else {
223                row_ids = Some(result);
224            }
225        }
226
227        Ok(row_ids)
228    }
229}
230
231impl FulltextIndexApplier {
232    /// Applies coarse-grained fulltext index to the specified SST file.
233    /// Returns (row group id -> ranges) that match the queries.
234    pub async fn apply_coarse(
235        &self,
236        file_id: FileId,
237        file_size_hint: Option<u64>,
238        row_groups: impl Iterator<Item = (usize, bool)>,
239    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
240        let timer = INDEX_APPLY_ELAPSED
241            .with_label_values(&[TYPE_FULLTEXT_INDEX])
242            .start_timer();
243
244        let (input, mut output) = Self::init_coarse_output(row_groups);
245        let mut applied = false;
246
247        for (column_id, request) in self.requests.iter() {
248            if request.terms.is_empty() {
249                // only apply terms
250                continue;
251            }
252
253            applied |= self
254                .apply_coarse_one_column(
255                    file_id,
256                    file_size_hint,
257                    *column_id,
258                    &request.terms,
259                    &mut output,
260                )
261                .await?;
262        }
263
264        if !applied {
265            timer.stop_and_discard();
266            return Ok(None);
267        }
268
269        Self::adjust_coarse_output(input, &mut output);
270        Ok(Some(output))
271    }
272
273    async fn apply_coarse_one_column(
274        &self,
275        file_id: FileId,
276        file_size_hint: Option<u64>,
277        column_id: ColumnId,
278        terms: &[FulltextTerm],
279        output: &mut [(usize, Vec<Range<usize>>)],
280    ) -> Result<bool> {
281        let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
282        let Some(reader) = self
283            .index_source
284            .blob(file_id, &blob_key, file_size_hint)
285            .await?
286        else {
287            return Ok(false);
288        };
289        let config =
290            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
291
292        let predicates = Self::terms_to_predicates(terms, &config);
293        if predicates.is_empty() {
294            return Ok(false);
295        }
296
297        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
298        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
299            let blob_size = range_reader
300                .metadata()
301                .await
302                .context(MetadataSnafu)?
303                .content_length;
304            let reader = CachedBloomFilterIndexBlobReader::new(
305                file_id,
306                column_id,
307                Tag::Fulltext,
308                blob_size,
309                BloomFilterReaderImpl::new(range_reader),
310                bloom_filter_cache.clone(),
311            );
312            Box::new(reader) as _
313        } else {
314            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
315        };
316
317        let mut applier = BloomFilterApplier::new(reader)
318            .await
319            .context(ApplyBloomFilterIndexSnafu)?;
320        for (_, row_group_output) in output.iter_mut() {
321            // All rows are filtered out, skip the search
322            if row_group_output.is_empty() {
323                continue;
324            }
325
326            *row_group_output = applier
327                .search(&predicates, row_group_output)
328                .await
329                .context(ApplyBloomFilterIndexSnafu)?;
330        }
331
332        Ok(true)
333    }
334
335    /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
336    ///
337    /// `row_groups` is a list of (row group length, whether to search).
338    ///
339    /// Returns (`input`, `output`):
340    /// * `input` is a list of (row group index to search, row group range based on start of the file).
341    /// * `output` is a list of (row group index to search, row group ranges based on start of the file).
342    #[allow(clippy::type_complexity)]
343    fn init_coarse_output(
344        row_groups: impl Iterator<Item = (usize, bool)>,
345    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
346        // Calculates row groups' ranges based on start of the file.
347        let mut input = Vec::with_capacity(row_groups.size_hint().0);
348        let mut start = 0;
349        for (i, (len, to_search)) in row_groups.enumerate() {
350            let end = start + len;
351            if to_search {
352                input.push((i, start..end));
353            }
354            start = end;
355        }
356
357        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
358        // so we need to adjust them later.
359        let output = input
360            .iter()
361            .map(|(i, range)| (*i, vec![range.clone()]))
362            .collect::<Vec<_>>();
363
364        (input, output)
365    }
366
367    /// Adjusts the coarse output. Makes the output ranges based on row group start.
368    fn adjust_coarse_output(
369        input: Vec<(usize, Range<usize>)>,
370        output: &mut Vec<(usize, Vec<Range<usize>>)>,
371    ) {
372        // adjust ranges to be based on row group
373        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
374            let start = input.start;
375            for range in output.iter_mut() {
376                range.start -= start;
377                range.end -= start;
378            }
379        }
380        output.retain(|(_, ranges)| !ranges.is_empty());
381    }
382
383    /// Converts terms to predicates.
384    ///
385    /// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive.
386    /// Multiple terms are combined with AND semantics.
387    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
388        let mut probes = HashSet::new();
389        for term in terms {
390            if config.case_sensitive && term.col_lowered {
391                // lowercased terms are not indexed
392                continue;
393            }
394
395            let ts = term
396                .term
397                .split(|c: char| !c.is_alphanumeric())
398                .filter(|&t| !t.is_empty())
399                .map(|t| {
400                    if !config.case_sensitive {
401                        t.to_lowercase()
402                    } else {
403                        t.to_string()
404                    }
405                    .into_bytes()
406                });
407
408            probes.extend(ts);
409        }
410
411        probes
412            .into_iter()
413            .map(|p| InListPredicate {
414                list: iter::once(p).collect(),
415            })
416            .collect::<Vec<_>>()
417    }
418}
419
420/// The source of the index.
421struct IndexSource {
422    region_dir: String,
423    region_id: RegionId,
424
425    /// The puffin manager factory.
426    puffin_manager_factory: PuffinManagerFactory,
427
428    /// Store responsible for accessing remote index files.
429    remote_store: ObjectStore,
430
431    /// Local file cache.
432    file_cache: Option<FileCacheRef>,
433
434    /// The puffin metadata cache.
435    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
436}
437
438impl IndexSource {
439    fn new(
440        region_dir: String,
441        region_id: RegionId,
442        puffin_manager_factory: PuffinManagerFactory,
443        remote_store: ObjectStore,
444    ) -> Self {
445        Self {
446            region_dir,
447            region_id,
448            puffin_manager_factory,
449            remote_store,
450            file_cache: None,
451            puffin_metadata_cache: None,
452        }
453    }
454
455    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
456        self.file_cache = file_cache;
457    }
458
459    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
460        self.puffin_metadata_cache = puffin_metadata_cache;
461    }
462
463    /// Returns the blob with the specified key from local cache or remote store.
464    ///
465    /// Returns `None` if the blob is not found.
466    #[allow(unused)]
467    async fn blob(
468        &self,
469        file_id: FileId,
470        key: &str,
471        file_size_hint: Option<u64>,
472    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
473        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
474        let res = reader.blob(key).await;
475        match res {
476            Ok(blob) => Ok(Some(blob)),
477            Err(err) if err.is_blob_not_found() => Ok(None),
478            Err(err) => {
479                if fallbacked {
480                    Err(err).context(PuffinReadBlobSnafu)
481                } else {
482                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
483                    let reader = self.build_remote(file_id, file_size_hint).await?;
484                    let res = reader.blob(key).await;
485                    match res {
486                        Ok(blob) => Ok(Some(blob)),
487                        Err(err) if err.is_blob_not_found() => Ok(None),
488                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
489                    }
490                }
491            }
492        }
493    }
494
495    /// Returns the directory with the specified key from local cache or remote store.
496    ///
497    /// Returns `None` if the directory is not found.
498    async fn dir(
499        &self,
500        file_id: FileId,
501        key: &str,
502        file_size_hint: Option<u64>,
503    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
504        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
505        let res = reader.dir(key).await;
506        match res {
507            Ok(dir) => Ok(Some(dir)),
508            Err(err) if err.is_blob_not_found() => Ok(None),
509            Err(err) => {
510                if fallbacked {
511                    Err(err).context(PuffinReadBlobSnafu)
512                } else {
513                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
514                    let reader = self.build_remote(file_id, file_size_hint).await?;
515                    let res = reader.dir(key).await;
516                    match res {
517                        Ok(dir) => Ok(Some(dir)),
518                        Err(err) if err.is_blob_not_found() => Ok(None),
519                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
520                    }
521                }
522            }
523        }
524    }
525
526    /// Return reader and whether it is fallbacked to remote store.
527    async fn ensure_reader(
528        &self,
529        file_id: FileId,
530        file_size_hint: Option<u64>,
531    ) -> Result<(SstPuffinReader, bool)> {
532        match self.build_local_cache(file_id, file_size_hint).await {
533            Ok(Some(r)) => Ok((r, false)),
534            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
535            Err(err) => Err(err),
536        }
537    }
538
539    async fn build_local_cache(
540        &self,
541        file_id: FileId,
542        file_size_hint: Option<u64>,
543    ) -> Result<Option<SstPuffinReader>> {
544        let Some(file_cache) = &self.file_cache else {
545            return Ok(None);
546        };
547
548        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
549        if file_cache.get(index_key).await.is_none() {
550            return Ok(None);
551        };
552
553        let puffin_manager = self
554            .puffin_manager_factory
555            .build(
556                file_cache.local_store(),
557                WriteCachePathProvider::new(self.region_id, file_cache.clone()),
558            )
559            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
560        let reader = puffin_manager
561            .reader(&file_id)
562            .await
563            .context(PuffinBuildReaderSnafu)?
564            .with_file_size_hint(file_size_hint);
565        Ok(Some(reader))
566    }
567
568    async fn build_remote(
569        &self,
570        file_id: FileId,
571        file_size_hint: Option<u64>,
572    ) -> Result<SstPuffinReader> {
573        let puffin_manager = self
574            .puffin_manager_factory
575            .build(
576                self.remote_store.clone(),
577                RegionFilePathFactory::new(self.region_dir.clone()),
578            )
579            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
580
581        let reader = puffin_manager
582            .reader(&file_id)
583            .await
584            .context(PuffinBuildReaderSnafu)?
585            .with_file_size_hint(file_size_hint);
586
587        Ok(reader)
588    }
589}