mito2/sst/index/fulltext_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
25use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
26use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
27use index::fulltext_index::{Analyzer, Config};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
44    PuffinReadBlobSnafu, Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::TYPE_FULLTEXT_INDEX;
49use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
50use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
51use crate::sst::index::puffin_manager::{
52    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
53};
54
55pub mod builder;
56
/// Metrics for tracking fulltext index apply operations.
///
/// All values are accumulated; metrics collected separately can be combined with
/// [`FulltextIndexApplyMetrics::merge_from`].
#[derive(Default, Clone)]
pub struct FulltextIndexApplyMetrics {
    /// Total time spent applying the index.
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses, i.e. index reads (blob or directory) for which
    /// no local cache reader was available and the remote store was used instead.
    pub blob_cache_miss: usize,
    /// Number of directory cache hits.
    pub dir_cache_hit: usize,
    /// Number of directory cache misses.
    pub dir_cache_miss: usize,
    /// Elapsed time to initialize directory data, summed over all directory reads.
    pub dir_init_elapsed: std::time::Duration,
    /// Metrics for bloom filter reads.
    pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
73
74impl std::fmt::Debug for FulltextIndexApplyMetrics {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        let Self {
77            apply_elapsed,
78            blob_cache_miss,
79            dir_cache_hit,
80            dir_cache_miss,
81            dir_init_elapsed,
82            bloom_filter_read_metrics,
83        } = self;
84
85        if self.is_empty() {
86            return write!(f, "{{}}");
87        }
88        write!(f, "{{")?;
89
90        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
91
92        if *blob_cache_miss > 0 {
93            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
94        }
95        if *dir_cache_hit > 0 {
96            write!(f, ", \"dir_cache_hit\":{}", dir_cache_hit)?;
97        }
98        if *dir_cache_miss > 0 {
99            write!(f, ", \"dir_cache_miss\":{}", dir_cache_miss)?;
100        }
101        if !dir_init_elapsed.is_zero() {
102            write!(f, ", \"dir_init_elapsed\":\"{:?}\"", dir_init_elapsed)?;
103        }
104        write!(
105            f,
106            ", \"bloom_filter_read_metrics\":{:?}",
107            bloom_filter_read_metrics
108        )?;
109
110        write!(f, "}}")
111    }
112}
113
114impl FulltextIndexApplyMetrics {
115    /// Returns true if the metrics are empty (contain no meaningful data).
116    pub fn is_empty(&self) -> bool {
117        self.apply_elapsed.is_zero()
118    }
119
120    /// Collects metrics from a directory read operation.
121    pub fn collect_dir_metrics(
122        &mut self,
123        elapsed: std::time::Duration,
124        dir_metrics: puffin::puffin_manager::DirMetrics,
125    ) {
126        self.dir_init_elapsed += elapsed;
127        if dir_metrics.cache_hit {
128            self.dir_cache_hit += 1;
129        } else {
130            self.dir_cache_miss += 1;
131        }
132    }
133
134    /// Merges another metrics into this one.
135    pub fn merge_from(&mut self, other: &Self) {
136        self.apply_elapsed += other.apply_elapsed;
137        self.blob_cache_miss += other.blob_cache_miss;
138        self.dir_cache_hit += other.dir_cache_hit;
139        self.dir_cache_miss += other.dir_cache_miss;
140        self.dir_init_elapsed += other.dir_init_elapsed;
141        self.bloom_filter_read_metrics
142            .merge_from(&other.bloom_filter_read_metrics);
143    }
144}
145
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files.
///
/// It offers a fine-grained path (`apply_fine`, returning matching row ids) and a
/// coarse-grained path (`apply_coarse`, returning matching ranges per row group).
pub struct FulltextIndexApplier {
    /// Requests to be applied, keyed by the id of the column they target.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// The source of the index (local file cache and/or remote store).
    index_source: IndexSource,

    /// Cache for bloom filter index. When `None`, bloom filter blobs are read
    /// without the caching reader.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    predicate_key: PredicateKey,
}
160
/// Shared, reference-counted handle to a [`FulltextIndexApplier`].
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
162
163impl FulltextIndexApplier {
164    /// Creates a new `FulltextIndexApplier`.
165    pub fn new(
166        table_dir: String,
167        path_type: PathType,
168        store: ObjectStore,
169        requests: BTreeMap<ColumnId, FulltextRequest>,
170        puffin_manager_factory: PuffinManagerFactory,
171    ) -> Self {
172        let requests = Arc::new(requests);
173        let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);
174
175        Self {
176            predicate_key: PredicateKey::new_fulltext(requests.clone()),
177            requests,
178            index_source,
179            bloom_filter_index_cache: None,
180        }
181    }
182
183    /// Sets the file cache.
184    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
185        self.index_source.set_file_cache(file_cache);
186        self
187    }
188
189    /// Sets the puffin metadata cache.
190    pub fn with_puffin_metadata_cache(
191        mut self,
192        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
193    ) -> Self {
194        self.index_source
195            .set_puffin_metadata_cache(puffin_metadata_cache);
196        self
197    }
198
199    /// Sets the bloom filter cache.
200    pub fn with_bloom_filter_cache(
201        mut self,
202        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
203    ) -> Self {
204        self.bloom_filter_index_cache = bloom_filter_index_cache;
205        self
206    }
207
208    /// Returns the predicate key.
209    pub fn predicate_key(&self) -> &PredicateKey {
210        &self.predicate_key
211    }
212}
213
214impl FulltextIndexApplier {
215    /// Applies fine-grained fulltext index to the specified SST file.
216    /// Returns the row ids that match the queries.
217    ///
218    /// # Arguments
219    /// * `file_id` - The region file ID to apply predicates to
220    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
221    /// * `metrics` - Optional mutable reference to collect metrics on demand
222    pub async fn apply_fine(
223        &self,
224        file_id: RegionIndexId,
225        file_size_hint: Option<u64>,
226        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
227    ) -> Result<Option<BTreeSet<RowId>>> {
228        let apply_start = Instant::now();
229
230        let mut row_ids: Option<BTreeSet<RowId>> = None;
231        for (column_id, request) in self.requests.iter() {
232            if request.queries.is_empty() && request.terms.is_empty() {
233                continue;
234            }
235
236            let Some(result) = self
237                .apply_fine_one_column(
238                    file_size_hint,
239                    file_id,
240                    *column_id,
241                    request,
242                    metrics.as_deref_mut(),
243                )
244                .await?
245            else {
246                continue;
247            };
248
249            if let Some(ids) = row_ids.as_mut() {
250                ids.retain(|id| result.contains(id));
251            } else {
252                row_ids = Some(result);
253            }
254
255            if let Some(ids) = row_ids.as_ref()
256                && ids.is_empty()
257            {
258                break;
259            }
260        }
261
262        // Record elapsed time to histogram and collect metrics if requested
263        let elapsed = apply_start.elapsed();
264        INDEX_APPLY_ELAPSED
265            .with_label_values(&[TYPE_FULLTEXT_INDEX])
266            .observe(elapsed.as_secs_f64());
267
268        if let Some(m) = metrics {
269            m.apply_elapsed += elapsed;
270        }
271
272        Ok(row_ids)
273    }
274
275    async fn apply_fine_one_column(
276        &self,
277        file_size_hint: Option<u64>,
278        file_id: RegionIndexId,
279        column_id: ColumnId,
280        request: &FulltextRequest,
281        metrics: Option<&mut FulltextIndexApplyMetrics>,
282    ) -> Result<Option<BTreeSet<RowId>>> {
283        let blob_key = format!(
284            "{INDEX_BLOB_TYPE_TANTIVY}-{}",
285            IndexTarget::ColumnId(column_id)
286        );
287        let dir = self
288            .index_source
289            .dir(file_id, &blob_key, file_size_hint, metrics)
290            .await?;
291
292        let dir = match &dir {
293            Some(dir) => dir,
294            None => {
295                return Ok(None);
296            }
297        };
298
299        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
300        let path = dir.path();
301
302        let searcher =
303            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
304        let mut row_ids: Option<BTreeSet<RowId>> = None;
305
306        // 1. Apply queries
307        for query in &request.queries {
308            let result = searcher
309                .search(&query.0)
310                .await
311                .context(ApplyFulltextIndexSnafu)?;
312
313            if let Some(ids) = row_ids.as_mut() {
314                ids.retain(|id| result.contains(id));
315            } else {
316                row_ids = Some(result);
317            }
318
319            if let Some(ids) = row_ids.as_ref()
320                && ids.is_empty()
321            {
322                break;
323            }
324        }
325
326        // 2. Apply terms
327        let query = request.terms_as_query(config.case_sensitive);
328        if !query.0.is_empty() {
329            let result = searcher
330                .search(&query.0)
331                .await
332                .context(ApplyFulltextIndexSnafu)?;
333
334            if let Some(ids) = row_ids.as_mut() {
335                ids.retain(|id| result.contains(id));
336            } else {
337                row_ids = Some(result);
338            }
339        }
340
341        Ok(row_ids)
342    }
343}
344
345impl FulltextIndexApplier {
346    /// Applies coarse-grained fulltext index to the specified SST file.
347    /// Returns (row group id -> ranges) that match the queries.
348    ///
349    /// Row group id existing in the returned result means that the row group is searched.
350    /// Empty ranges means that the row group is searched but no rows are found.
351    ///
352    /// # Arguments
353    /// * `file_id` - The region file ID to apply predicates to
354    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
355    /// * `row_groups` - Iterator of row group lengths and whether to search in the row group
356    /// * `metrics` - Optional mutable reference to collect metrics on demand
357    pub async fn apply_coarse(
358        &self,
359        file_id: RegionIndexId,
360        file_size_hint: Option<u64>,
361        row_groups: impl Iterator<Item = (usize, bool)>,
362        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
363    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
364        let apply_start = Instant::now();
365
366        let (input, mut output) = Self::init_coarse_output(row_groups);
367        let mut applied = false;
368
369        for (column_id, request) in self.requests.iter() {
370            if request.terms.is_empty() {
371                // only apply terms
372                continue;
373            }
374
375            applied |= self
376                .apply_coarse_one_column(
377                    file_id,
378                    file_size_hint,
379                    *column_id,
380                    &request.terms,
381                    &mut output,
382                    metrics.as_deref_mut(),
383                )
384                .await?;
385        }
386
387        if !applied {
388            return Ok(None);
389        }
390
391        Self::adjust_coarse_output(input, &mut output);
392
393        // Record elapsed time to histogram and collect metrics if requested
394        let elapsed = apply_start.elapsed();
395        INDEX_APPLY_ELAPSED
396            .with_label_values(&[TYPE_FULLTEXT_INDEX])
397            .observe(elapsed.as_secs_f64());
398
399        if let Some(m) = metrics {
400            m.apply_elapsed += elapsed;
401        }
402
403        Ok(Some(output))
404    }
405
406    async fn apply_coarse_one_column(
407        &self,
408        file_id: RegionIndexId,
409        file_size_hint: Option<u64>,
410        column_id: ColumnId,
411        terms: &[FulltextTerm],
412        output: &mut [(usize, Vec<Range<usize>>)],
413        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
414    ) -> Result<bool> {
415        let blob_key = format!(
416            "{INDEX_BLOB_TYPE_BLOOM}-{}",
417            IndexTarget::ColumnId(column_id)
418        );
419        let Some(reader) = self
420            .index_source
421            .blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
422            .await?
423        else {
424            return Ok(false);
425        };
426        let config =
427            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
428
429        let predicates = Self::terms_to_predicates(terms, &config);
430        if predicates.is_empty() {
431            return Ok(false);
432        }
433
434        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
435        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
436            let blob_size = range_reader
437                .metadata()
438                .await
439                .context(MetadataSnafu)?
440                .content_length;
441            let reader = CachedBloomFilterIndexBlobReader::new(
442                file_id.file_id(),
443                file_id.version,
444                column_id,
445                Tag::Fulltext,
446                blob_size,
447                BloomFilterReaderImpl::new(range_reader),
448                bloom_filter_cache.clone(),
449            );
450            Box::new(reader) as _
451        } else {
452            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
453        };
454
455        let mut applier = BloomFilterApplier::new(reader)
456            .await
457            .context(ApplyBloomFilterIndexSnafu)?;
458        for (_, row_group_output) in output.iter_mut() {
459            // All rows are filtered out, skip the search
460            if row_group_output.is_empty() {
461                continue;
462            }
463
464            *row_group_output = applier
465                .search(
466                    &predicates,
467                    row_group_output,
468                    metrics
469                        .as_deref_mut()
470                        .map(|m| &mut m.bloom_filter_read_metrics),
471                )
472                .await
473                .context(ApplyBloomFilterIndexSnafu)?;
474        }
475
476        Ok(true)
477    }
478
479    /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
480    ///
481    /// `row_groups` is a list of (row group length, whether to search).
482    ///
483    /// Returns (`input`, `output`):
484    /// * `input` is a list of (row group index to search, row group range based on start of the file).
485    /// * `output` is a list of (row group index to search, row group ranges based on start of the file).
486    #[allow(clippy::type_complexity)]
487    fn init_coarse_output(
488        row_groups: impl Iterator<Item = (usize, bool)>,
489    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
490        // Calculates row groups' ranges based on start of the file.
491        let mut input = Vec::with_capacity(row_groups.size_hint().0);
492        let mut start = 0;
493        for (i, (len, to_search)) in row_groups.enumerate() {
494            let end = start + len;
495            if to_search {
496                input.push((i, start..end));
497            }
498            start = end;
499        }
500
501        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
502        // so we need to adjust them later.
503        let output = input
504            .iter()
505            .map(|(i, range)| (*i, vec![range.clone()]))
506            .collect::<Vec<_>>();
507
508        (input, output)
509    }
510
511    /// Adjusts the coarse output. Makes the output ranges based on row group start.
512    fn adjust_coarse_output(
513        input: Vec<(usize, Range<usize>)>,
514        output: &mut [(usize, Vec<Range<usize>>)],
515    ) {
516        // adjust ranges to be based on row group
517        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
518            let start = input.start;
519            for range in output.iter_mut() {
520                range.start -= start;
521                range.end -= start;
522            }
523        }
524    }
525
526    /// Converts terms to predicates.
527    ///
528    /// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive.
529    /// Multiple terms are combined with AND semantics.
530    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
531        let mut probes = HashSet::new();
532        for term in terms {
533            if config.case_sensitive && term.col_lowered {
534                // lowercased terms are not indexed
535                continue;
536            }
537            probes.extend(Self::term_to_probes(&term.term, config));
538        }
539
540        probes
541            .into_iter()
542            .map(|p| InListPredicate {
543                list: iter::once(p).collect(),
544            })
545            .collect::<Vec<_>>()
546    }
547
548    fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
549        let tokens = match config.analyzer {
550            Analyzer::English => EnglishTokenizer {}.tokenize(term),
551            Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
552        };
553
554        tokens.into_iter().map(|t| {
555            if !config.case_sensitive {
556                t.to_lowercase()
557            } else {
558                t.to_string()
559            }
560            .into_bytes()
561        })
562    }
563}
564
/// The source of the index.
///
/// Index files are read through a local file cache when one is configured and the
/// file is present in it, and from the remote store otherwise.
struct IndexSource {
    /// Table directory used to build the paths of remote index files.
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// The puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,

    /// Store responsible for accessing remote index files.
    remote_store: ObjectStore,

    /// Local file cache. `None` means index files are always read from the remote store.
    file_cache: Option<FileCacheRef>,

    /// The puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
584
585impl IndexSource {
    /// Creates an `IndexSource` backed by `remote_store`.
    ///
    /// Both the local file cache and the puffin metadata cache start unset; they can be
    /// attached afterwards with `set_file_cache` and `set_puffin_metadata_cache`.
    fn new(
        table_dir: String,
        path_type: PathType,
        puffin_manager_factory: PuffinManagerFactory,
        remote_store: ObjectStore,
    ) -> Self {
        Self {
            table_dir,
            path_type,
            puffin_manager_factory,
            remote_store,
            file_cache: None,
            puffin_metadata_cache: None,
        }
    }
601
    /// Replaces the local file cache; passing `None` removes it.
    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
        self.file_cache = file_cache;
    }
605
    /// Replaces the puffin metadata cache; passing `None` removes it.
    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
        self.puffin_metadata_cache = puffin_metadata_cache;
    }
609
610    /// Returns the blob with the specified key from local cache or remote store.
611    ///
612    /// Returns `None` if the blob is not found.
613    async fn blob(
614        &self,
615        file_id: RegionIndexId,
616        key: &str,
617        file_size_hint: Option<u64>,
618        metrics: Option<&mut FulltextIndexApplyMetrics>,
619    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
620        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
621
622        // Track cache miss if fallbacked to remote
623        if fallbacked && let Some(m) = metrics {
624            m.blob_cache_miss += 1;
625        }
626
627        let res = reader.blob(key).await;
628        match res {
629            Ok(blob) => Ok(Some(blob)),
630            Err(err) if err.is_blob_not_found() => Ok(None),
631            Err(err) => {
632                if fallbacked {
633                    Err(err).context(PuffinReadBlobSnafu)
634                } else {
635                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
636                    let reader = self.build_remote(file_id, file_size_hint).await?;
637                    let res = reader.blob(key).await;
638                    match res {
639                        Ok(blob) => Ok(Some(blob)),
640                        Err(err) if err.is_blob_not_found() => Ok(None),
641                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
642                    }
643                }
644            }
645        }
646    }
647
648    /// Returns the directory with the specified key from local cache or remote store.
649    ///
650    /// Returns `None` if the directory is not found.
651    async fn dir(
652        &self,
653        file_id: RegionIndexId,
654        key: &str,
655        file_size_hint: Option<u64>,
656        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
657    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
658        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
659
660        // Track cache miss if fallbacked to remote
661        if fallbacked && let Some(m) = &mut metrics {
662            m.blob_cache_miss += 1;
663        }
664
665        let start = metrics.as_ref().map(|_| Instant::now());
666        let res = reader.dir(key).await;
667        match res {
668            Ok((dir, dir_metrics)) => {
669                if let Some(m) = metrics {
670                    // Safety: start is Some when metrics is Some
671                    m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
672                }
673                Ok(Some(dir))
674            }
675            Err(err) if err.is_blob_not_found() => Ok(None),
676            Err(err) => {
677                if fallbacked {
678                    Err(err).context(PuffinReadBlobSnafu)
679                } else {
680                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
681                    let reader = self.build_remote(file_id, file_size_hint).await?;
682                    let start = metrics.as_ref().map(|_| Instant::now());
683                    let res = reader.dir(key).await;
684                    match res {
685                        Ok((dir, dir_metrics)) => {
686                            if let Some(m) = metrics {
687                                // Safety: start is Some when metrics is Some
688                                m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
689                            }
690                            Ok(Some(dir))
691                        }
692                        Err(err) if err.is_blob_not_found() => Ok(None),
693                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
694                    }
695                }
696            }
697        }
698    }
699
700    /// Return reader and whether it is fallbacked to remote store.
701    async fn ensure_reader(
702        &self,
703        file_id: RegionIndexId,
704        file_size_hint: Option<u64>,
705    ) -> Result<(SstPuffinReader, bool)> {
706        match self.build_local_cache(file_id, file_size_hint).await {
707            Ok(Some(r)) => Ok((r, false)),
708            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
709            Err(err) => Err(err),
710        }
711    }
712
713    async fn build_local_cache(
714        &self,
715        file_id: RegionIndexId,
716        file_size_hint: Option<u64>,
717    ) -> Result<Option<SstPuffinReader>> {
718        let Some(file_cache) = &self.file_cache else {
719            return Ok(None);
720        };
721
722        let index_key = IndexKey::new(
723            file_id.region_id(),
724            file_id.file_id(),
725            FileType::Puffin(file_id.version),
726        );
727        if file_cache.get(index_key).await.is_none() {
728            return Ok(None);
729        };
730
731        let puffin_manager = self
732            .puffin_manager_factory
733            .build(
734                file_cache.local_store(),
735                WriteCachePathProvider::new(file_cache.clone()),
736            )
737            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
738        let reader = puffin_manager
739            .reader(&file_id)
740            .await
741            .context(PuffinBuildReaderSnafu)?
742            .with_file_size_hint(file_size_hint);
743        Ok(Some(reader))
744    }
745
746    async fn build_remote(
747        &self,
748        file_id: RegionIndexId,
749        file_size_hint: Option<u64>,
750    ) -> Result<SstPuffinReader> {
751        let puffin_manager = self
752            .puffin_manager_factory
753            .build(
754                self.remote_store.clone(),
755                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
756            )
757            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
758
759        let reader = puffin_manager
760            .reader(&file_id)
761            .await
762            .context(PuffinBuildReaderSnafu)?
763            .with_file_size_hint(file_size_hint);
764
765        Ok(reader)
766    }
767}