mito2/
access_layer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use async_stream::try_stream;
19use common_time::Timestamp;
20use futures::{Stream, TryStreamExt};
21use object_store::services::Fs;
22use object_store::util::{join_dir, with_instrument_layers};
23use object_store::{ATOMIC_WRITE_DIR, ErrorKind, OLD_ATOMIC_WRITE_DIR, ObjectStore};
24use smallvec::SmallVec;
25use snafu::ResultExt;
26use store_api::metadata::RegionMetadataRef;
27use store_api::region_request::PathType;
28use store_api::sst_entry::StorageSstEntry;
29use store_api::storage::{FileId, RegionId, SequenceNumber};
30
31use crate::cache::CacheManagerRef;
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::write_cache::SstUploadRequest;
34use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
35use crate::error::{
36    CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result,
37};
38use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
39use crate::read::FlatSource;
40use crate::region::options::IndexOptions;
41use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
42use crate::sst::index::IndexerBuilderImpl;
43use crate::sst::index::intermediate::IntermediateManager;
44use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
45use crate::sst::location::{self, region_dir_from_table_dir};
46use crate::sst::parquet::reader::ParquetReaderBuilder;
47use crate::sst::parquet::writer::ParquetWriter;
48use crate::sst::parquet::{SstInfo, WriteOptions};
49use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FormatType};
50
/// Shared, reference-counted handle to an [AccessLayer].
pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
///
/// Stores up to 2 entries inline before spilling to the heap.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
54
/// Write operation type.
///
/// Determines which metric family ([`FLUSH_ELAPSED`] vs
/// [`COMPACTION_STAGE_ELAPSED`]) receives the durations in [`Metrics::observe`].
#[derive(Eq, PartialEq, Debug)]
pub enum WriteType {
    /// Writes from flush.
    Flush,
    /// Writes from compaction.
    Compaction,
}
63
/// Per-stage elapsed durations collected while writing an SST.
///
/// Accumulated during a write and reported to metric histograms via
/// [`Metrics::observe`]; field names mirror the metric label values.
#[derive(Debug)]
pub struct Metrics {
    // Origin of the write; selects the metric family on `observe`.
    pub(crate) write_type: WriteType,
    // Time spent pulling data from the source.
    pub(crate) iter_source: Duration,
    // Time spent writing batches to the SST file.
    pub(crate) write_batch: Duration,
    // Time spent updating index structures.
    pub(crate) update_index: Duration,
    // Time spent uploading the parquet file.
    pub(crate) upload_parquet: Duration,
    // Time spent uploading the puffin (index) file.
    pub(crate) upload_puffin: Duration,
    // Time spent compacting the memtable (flush only; may stay zero).
    pub(crate) compact_memtable: Duration,
}
74
75impl Metrics {
76    pub fn new(write_type: WriteType) -> Self {
77        Self {
78            write_type,
79            iter_source: Default::default(),
80            write_batch: Default::default(),
81            update_index: Default::default(),
82            upload_parquet: Default::default(),
83            upload_puffin: Default::default(),
84            compact_memtable: Default::default(),
85        }
86    }
87
88    pub(crate) fn merge(mut self, other: Self) -> Self {
89        assert_eq!(self.write_type, other.write_type);
90        self.iter_source += other.iter_source;
91        self.write_batch += other.write_batch;
92        self.update_index += other.update_index;
93        self.upload_parquet += other.upload_parquet;
94        self.upload_puffin += other.upload_puffin;
95        self.compact_memtable += other.compact_memtable;
96        self
97    }
98
99    pub(crate) fn observe(self) {
100        match self.write_type {
101            WriteType::Flush => {
102                FLUSH_ELAPSED
103                    .with_label_values(&["iter_source"])
104                    .observe(self.iter_source.as_secs_f64());
105                FLUSH_ELAPSED
106                    .with_label_values(&["write_batch"])
107                    .observe(self.write_batch.as_secs_f64());
108                FLUSH_ELAPSED
109                    .with_label_values(&["update_index"])
110                    .observe(self.update_index.as_secs_f64());
111                FLUSH_ELAPSED
112                    .with_label_values(&["upload_parquet"])
113                    .observe(self.upload_parquet.as_secs_f64());
114                FLUSH_ELAPSED
115                    .with_label_values(&["upload_puffin"])
116                    .observe(self.upload_puffin.as_secs_f64());
117                if !self.compact_memtable.is_zero() {
118                    FLUSH_ELAPSED
119                        .with_label_values(&["compact_memtable"])
120                        .observe(self.upload_puffin.as_secs_f64());
121                }
122            }
123            WriteType::Compaction => {
124                COMPACTION_STAGE_ELAPSED
125                    .with_label_values(&["iter_source"])
126                    .observe(self.iter_source.as_secs_f64());
127                COMPACTION_STAGE_ELAPSED
128                    .with_label_values(&["write_batch"])
129                    .observe(self.write_batch.as_secs_f64());
130                COMPACTION_STAGE_ELAPSED
131                    .with_label_values(&["update_index"])
132                    .observe(self.update_index.as_secs_f64());
133                COMPACTION_STAGE_ELAPSED
134                    .with_label_values(&["upload_parquet"])
135                    .observe(self.upload_parquet.as_secs_f64());
136                COMPACTION_STAGE_ELAPSED
137                    .with_label_values(&["upload_puffin"])
138                    .observe(self.upload_puffin.as_secs_f64());
139            }
140        };
141    }
142}
143
/// A layer to access SST files under the same directory.
///
/// Bundles the target object store with the managers needed to build and
/// read SSTs and their puffin index files.
pub struct AccessLayer {
    /// Directory of the table that owns the SST files.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    /// Target object store.
    object_store: ObjectStore,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
}
156
impl std::fmt::Debug for AccessLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only `table_dir` is printed; the store and managers are omitted.
        f.debug_struct("AccessLayer")
            .field("table_dir", &self.table_dir)
            .finish()
    }
}
164
impl AccessLayer {
    /// Returns a new [AccessLayer] for specific `table_dir`.
    pub fn new(
        table_dir: impl Into<String>,
        path_type: PathType,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> AccessLayer {
        AccessLayer {
            table_dir: table_dir.into(),
            path_type,
            object_store,
            puffin_manager_factory,
            intermediate_manager,
        }
    }

    /// Returns the directory of the table.
    pub fn table_dir(&self) -> &str {
        &self.table_dir
    }

    /// Returns the object store of the layer.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }

    /// Returns the path type of the layer.
    pub fn path_type(&self) -> PathType {
        self.path_type
    }

    /// Returns the puffin manager factory.
    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        &self.puffin_manager_factory
    }

    /// Returns the intermediate manager.
    pub fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Build the puffin manager.
    ///
    /// The manager resolves file locations through a [RegionFilePathFactory]
    /// built from this layer's table dir and path type.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.object_store.clone();
        let path_provider =
            RegionFilePathFactory::new(self.table_dir().to_string(), self.path_type());
        self.puffin_manager_factory.build(store, path_provider)
    }

    /// Deletes a single index (puffin) file identified by `index_file_id`
    /// (file id + index version) from the object store.
    pub(crate) async fn delete_index(
        &self,
        index_file_id: RegionIndexId,
    ) -> Result<(), crate::error::Error> {
        let path = location::index_file_path(
            &self.table_dir,
            RegionIndexId::new(index_file_id.file_id, index_file_id.version),
            self.path_type,
        );
        self.object_store
            .delete(&path)
            .await
            .context(DeleteIndexSnafu {
                file_id: index_file_id.file_id(),
            })?;
        Ok(())
    }

    /// Deletes the SST files with the given `file_ids` in `region_id` using a
    /// batch deleter. Returns `Ok(())` immediately when `file_ids` is empty.
    pub(crate) async fn delete_ssts(
        &self,
        region_id: RegionId,
        file_ids: &[FileId],
    ) -> Result<(), crate::error::Error> {
        if file_ids.is_empty() {
            return Ok(());
        }

        // Kept for error context: reports the whole batch on any failure.
        let attempted_files = file_ids.to_vec();
        let paths: Vec<_> = file_ids
            .iter()
            .map(|file_id| {
                location::sst_file_path(
                    &self.table_dir,
                    RegionFileId::new(region_id, *file_id),
                    self.path_type,
                )
            })
            .collect();

        let mut deleter = self
            .object_store
            .deleter()
            .await
            .with_context(|_| DeleteSstsSnafu {
                region_id,
                file_ids: attempted_files.clone(),
            })?;
        deleter
            .delete_iter(paths.iter().map(String::as_str))
            .await
            .with_context(|_| DeleteSstsSnafu {
                region_id,
                file_ids: attempted_files.clone(),
            })?;
        // `close` flushes the deleter; deletions may not be durable before this.
        deleter.close().await.with_context(|_| DeleteSstsSnafu {
            region_id,
            file_ids: attempted_files,
        })?;

        Ok(())
    }

    /// Deletes the index files identified by `index_ids` using a batch
    /// deleter. Returns `Ok(())` immediately when `index_ids` is empty.
    pub(crate) async fn delete_indexes(
        &self,
        index_ids: &[RegionIndexId],
    ) -> Result<(), crate::error::Error> {
        if index_ids.is_empty() {
            return Ok(());
        }

        // File ids kept separately for error context.
        let file_ids: Vec<_> = index_ids
            .iter()
            .map(|index_id| index_id.file_id())
            .collect();
        let paths: Vec<_> = index_ids
            .iter()
            .map(|index_id| location::index_file_path(&self.table_dir, *index_id, self.path_type))
            .collect();

        let mut deleter = self
            .object_store
            .deleter()
            .await
            .context(DeleteIndexesSnafu {
                file_ids: file_ids.clone(),
            })?;
        deleter
            .delete_iter(paths.iter().map(String::as_str))
            .await
            .context(DeleteIndexesSnafu {
                file_ids: file_ids.clone(),
            })?;
        deleter
            .close()
            .await
            .context(DeleteIndexesSnafu { file_ids })?;

        Ok(())
    }

    /// Returns the directory of the region in the table.
    pub fn build_region_dir(&self, region_id: RegionId) -> String {
        region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
    }

    /// Returns a reader builder for specific `file`.
    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
        ParquetReaderBuilder::new(
            self.table_dir.clone(),
            self.path_type,
            file,
            self.object_store.clone(),
        )
    }

    /// Writes a SST with specific `file_id` and `metadata` to the layer.
    ///
    /// Returns the info of the SST. If no data written, returns None.
    ///
    /// When a write cache is available the data goes through the cache and is
    /// uploaded to the remote store; otherwise it is written directly to the
    /// object store with atomic-write temp-file cleanup on failure.
    pub async fn write_sst(
        &self,
        request: SstWriteRequest,
        write_opts: &WriteOptions,
        metrics: &mut Metrics,
    ) -> Result<SstInfoArray> {
        let region_id = request.metadata.region_id;
        let region_metadata = request.metadata.clone();
        let cache_manager = request.cache_manager.clone();

        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
            // Write to the write cache.
            write_cache
                .write_and_upload_sst(
                    request,
                    SstUploadRequest {
                        dest_path_provider: RegionFilePathFactory::new(
                            self.table_dir.clone(),
                            self.path_type,
                        ),
                        remote_store: self.object_store.clone(),
                    },
                    write_opts,
                    metrics,
                )
                .await?
        } else {
            // Write cache is disabled.
            let store = self.object_store.clone();
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let indexer_builder = IndexerBuilderImpl {
                build_type: request.op_type.into(),
                metadata: request.metadata.clone(),
                row_group_size: write_opts.row_group_size,
                puffin_manager: self
                    .puffin_manager_factory
                    .build(store, path_provider.clone()),
                write_cache_enabled: false,
                intermediate_manager: self.intermediate_manager.clone(),
                index_options: request.index_options,
                inverted_index_config: request.inverted_index_config,
                fulltext_index_config: request.fulltext_index_config,
                bloom_filter_index_config: request.bloom_filter_index_config,
                #[cfg(feature = "vector_index")]
                vector_index_config: request.vector_index_config,
            };
            // We disable write cache on file system but we still use atomic write.
            // TODO(yingwen): If we support other non-fs stores without the write cache, then
            // we may have find a way to check whether we need the cleaner.
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let mut writer = ParquetWriter::new_with_object_store(
                self.object_store.clone(),
                request.metadata,
                request.index_config,
                indexer_builder,
                path_provider,
                metrics,
            )
            .await
            .with_file_cleaner(cleaner);
            // Dispatch on the on-disk format requested by the caller.
            match request.sst_write_format {
                FormatType::PrimaryKey => {
                    writer
                        .write_all_flat_as_primary_key(
                            request.source,
                            request.max_sequence,
                            write_opts,
                        )
                        .await?
                }
                FormatType::Flat => {
                    writer
                        .write_all_flat(request.source, request.max_sequence, write_opts)
                        .await?
                }
            }
        };

        // Put parquet metadata to cache manager.
        if !sst_info.is_empty() {
            for sst in &sst_info {
                if let Some(parquet_metadata) = &sst.file_metadata {
                    cache_manager.put_parquet_meta_data(
                        RegionFileId::new(region_id, sst.file_id),
                        parquet_metadata.clone(),
                        Some(region_metadata.clone()),
                    )
                }
            }
        }

        Ok(sst_info)
    }

    /// Puts encoded SST bytes to the write cache (if enabled) and uploads it to the object store.
    ///
    /// Without a write cache, writes directly to the object store and cleans
    /// atomic-dir temp files on write/close failure. Returns flush metrics
    /// with `write_batch` set to the elapsed direct-write time.
    pub(crate) async fn put_sst(
        &self,
        data: &bytes::Bytes,
        region_id: RegionId,
        sst_info: &SstInfo,
        cache_manager: &CacheManagerRef,
    ) -> Result<Metrics> {
        if let Some(write_cache) = cache_manager.write_cache() {
            // Write to cache and upload to remote store
            let upload_request = SstUploadRequest {
                dest_path_provider: RegionFilePathFactory::new(
                    self.table_dir.clone(),
                    self.path_type,
                ),
                remote_store: self.object_store.clone(),
            };
            write_cache
                .put_and_upload_sst(data, region_id, sst_info, upload_request)
                .await
        } else {
            let start = Instant::now();
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let sst_file_path =
                path_provider.build_sst_file_path(RegionFileId::new(region_id, sst_info.file_id));
            let mut writer = self
                .object_store
                .writer_with(&sst_file_path)
                .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .context(OpenDalSnafu)?;
            // Clean staged temp files before surfacing either failure below.
            if let Err(err) = writer.write(data.clone()).await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            if let Err(err) = writer.close().await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            let mut metrics = Metrics::new(WriteType::Flush);
            metrics.write_batch = start.elapsed();
            Ok(metrics)
        }
    }

    /// Lists the SST entries from the storage layer in the table directory.
    ///
    /// Recursively walks `table_dir` and yields one [StorageSstEntry] per
    /// `.parquet`/`.puffin` file; directories and other files are skipped.
    pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<StorageSstEntry>> + use<> {
        let object_store = self.object_store.clone();
        let table_dir = self.table_dir.clone();

        try_stream! {
            let mut lister = object_store
                .lister_with(table_dir.as_str())
                .recursive(true)
                .await
                .context(OpenDalSnafu)?;

            while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
                let metadata = entry.metadata();
                if metadata.is_dir() {
                    continue;
                }

                let path = entry.path();
                if !path.ends_with(".parquet") && !path.ends_with(".puffin") {
                    continue;
                }

                // A reported length of 0 is treated as "size unknown".
                let file_size = metadata.content_length();
                let file_size = if file_size == 0 { None } else { Some(file_size) };
                let last_modified_ms = metadata
                    .last_modified()
                    .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));

                let entry = StorageSstEntry {
                    file_path: path.to_string(),
                    file_size,
                    last_modified_ms,
                    node_id: None,
                };

                yield entry;
            }
        }
    }
}
516
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The request comes from a flush.
    Flush,
    /// The request comes from a compaction.
    Compact,
}
523
/// Contents to build a SST.
pub struct SstWriteRequest {
    /// Whether the write originates from a flush or a compaction.
    pub op_type: OperationType,
    /// Metadata of the region being written.
    pub metadata: RegionMetadataRef,
    /// Source of the rows to write.
    pub source: FlatSource,
    /// Cache manager; provides the optional write cache and parquet metadata cache.
    pub cache_manager: CacheManagerRef,
    #[allow(dead_code)]
    // Currently unused; kept for future storage selection.
    pub storage: Option<String>,
    /// Max sequence number of the written data, if known.
    pub max_sequence: Option<SequenceNumber>,
    /// On-disk format to write (primary-key or flat).
    pub sst_write_format: FormatType,

    /// Configs for index
    pub index_options: IndexOptions,
    pub index_config: IndexConfig,
    pub inverted_index_config: InvertedIndexConfig,
    pub fulltext_index_config: FulltextIndexConfig,
    pub bloom_filter_index_config: BloomFilterConfig,
    #[cfg(feature = "vector_index")]
    pub vector_index_config: crate::config::VectorIndexConfig,
}
544
/// Cleaner to remove temp files on the atomic write dir.
pub(crate) struct TempFileCleaner {
    /// Region whose staged files this cleaner removes.
    region_id: RegionId,
    /// Store hosting the atomic write dir.
    object_store: ObjectStore,
}
550
impl TempFileCleaner {
    /// Constructs the cleaner for the region and store.
    pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
        Self {
            region_id,
            object_store,
        }
    }

    /// Removes the SST and index file from the local atomic dir by the file id.
    /// This only removes the initial index, since the index version is always 0 for a new SST, this method should be safe to pass 0.
    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
        // Keys use the same naming scheme as the staged cache files.
        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();

        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
    }

    /// Removes the files from the local atomic dir by their names.
    ///
    /// Best-effort: list/delete failures are logged and swallowed rather
    /// than returned to the caller.
    pub(crate) async fn clean_atomic_dir_files(
        local_store: &ObjectStore,
        names_to_remove: &[&str],
    ) {
        // We don't know the actual suffix of the file under atomic dir, so we have
        // to list the dir. The cost should be acceptable as there won't be too many files.
        let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
            // A missing atomic dir means there is nothing to clean.
            if e.kind() != ErrorKind::NotFound {
                common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
            }
        }) else {
            return;
        };

        // In our case, we can ensure the file id is unique so it is safe to remove all files
        // with the same file id under the atomic write dir.
        let actual_files: Vec<_> = entries
            .into_iter()
            .filter_map(|entry| {
                if entry.metadata().is_dir() {
                    return None;
                }

                // Remove name that matches files_to_remove.
                let should_remove = names_to_remove
                    .iter()
                    .any(|file| entry.name().starts_with(file));
                if should_remove {
                    Some(entry.path().to_string())
                } else {
                    None
                }
            })
            .collect();

        // NOTE(review): this warns even when `actual_files` is empty — confirm intended.
        common_telemetry::warn!(
            "Clean files {:?} under atomic write dir for {:?}",
            actual_files,
            names_to_remove
        );

        if let Err(e) = local_store.delete_iter(actual_files).await {
            common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
        }
    }
}
616
617pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
618    let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
619    clean_dir(&atomic_write_dir).await?;
620
621    // Compatible code. Remove this after a major release.
622    let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
623    clean_dir(&old_atomic_temp_dir).await?;
624
625    let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
626    let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
627
628    Ok(with_instrument_layers(store, false))
629}
630
631/// Clean the directory.
632async fn clean_dir(dir: &str) -> Result<()> {
633    if tokio::fs::try_exists(dir)
634        .await
635        .context(CleanDirSnafu { dir })?
636    {
637        tokio::fs::remove_dir_all(dir)
638            .await
639            .context(CleanDirSnafu { dir })?;
640    }
641
642    Ok(())
643}
644
/// Path provider for SST file and index file.
pub trait FilePathProvider: Send + Sync {
    /// Creates index file path of given file id. Version defaults to 0 and is not shown in the path.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String;

    /// Creates index file path of given index id (with version support).
    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;

    /// Creates SST file path of given file id.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
656
/// Path provider that builds paths in local write cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
    /// File cache that owns the local path layout.
    file_cache: FileCacheRef,
}
662
impl WriteCachePathProvider {
    /// Creates a new `WriteCachePathProvider` instance.
    pub fn new(file_cache: FileCacheRef) -> Self {
        Self { file_cache }
    }
}
669
670impl FilePathProvider for WriteCachePathProvider {
671    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
672        let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
673        self.file_cache.cache_file_path(puffin_key)
674    }
675
676    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
677        let puffin_key = IndexKey::new(
678            index_id.region_id(),
679            index_id.file_id(),
680            FileType::Puffin(index_id.version),
681        );
682        self.file_cache.cache_file_path(puffin_key)
683    }
684
685    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
686        let parquet_file_key =
687            IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
688        self.file_cache.cache_file_path(parquet_file_key)
689    }
690}
691
/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
    /// Directory of the table the region belongs to.
    pub(crate) table_dir: String,
    /// Path type for generating file paths.
    pub(crate) path_type: PathType,
}
698
impl RegionFilePathFactory {
    /// Creates a new `RegionFilePathFactory` instance.
    pub fn new(table_dir: String, path_type: PathType) -> Self {
        Self {
            table_dir,
            path_type,
        }
    }
}
708
impl FilePathProvider for RegionFilePathFactory {
    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
        // Legacy layout: no version component in the index path.
        location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
    }

    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
        location::index_file_path(&self.table_dir, index_id, self.path_type)
    }

    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
        location::sst_file_path(&self.table_dir, file_id, self.path_type)
    }
}