mito2/sst/index/fulltext_index/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::{tracing, warn};
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
25use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
26use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
27use index::fulltext_index::{Analyzer, Config};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
44    PuffinReadBlobSnafu, Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
49use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
50use crate::sst::index::puffin_manager::{
51    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
52};
53use crate::sst::index::{TYPE_FULLTEXT_INDEX, trigger_index_background_download};
54
55pub mod builder;
56
57/// Metrics for tracking fulltext index apply operations.
58#[derive(Default, Clone)]
59pub struct FulltextIndexApplyMetrics {
60    /// Total time spent applying the index.
61    pub apply_elapsed: std::time::Duration,
62    /// Number of blob cache misses.
63    pub blob_cache_miss: usize,
64    /// Number of directory cache hits.
65    pub dir_cache_hit: usize,
66    /// Number of directory cache misses.
67    pub dir_cache_miss: usize,
68    /// Elapsed time to initialize directory data.
69    pub dir_init_elapsed: std::time::Duration,
70    /// Metrics for bloom filter reads.
71    pub bloom_filter_read_metrics: BloomFilterReadMetrics,
72}
73
74impl std::fmt::Debug for FulltextIndexApplyMetrics {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        let Self {
77            apply_elapsed,
78            blob_cache_miss,
79            dir_cache_hit,
80            dir_cache_miss,
81            dir_init_elapsed,
82            bloom_filter_read_metrics,
83        } = self;
84
85        if self.is_empty() {
86            return write!(f, "{{}}");
87        }
88        write!(f, "{{")?;
89
90        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
91
92        if *blob_cache_miss > 0 {
93            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
94        }
95        if *dir_cache_hit > 0 {
96            write!(f, ", \"dir_cache_hit\":{}", dir_cache_hit)?;
97        }
98        if *dir_cache_miss > 0 {
99            write!(f, ", \"dir_cache_miss\":{}", dir_cache_miss)?;
100        }
101        if !dir_init_elapsed.is_zero() {
102            write!(f, ", \"dir_init_elapsed\":\"{:?}\"", dir_init_elapsed)?;
103        }
104        write!(
105            f,
106            ", \"bloom_filter_read_metrics\":{:?}",
107            bloom_filter_read_metrics
108        )?;
109
110        write!(f, "}}")
111    }
112}
113
114impl FulltextIndexApplyMetrics {
115    /// Returns true if the metrics are empty (contain no meaningful data).
116    pub fn is_empty(&self) -> bool {
117        self.apply_elapsed.is_zero()
118    }
119
120    /// Collects metrics from a directory read operation.
121    pub fn collect_dir_metrics(
122        &mut self,
123        elapsed: std::time::Duration,
124        dir_metrics: puffin::puffin_manager::DirMetrics,
125    ) {
126        self.dir_init_elapsed += elapsed;
127        if dir_metrics.cache_hit {
128            self.dir_cache_hit += 1;
129        } else {
130            self.dir_cache_miss += 1;
131        }
132    }
133
134    /// Merges another metrics into this one.
135    pub fn merge_from(&mut self, other: &Self) {
136        self.apply_elapsed += other.apply_elapsed;
137        self.blob_cache_miss += other.blob_cache_miss;
138        self.dir_cache_hit += other.dir_cache_hit;
139        self.dir_cache_miss += other.dir_cache_miss;
140        self.dir_init_elapsed += other.dir_init_elapsed;
141        self.bloom_filter_read_metrics
142            .merge_from(&other.bloom_filter_read_metrics);
143    }
144}
145
146/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
147pub struct FulltextIndexApplier {
148    /// Requests to be applied.
149    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,
150
151    /// The source of the index.
152    index_source: IndexSource,
153
154    /// Cache for bloom filter index.
155    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
156
157    /// Predicate key. Used to identify the predicate and fetch result from cache.
158    predicate_key: PredicateKey,
159}
160
161pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
162
163impl FulltextIndexApplier {
164    /// Creates a new `FulltextIndexApplier`.
165    pub fn new(
166        table_dir: String,
167        path_type: PathType,
168        store: ObjectStore,
169        requests: BTreeMap<ColumnId, FulltextRequest>,
170        puffin_manager_factory: PuffinManagerFactory,
171    ) -> Self {
172        let requests = Arc::new(requests);
173        let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);
174
175        Self {
176            predicate_key: PredicateKey::new_fulltext(requests.clone()),
177            requests,
178            index_source,
179            bloom_filter_index_cache: None,
180        }
181    }
182
183    /// Sets the file cache.
184    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
185        self.index_source.set_file_cache(file_cache);
186        self
187    }
188
189    /// Sets the puffin metadata cache.
190    pub fn with_puffin_metadata_cache(
191        mut self,
192        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
193    ) -> Self {
194        self.index_source
195            .set_puffin_metadata_cache(puffin_metadata_cache);
196        self
197    }
198
199    /// Sets the bloom filter cache.
200    pub fn with_bloom_filter_cache(
201        mut self,
202        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
203    ) -> Self {
204        self.bloom_filter_index_cache = bloom_filter_index_cache;
205        self
206    }
207
208    /// Returns the predicate key.
209    pub fn predicate_key(&self) -> &PredicateKey {
210        &self.predicate_key
211    }
212}
213
214impl FulltextIndexApplier {
215    /// Applies fine-grained fulltext index to the specified SST file.
216    /// Returns the row ids that match the queries.
217    ///
218    /// # Arguments
219    /// * `file_id` - The region file ID to apply predicates to
220    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
221    /// * `metrics` - Optional mutable reference to collect metrics on demand
222    #[tracing::instrument(
223        skip_all,
224        fields(file_id = %file_id)
225    )]
226    pub async fn apply_fine(
227        &self,
228        file_id: RegionIndexId,
229        file_size_hint: Option<u64>,
230        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
231    ) -> Result<Option<BTreeSet<RowId>>> {
232        let apply_start = Instant::now();
233
234        let mut row_ids: Option<BTreeSet<RowId>> = None;
235        for (column_id, request) in self.requests.iter() {
236            if request.queries.is_empty() && request.terms.is_empty() {
237                continue;
238            }
239
240            let Some(result) = self
241                .apply_fine_one_column(
242                    file_size_hint,
243                    file_id,
244                    *column_id,
245                    request,
246                    metrics.as_deref_mut(),
247                )
248                .await?
249            else {
250                continue;
251            };
252
253            if let Some(ids) = row_ids.as_mut() {
254                ids.retain(|id| result.contains(id));
255            } else {
256                row_ids = Some(result);
257            }
258
259            if let Some(ids) = row_ids.as_ref()
260                && ids.is_empty()
261            {
262                break;
263            }
264        }
265
266        // Record elapsed time to histogram and collect metrics if requested
267        let elapsed = apply_start.elapsed();
268        INDEX_APPLY_ELAPSED
269            .with_label_values(&[TYPE_FULLTEXT_INDEX])
270            .observe(elapsed.as_secs_f64());
271
272        if let Some(m) = metrics {
273            m.apply_elapsed += elapsed;
274        }
275
276        Ok(row_ids)
277    }
278
279    async fn apply_fine_one_column(
280        &self,
281        file_size_hint: Option<u64>,
282        file_id: RegionIndexId,
283        column_id: ColumnId,
284        request: &FulltextRequest,
285        metrics: Option<&mut FulltextIndexApplyMetrics>,
286    ) -> Result<Option<BTreeSet<RowId>>> {
287        let blob_key = format!(
288            "{INDEX_BLOB_TYPE_TANTIVY}-{}",
289            IndexTarget::ColumnId(column_id)
290        );
291        let dir = self
292            .index_source
293            .dir(file_id, &blob_key, file_size_hint, metrics)
294            .await?;
295
296        let dir = match &dir {
297            Some(dir) => dir,
298            None => {
299                return Ok(None);
300            }
301        };
302
303        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
304        let path = dir.path();
305
306        let searcher =
307            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
308        let mut row_ids: Option<BTreeSet<RowId>> = None;
309
310        // 1. Apply queries
311        for query in &request.queries {
312            let result = searcher
313                .search(&query.0)
314                .await
315                .context(ApplyFulltextIndexSnafu)?;
316
317            if let Some(ids) = row_ids.as_mut() {
318                ids.retain(|id| result.contains(id));
319            } else {
320                row_ids = Some(result);
321            }
322
323            if let Some(ids) = row_ids.as_ref()
324                && ids.is_empty()
325            {
326                break;
327            }
328        }
329
330        // 2. Apply terms
331        let query = request.terms_as_query(config.case_sensitive);
332        if !query.0.is_empty() {
333            let result = searcher
334                .search(&query.0)
335                .await
336                .context(ApplyFulltextIndexSnafu)?;
337
338            if let Some(ids) = row_ids.as_mut() {
339                ids.retain(|id| result.contains(id));
340            } else {
341                row_ids = Some(result);
342            }
343        }
344
345        Ok(row_ids)
346    }
347}
348
349impl FulltextIndexApplier {
350    /// Applies coarse-grained fulltext index to the specified SST file.
351    /// Returns (row group id -> ranges) that match the queries.
352    ///
353    /// Row group id existing in the returned result means that the row group is searched.
354    /// Empty ranges means that the row group is searched but no rows are found.
355    ///
356    /// # Arguments
357    /// * `file_id` - The region file ID to apply predicates to
358    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
359    /// * `row_groups` - Iterator of row group lengths and whether to search in the row group
360    /// * `metrics` - Optional mutable reference to collect metrics on demand
361    #[allow(clippy::type_complexity)]
362    #[tracing::instrument(
363        skip_all,
364        fields(file_id = %file_id)
365    )]
366    pub async fn apply_coarse(
367        &self,
368        file_id: RegionIndexId,
369        file_size_hint: Option<u64>,
370        row_groups: impl Iterator<Item = (usize, bool)>,
371        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
372    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
373        let apply_start = Instant::now();
374
375        let (input, mut output) = Self::init_coarse_output(row_groups);
376        let mut applied = false;
377
378        for (column_id, request) in self.requests.iter() {
379            if request.terms.is_empty() {
380                // only apply terms
381                continue;
382            }
383
384            applied |= self
385                .apply_coarse_one_column(
386                    file_id,
387                    file_size_hint,
388                    *column_id,
389                    &request.terms,
390                    &mut output,
391                    metrics.as_deref_mut(),
392                )
393                .await?;
394        }
395
396        if !applied {
397            return Ok(None);
398        }
399
400        Self::adjust_coarse_output(input, &mut output);
401
402        // Record elapsed time to histogram and collect metrics if requested
403        let elapsed = apply_start.elapsed();
404        INDEX_APPLY_ELAPSED
405            .with_label_values(&[TYPE_FULLTEXT_INDEX])
406            .observe(elapsed.as_secs_f64());
407
408        if let Some(m) = metrics {
409            m.apply_elapsed += elapsed;
410        }
411
412        Ok(Some(output))
413    }
414
415    async fn apply_coarse_one_column(
416        &self,
417        file_id: RegionIndexId,
418        file_size_hint: Option<u64>,
419        column_id: ColumnId,
420        terms: &[FulltextTerm],
421        output: &mut [(usize, Vec<Range<usize>>)],
422        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
423    ) -> Result<bool> {
424        let blob_key = format!(
425            "{INDEX_BLOB_TYPE_BLOOM}-{}",
426            IndexTarget::ColumnId(column_id)
427        );
428        let Some(reader) = self
429            .index_source
430            .blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
431            .await?
432        else {
433            return Ok(false);
434        };
435        let config =
436            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
437
438        let predicates = Self::terms_to_predicates(terms, &config);
439        if predicates.is_empty() {
440            return Ok(false);
441        }
442
443        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
444        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
445            let blob_size = range_reader
446                .metadata()
447                .await
448                .context(MetadataSnafu)?
449                .content_length;
450            let reader = CachedBloomFilterIndexBlobReader::new(
451                file_id.file_id(),
452                file_id.version,
453                column_id,
454                Tag::Fulltext,
455                blob_size,
456                BloomFilterReaderImpl::new(range_reader),
457                bloom_filter_cache.clone(),
458            );
459            Box::new(reader) as _
460        } else {
461            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
462        };
463
464        let mut applier = BloomFilterApplier::new(reader)
465            .await
466            .context(ApplyBloomFilterIndexSnafu)?;
467        for (_, row_group_output) in output.iter_mut() {
468            // All rows are filtered out, skip the search
469            if row_group_output.is_empty() {
470                continue;
471            }
472
473            *row_group_output = applier
474                .search(
475                    &predicates,
476                    row_group_output,
477                    metrics
478                        .as_deref_mut()
479                        .map(|m| &mut m.bloom_filter_read_metrics),
480                )
481                .await
482                .context(ApplyBloomFilterIndexSnafu)?;
483        }
484
485        Ok(true)
486    }
487
488    /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
489    ///
490    /// `row_groups` is a list of (row group length, whether to search).
491    ///
492    /// Returns (`input`, `output`):
493    /// * `input` is a list of (row group index to search, row group range based on start of the file).
494    /// * `output` is a list of (row group index to search, row group ranges based on start of the file).
495    #[allow(clippy::type_complexity)]
496    fn init_coarse_output(
497        row_groups: impl Iterator<Item = (usize, bool)>,
498    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
499        // Calculates row groups' ranges based on start of the file.
500        let mut input = Vec::with_capacity(row_groups.size_hint().0);
501        let mut start = 0;
502        for (i, (len, to_search)) in row_groups.enumerate() {
503            let end = start + len;
504            if to_search {
505                input.push((i, start..end));
506            }
507            start = end;
508        }
509
510        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
511        // so we need to adjust them later.
512        let output = input
513            .iter()
514            .map(|(i, range)| (*i, vec![range.clone()]))
515            .collect::<Vec<_>>();
516
517        (input, output)
518    }
519
520    /// Adjusts the coarse output. Makes the output ranges based on row group start.
521    fn adjust_coarse_output(
522        input: Vec<(usize, Range<usize>)>,
523        output: &mut [(usize, Vec<Range<usize>>)],
524    ) {
525        // adjust ranges to be based on row group
526        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
527            let start = input.start;
528            for range in output.iter_mut() {
529                range.start -= start;
530                range.end -= start;
531            }
532        }
533    }
534
535    /// Converts terms to predicates.
536    ///
537    /// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive.
538    /// Multiple terms are combined with AND semantics.
539    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
540        let mut probes = HashSet::new();
541        for term in terms {
542            if config.case_sensitive && term.col_lowered {
543                // lowercased terms are not indexed
544                continue;
545            }
546            probes.extend(Self::term_to_probes(&term.term, config));
547        }
548
549        probes
550            .into_iter()
551            .map(|p| InListPredicate {
552                list: iter::once(p).collect(),
553            })
554            .collect::<Vec<_>>()
555    }
556
557    fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
558        let tokens = match config.analyzer {
559            Analyzer::English => EnglishTokenizer {}.tokenize(term),
560            Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
561        };
562
563        tokens.into_iter().map(|t| {
564            if !config.case_sensitive {
565                t.to_lowercase()
566            } else {
567                t.to_string()
568            }
569            .into_bytes()
570        })
571    }
572}
573
574/// The source of the index.
575struct IndexSource {
576    table_dir: String,
577
578    /// Path type for generating file paths.
579    path_type: PathType,
580
581    /// The puffin manager factory.
582    puffin_manager_factory: PuffinManagerFactory,
583
584    /// Store responsible for accessing remote index files.
585    remote_store: ObjectStore,
586
587    /// Local file cache.
588    file_cache: Option<FileCacheRef>,
589
590    /// The puffin metadata cache.
591    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
592}
593
594impl IndexSource {
595    fn new(
596        table_dir: String,
597        path_type: PathType,
598        puffin_manager_factory: PuffinManagerFactory,
599        remote_store: ObjectStore,
600    ) -> Self {
601        Self {
602            table_dir,
603            path_type,
604            puffin_manager_factory,
605            remote_store,
606            file_cache: None,
607            puffin_metadata_cache: None,
608        }
609    }
610
611    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
612        self.file_cache = file_cache;
613    }
614
615    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
616        self.puffin_metadata_cache = puffin_metadata_cache;
617    }
618
619    /// Returns the blob with the specified key from local cache or remote store.
620    ///
621    /// Returns `None` if the blob is not found.
622    async fn blob(
623        &self,
624        file_id: RegionIndexId,
625        key: &str,
626        file_size_hint: Option<u64>,
627        metrics: Option<&mut FulltextIndexApplyMetrics>,
628    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
629        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
630
631        // Track cache miss if fallbacked to remote
632        if fallbacked && let Some(m) = metrics {
633            m.blob_cache_miss += 1;
634        }
635
636        let res = reader.blob(key).await;
637        match res {
638            Ok(blob) => Ok(Some(blob)),
639            Err(err) if err.is_blob_not_found() => Ok(None),
640            Err(err) => {
641                if fallbacked {
642                    Err(err).context(PuffinReadBlobSnafu)
643                } else {
644                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
645                    let reader = self.build_remote(file_id, file_size_hint).await?;
646                    let res = reader.blob(key).await;
647                    match res {
648                        Ok(blob) => Ok(Some(blob)),
649                        Err(err) if err.is_blob_not_found() => Ok(None),
650                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
651                    }
652                }
653            }
654        }
655    }
656
657    /// Returns the directory with the specified key from local cache or remote store.
658    ///
659    /// Returns `None` if the directory is not found.
660    async fn dir(
661        &self,
662        file_id: RegionIndexId,
663        key: &str,
664        file_size_hint: Option<u64>,
665        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
666    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
667        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
668
669        // Track cache miss if fallbacked to remote
670        if fallbacked && let Some(m) = &mut metrics {
671            m.blob_cache_miss += 1;
672        }
673
674        let start = metrics.as_ref().map(|_| Instant::now());
675        let res = reader.dir(key).await;
676        match res {
677            Ok((dir, dir_metrics)) => {
678                if let Some(m) = metrics {
679                    // Safety: start is Some when metrics is Some
680                    m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
681                }
682                Ok(Some(dir))
683            }
684            Err(err) if err.is_blob_not_found() => Ok(None),
685            Err(err) => {
686                if fallbacked {
687                    Err(err).context(PuffinReadBlobSnafu)
688                } else {
689                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
690                    let reader = self.build_remote(file_id, file_size_hint).await?;
691                    let start = metrics.as_ref().map(|_| Instant::now());
692                    let res = reader.dir(key).await;
693                    match res {
694                        Ok((dir, dir_metrics)) => {
695                            if let Some(m) = metrics {
696                                // Safety: start is Some when metrics is Some
697                                m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
698                            }
699                            Ok(Some(dir))
700                        }
701                        Err(err) if err.is_blob_not_found() => Ok(None),
702                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
703                    }
704                }
705            }
706        }
707    }
708
709    /// Return reader and whether it is fallbacked to remote store.
710    async fn ensure_reader(
711        &self,
712        file_id: RegionIndexId,
713        file_size_hint: Option<u64>,
714    ) -> Result<(SstPuffinReader, bool)> {
715        match self.build_local_cache(file_id, file_size_hint).await {
716            Ok(Some(r)) => Ok((r, false)),
717            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
718            Err(err) => Err(err),
719        }
720    }
721
722    async fn build_local_cache(
723        &self,
724        file_id: RegionIndexId,
725        file_size_hint: Option<u64>,
726    ) -> Result<Option<SstPuffinReader>> {
727        let Some(file_cache) = &self.file_cache else {
728            return Ok(None);
729        };
730
731        let index_key = IndexKey::new(
732            file_id.region_id(),
733            file_id.file_id(),
734            FileType::Puffin(file_id.version),
735        );
736        if file_cache.get(index_key).await.is_none() {
737            return Ok(None);
738        };
739
740        let puffin_manager = self
741            .puffin_manager_factory
742            .build(
743                file_cache.local_store(),
744                WriteCachePathProvider::new(file_cache.clone()),
745            )
746            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
747        let reader = puffin_manager
748            .reader(&file_id)
749            .await
750            .context(PuffinBuildReaderSnafu)?
751            .with_file_size_hint(file_size_hint);
752        Ok(Some(reader))
753    }
754
755    async fn build_remote(
756        &self,
757        file_id: RegionIndexId,
758        file_size_hint: Option<u64>,
759    ) -> Result<SstPuffinReader> {
760        let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
761
762        // Trigger background download if file cache and file size are available
763        trigger_index_background_download(
764            self.file_cache.as_ref(),
765            &file_id,
766            file_size_hint,
767            &path_factory,
768            &self.remote_store,
769        );
770
771        let puffin_manager = self
772            .puffin_manager_factory
773            .build(self.remote_store.clone(), path_factory)
774            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
775
776        let reader = puffin_manager
777            .reader(&file_id)
778            .await
779            .context(PuffinBuildReaderSnafu)?
780            .with_file_size_hint(file_size_hint);
781
782        Ok(reader)
783    }
784}