mito2/region/opener.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region opener.
16
17use std::any::TypeId;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicI64, AtomicU64};
20use std::sync::{Arc, LazyLock, Mutex};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, warn};
24use common_wal::options::WalOptions;
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use log_store::kafka::log_store::KafkaLogStore;
28use log_store::noop::log_store::NoopLogStore;
29use log_store::raft_engine::log_store::RaftEngineLogStore;
30use object_store::manager::ObjectStoreManagerRef;
31use object_store::util::normalize_dir;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::logstore::LogStore;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::{
36    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
37};
38use store_api::region_engine::RegionRole;
39use store_api::region_request::PathType;
40use store_api::storage::{ColumnId, RegionId};
41use tokio::sync::Semaphore;
42
43use crate::access_layer::AccessLayer;
44use crate::cache::CacheManagerRef;
45use crate::cache::file_cache::{FileCache, FileType, IndexKey};
46use crate::config::MitoConfig;
47use crate::error;
48use crate::error::{
49    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
50    Result, StaleLogEntrySnafu,
51};
52use crate::manifest::action::RegionManifest;
53use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
54use crate::memtable::MemtableBuilderProvider;
55use crate::memtable::bulk::part::BulkPart;
56use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
57use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
58use crate::region::options::RegionOptions;
59use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
60use crate::region::{
61    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
62};
63use crate::region_write_ctx::RegionWriteCtx;
64use crate::request::OptionOutputTx;
65use crate::schedule::scheduler::SchedulerRef;
66use crate::sst::FormatType;
67use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
68use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
69use crate::sst::file_ref::FileReferenceManagerRef;
70use crate::sst::index::intermediate::IntermediateManager;
71use crate::sst::index::puffin_manager::PuffinManagerFactory;
72use crate::sst::location::{self, region_dir_from_table_dir};
73use crate::sst::parquet::metadata::MetadataLoader;
74use crate::sst::parquet::reader::MetadataCacheMetrics;
75use crate::time_provider::TimeProviderRef;
76use crate::wal::entry_reader::WalEntryReader;
77use crate::wal::{EntryId, Wal};
78
79const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;
80
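/// Bounds the number of concurrent Parquet metadata preload tasks across regions, so that
/// opening many regions at once does not spawn an unbounded number of preload tasks.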
81static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
82    LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
83
84/// A fetcher to retrieve partition expr for a region.
85///
86/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
87/// while newer ones do. On open, we backfill it via this fetcher and persist it
88/// to the manifest so future opens don't need refetching.
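///
/// A minimal sketch of an implementor (the type here is illustrative, not part of this
/// crate):
///
/// ```ignore
/// struct StaticExprFetcher {
///     expr_json: String,
/// }
///
/// #[async_trait::async_trait]
/// impl PartitionExprFetcher for StaticExprFetcher {
///     async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
///         Some(self.expr_json.clone())
///     }
/// }
/// ```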
89#[async_trait::async_trait]
90pub trait PartitionExprFetcher {
91    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
92}
93
94pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;
95
96/// Builder to create a new [MitoRegion] or open an existing one.
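///
/// Typical usage is a builder-style chain (a sketch; the bindings passed in are assumed
/// to exist at the call site):
///
/// ```ignore
/// let region = RegionOpener::new(/* engine-provided dependencies */)
///     .metadata_builder(metadata_builder)
///     .parse_options(options_map)?
///     .cache(Some(cache_manager))
///     .skip_wal_replay(false)
///     .create_or_open(&config, &wal)
///     .await?;
/// ```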
97pub(crate) struct RegionOpener {
98    region_id: RegionId,
99    metadata_builder: Option<RegionMetadataBuilder>,
100    memtable_builder_provider: MemtableBuilderProvider,
101    object_store_manager: ObjectStoreManagerRef,
102    table_dir: String,
103    path_type: PathType,
104    purge_scheduler: SchedulerRef,
105    options: Option<RegionOptions>,
106    cache_manager: Option<CacheManagerRef>,
107    skip_wal_replay: bool,
108    puffin_manager_factory: PuffinManagerFactory,
109    intermediate_manager: IntermediateManager,
110    time_provider: TimeProviderRef,
111    stats: ManifestStats,
112    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
113    replay_checkpoint: Option<u64>,
114    file_ref_manager: FileReferenceManagerRef,
115    partition_expr_fetcher: PartitionExprFetcherRef,
116}
117
118impl RegionOpener {
119    /// Returns a new opener.
120    // TODO(LFC): Reduce the number of arguments.
121    #[allow(clippy::too_many_arguments)]
122    pub(crate) fn new(
123        region_id: RegionId,
124        table_dir: &str,
125        path_type: PathType,
126        memtable_builder_provider: MemtableBuilderProvider,
127        object_store_manager: ObjectStoreManagerRef,
128        purge_scheduler: SchedulerRef,
129        puffin_manager_factory: PuffinManagerFactory,
130        intermediate_manager: IntermediateManager,
131        time_provider: TimeProviderRef,
132        file_ref_manager: FileReferenceManagerRef,
133        partition_expr_fetcher: PartitionExprFetcherRef,
134    ) -> RegionOpener {
135        RegionOpener {
136            region_id,
137            metadata_builder: None,
138            memtable_builder_provider,
139            object_store_manager,
140            table_dir: normalize_dir(table_dir),
141            path_type,
142            purge_scheduler,
143            options: None,
144            cache_manager: None,
145            skip_wal_replay: false,
146            puffin_manager_factory,
147            intermediate_manager,
148            time_provider,
149            stats: Default::default(),
150            wal_entry_reader: None,
151            replay_checkpoint: None,
152            file_ref_manager,
153            partition_expr_fetcher,
154        }
155    }
156
    /// Sets the metadata builder of the region to create.
158    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
159        self.metadata_builder = Some(builder);
160        self
161    }
162
163    /// Computes the region directory from table_dir and region_id.
164    fn region_dir(&self) -> String {
165        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
166    }
167
168    /// Builds the region metadata.
169    ///
170    /// # Panics
171    /// - Panics if `options` is not set.
172    /// - Panics if `metadata_builder` is not set.
173    fn build_metadata(&mut self) -> Result<RegionMetadata> {
174        let options = self.options.as_ref().unwrap();
175        let mut metadata_builder = self.metadata_builder.take().unwrap();
176        metadata_builder.primary_key_encoding(options.primary_key_encoding());
177        metadata_builder.build().context(InvalidMetadataSnafu)
178    }
179
180    /// Parses and sets options for the region.
181    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
182        self.options(RegionOptions::try_from(&options)?)
183    }
184
185    /// Sets the replay checkpoint for the region.
186    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
187        self.replay_checkpoint = replay_checkpoint;
188        self
189    }
190
191    /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
192    /// constructing a new one from scratch.
193    pub(crate) fn wal_entry_reader(
194        mut self,
195        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
196    ) -> Self {
197        self.wal_entry_reader = wal_entry_reader;
198        self
199    }
200
201    /// Sets options for the region.
202    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
203        options.validate()?;
204        self.options = Some(options);
205        Ok(self)
206    }
207
208    /// Sets the cache manager for the region.
209    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
210        self.cache_manager = cache_manager;
211        self
212    }
213
214    /// Sets the `skip_wal_replay`.
215    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
216        self.skip_wal_replay = skip;
217        self
218    }
219
220    /// Writes region manifest and creates a new region if it does not exist.
221    /// Opens the region if it already exists.
222    ///
223    /// # Panics
224    /// - Panics if `metadata_builder` is not set.
225    /// - Panics if `options` is not set.
226    pub(crate) async fn create_or_open<S: LogStore>(
227        mut self,
228        config: &MitoConfig,
229        wal: &Wal<S>,
230    ) -> Result<MitoRegionRef> {
231        let region_id = self.region_id;
232        let region_dir = self.region_dir();
233        let metadata = self.build_metadata()?;
234        // Tries to open the region.
235        match self.maybe_open(config, wal).await {
236            Ok(Some(region)) => {
237                let recovered = region.metadata();
238                // Checks the schema of the region.
239                let expect = &metadata;
240                check_recovered_region(
241                    &recovered,
242                    expect.region_id,
243                    &expect.column_metadatas,
244                    &expect.primary_key,
245                )?;
246                // To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
247                region.set_role(RegionRole::Leader);
248
249                return Ok(region);
250            }
251            Ok(None) => {
252                debug!(
253                    "No data under directory {}, region_id: {}",
254                    region_dir, self.region_id
255                );
256            }
257            Err(e) => {
258                warn!(e;
259                    "Failed to open region {} before creating it, region_dir: {}",
260                    self.region_id, region_dir
261                );
262            }
263        }
264        // Safety: must be set before calling this method.
265        let mut options = self.options.take().unwrap();
266        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
267        let provider = self.provider::<S>(&options.wal_options)?;
268        let metadata = Arc::new(metadata);
269        // Sets the sst_format based on options or flat_format flag
270        let sst_format = if let Some(format) = options.sst_format {
271            format
272        } else if config.default_experimental_flat_format {
273            options.sst_format = Some(FormatType::Flat);
274            FormatType::Flat
275        } else {
276            // Default to PrimaryKeyParquet for newly created regions
277            options.sst_format = Some(FormatType::PrimaryKey);
278            FormatType::PrimaryKey
279        };
        // Creates a manifest manager for this region and writes the region metadata to the manifest file.
281        let mut region_manifest_options =
282            RegionManifestOptions::new(config, &region_dir, &object_store);
283        // Set manifest cache if available
284        region_manifest_options.manifest_cache = self
285            .cache_manager
286            .as_ref()
287            .and_then(|cm| cm.write_cache())
288            .and_then(|wc| wc.manifest_cache());
289        // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
290        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
291        let manifest_manager = RegionManifestManager::new(
292            metadata.clone(),
293            flushed_entry_id,
294            region_manifest_options,
295            sst_format,
296            &self.stats,
297        )
298        .await?;
299
300        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
301        let part_duration = options.compaction.time_window();
302        // Initial memtable id is 0.
303        let mutable = Arc::new(TimePartitions::new(
304            metadata.clone(),
305            memtable_builder.clone(),
306            0,
307            part_duration,
308        ));
309
310        debug!(
311            "Create region {} with options: {:?}, default_flat_format: {}",
312            region_id, options, config.default_experimental_flat_format
313        );
314
315        let version = VersionBuilder::new(metadata, mutable)
316            .options(options)
317            .build();
318        let version_control = Arc::new(VersionControl::new(version));
319        let access_layer = Arc::new(AccessLayer::new(
320            self.table_dir.clone(),
321            self.path_type,
322            object_store,
323            self.puffin_manager_factory,
324            self.intermediate_manager,
325        ));
326        let now = self.time_provider.current_time_millis();
327
328        Ok(Arc::new(MitoRegion {
329            region_id,
330            version_control,
331            access_layer: access_layer.clone(),
332            // Region is writable after it is created.
333            manifest_ctx: Arc::new(ManifestContext::new(
334                manifest_manager,
335                RegionRoleState::Leader(RegionLeaderState::Writable),
336            )),
337            file_purger: create_file_purger(
338                config.gc.enable,
339                self.path_type,
340                self.purge_scheduler,
341                access_layer,
342                self.cache_manager,
343                self.file_ref_manager.clone(),
344            ),
345            provider,
346            last_flush_millis: AtomicI64::new(now),
347            last_compaction_millis: AtomicI64::new(now),
348            time_provider: self.time_provider.clone(),
349            topic_latest_entry_id: AtomicU64::new(0),
350            written_bytes: Arc::new(AtomicU64::new(0)),
351            stats: self.stats,
352            staging_partition_info: Mutex::new(None),
353        }))
354    }
355
356    /// Opens an existing region in read only mode.
357    ///
358    /// Returns error if the region doesn't exist.
359    pub(crate) async fn open<S: LogStore>(
360        mut self,
361        config: &MitoConfig,
362        wal: &Wal<S>,
363    ) -> Result<MitoRegionRef> {
364        let region_id = self.region_id;
365        let region_dir = self.region_dir();
366        let region = self
367            .maybe_open(config, wal)
368            .await?
369            .with_context(|| EmptyRegionDirSnafu {
370                region_id,
371                region_dir: &region_dir,
372            })?;
373
374        ensure!(
375            region.region_id == self.region_id,
376            RegionCorruptedSnafu {
377                region_id: self.region_id,
378                reason: format!(
379                    "recovered region has different region id {}",
380                    region.region_id
381                ),
382            }
383        );
384
385        Ok(region)
386    }
387
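    /// Builds the WAL [Provider] for this region from its `wal_options`, ensuring the
    /// per-region WAL kind is compatible with the log store type the engine runs on
    /// (`NoopLogStore` is accepted for either kind).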
388    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
389        match wal_options {
390            WalOptions::RaftEngine => {
391                ensure!(
392                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
393                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
394                    error::IncompatibleWalProviderChangeSnafu {
395                        global: "`kafka`",
396                        region: "`raft_engine`",
397                    }
398                );
399                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
400            }
401            WalOptions::Kafka(options) => {
402                ensure!(
403                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
404                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
405                    error::IncompatibleWalProviderChangeSnafu {
406                        global: "`raft_engine`",
407                        region: "`kafka`",
408                    }
409                );
410                Ok(Provider::kafka_provider(options.topic.clone()))
411            }
412            WalOptions::Noop => Ok(Provider::noop_provider()),
413        }
414    }
415
416    /// Tries to open the region and returns `None` if the region directory is empty.
417    async fn maybe_open<S: LogStore>(
418        &mut self,
419        config: &MitoConfig,
420        wal: &Wal<S>,
421    ) -> Result<Option<MitoRegionRef>> {
422        let now = Instant::now();
423        let mut region_options = self.options.as_ref().unwrap().clone();
424        let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
425        let mut region_manifest_options =
426            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
427        // Set manifest cache if available
428        region_manifest_options.manifest_cache = self
429            .cache_manager
430            .as_ref()
431            .and_then(|cm| cm.write_cache())
432            .and_then(|wc| wc.manifest_cache());
433        let Some(manifest_manager) =
434            RegionManifestManager::open(region_manifest_options, &self.stats).await?
435        else {
436            return Ok(None);
437        };
438
439        // Backfill `partition_expr` if missing. Use the backfilled metadata in-memory during this open.
440        let manifest = manifest_manager.manifest();
441        let metadata = if manifest.metadata.partition_expr.is_none()
442            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
443        {
444            let metadata = manifest.metadata.as_ref().clone();
445            let mut builder = RegionMetadataBuilder::from_existing(metadata);
446            builder.partition_expr_json(Some(expr_json));
447            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
448        } else {
449            manifest.metadata.clone()
450        };
451        // Updates the region options with the manifest.
452        sanitize_region_options(&manifest, &mut region_options);
453
454        let region_id = self.region_id;
455        let provider = self.provider::<S>(&region_options.wal_options)?;
456        let wal_entry_reader = self
457            .wal_entry_reader
458            .take()
459            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
460        let on_region_opened = wal.on_region_opened();
461        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
462
463        debug!(
464            "Open region {} at {} with options: {:?}",
465            region_id, self.table_dir, self.options
466        );
467
468        let access_layer = Arc::new(AccessLayer::new(
469            self.table_dir.clone(),
470            self.path_type,
471            object_store,
472            self.puffin_manager_factory.clone(),
473            self.intermediate_manager.clone(),
474        ));
475        let file_purger = create_file_purger(
476            config.gc.enable,
477            self.path_type,
478            self.purge_scheduler.clone(),
479            access_layer.clone(),
480            self.cache_manager.clone(),
481            self.file_ref_manager.clone(),
482        );
483        // We should sanitize the region options before creating a new memtable.
484        let memtable_builder = self
485            .memtable_builder_provider
486            .builder_for_options(&region_options);
487        // Use compaction time window in the manifest if region doesn't provide
488        // the time window option.
489        let part_duration = region_options
490            .compaction
491            .time_window()
492            .or(manifest.compaction_time_window);
493        // Initial memtable id is 0.
494        let mutable = Arc::new(TimePartitions::new(
495            metadata.clone(),
496            memtable_builder.clone(),
497            0,
498            part_duration,
499        ));
500
        // Updates region options from the manifest before creating the version.
502        let version_builder = version_builder_from_manifest(
503            &manifest,
504            metadata,
505            file_purger.clone(),
506            mutable,
507            region_options,
508        );
509        let version = version_builder.build();
510        let flushed_entry_id = version.flushed_entry_id;
511        let version_control = Arc::new(VersionControl::new(version));
512
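        // Replays the WAL from max(replay_checkpoint, flushed_entry_id) unless replay is
        // skipped; entries at or below the flushed entry id are already persisted in SSTs.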
513        let topic_latest_entry_id = if !self.skip_wal_replay {
514            let replay_from_entry_id = self
515                .replay_checkpoint
516                .unwrap_or_default()
517                .max(flushed_entry_id);
518            info!(
519                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
520                replay_from_entry_id,
521                region_id,
522                manifest.manifest_version,
523                flushed_entry_id,
524                now.elapsed()
525            );
526            replay_memtable(
527                &provider,
528                wal_entry_reader,
529                region_id,
530                replay_from_entry_id,
531                &version_control,
532                config.allow_stale_entries,
533                on_region_opened,
534            )
535            .await?;
536            // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
537            // Only set after the WAL replay is completed.
538
539            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
540                wal.store().latest_entry_id(&provider).unwrap_or(0)
541            } else {
542                0
543            }
544        } else {
545            info!(
546                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
547                region_id,
548                manifest.manifest_version,
549                flushed_entry_id,
550                now.elapsed()
551            );
552
553            0
554        };
555
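        // The manifest may record a committed sequence newer than what replay recovered
        // (e.g. when the WAL replay is skipped); adopt the larger value so new writes
        // continue from it.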
556        if let Some(committed_in_manifest) = manifest.committed_sequence {
557            let committed_after_replay = version_control.committed_sequence();
558            if committed_in_manifest > committed_after_replay {
559                info!(
560                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
561                    self.region_id,
562                    version_control.current().version.flushed_sequence,
563                    version_control.committed_sequence(),
564                    committed_in_manifest
565                );
566                version_control.set_committed_sequence(committed_in_manifest);
567            }
568        }
569
570        let now = self.time_provider.current_time_millis();
571
572        let region = MitoRegion {
573            region_id: self.region_id,
574            version_control: version_control.clone(),
575            access_layer: access_layer.clone(),
576            // Region is always opened in read only mode.
577            manifest_ctx: Arc::new(ManifestContext::new(
578                manifest_manager,
579                RegionRoleState::Follower,
580            )),
581            file_purger,
582            provider: provider.clone(),
583            last_flush_millis: AtomicI64::new(now),
584            last_compaction_millis: AtomicI64::new(now),
585            time_provider: self.time_provider.clone(),
586            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
587            written_bytes: Arc::new(AtomicU64::new(0)),
588            stats: self.stats.clone(),
589            // TODO(weny): reload the staging partition info from the manifest.
590            staging_partition_info: Mutex::new(None),
591        };
592
593        let region = Arc::new(region);
594
595        maybe_load_cache(&region, config, &self.cache_manager);
596        maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);
597
598        Ok(Some(region))
599    }
600}
601
602/// Creates a version builder from a region manifest.
603pub(crate) fn version_builder_from_manifest(
604    manifest: &RegionManifest,
605    metadata: RegionMetadataRef,
606    file_purger: FilePurgerRef,
607    mutable: TimePartitionsRef,
608    region_options: RegionOptions,
609) -> VersionBuilder {
610    VersionBuilder::new(metadata, mutable)
611        .add_files(file_purger, manifest.files.values().cloned())
612        .flushed_entry_id(manifest.flushed_entry_id)
613        .flushed_sequence(manifest.flushed_sequence)
614        .truncated_entry_id(manifest.truncated_entry_id)
615        .compaction_time_window(manifest.compaction_time_window)
616        .options(region_options)
617}
618
/// Updates region options using the options persisted in the region manifest.
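///
/// For example, if the manifest records `append_mode = true` while the provided options
/// carry `append_mode = false`, the persisted value wins; and whenever `append_mode` ends
/// up enabled, any `merge_mode` option is dropped.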
620pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
621    let option_format = options.sst_format.unwrap_or_default();
622    if option_format != manifest.sst_format {
623        common_telemetry::warn!(
624            "Overriding SST format from {:?} to {:?} for region {}",
625            option_format,
626            manifest.sst_format,
627            manifest.metadata.region_id,
628        );
629        options.sst_format = Some(manifest.sst_format);
630    }
631    if let Some(manifest_append_mode) = manifest.append_mode
632        && options.append_mode != manifest_append_mode
633    {
634        common_telemetry::warn!(
635            "Overriding append_mode from {} to {} for region {}",
636            options.append_mode,
637            manifest_append_mode,
638            manifest.metadata.region_id,
639        );
640        options.append_mode = manifest_append_mode;
641    }
642    if options.append_mode && options.merge_mode.take().is_some() {
643        common_telemetry::warn!(
644            "Ignoring merge_mode because append_mode is enabled for region {}",
645            manifest.metadata.region_id,
646        );
647    }
648}
649
650/// Sanitizes open request options before parsing.
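///
/// For example, a request carrying `append_mode = "true"` (or `"1"`) together with a
/// `merge_mode` entry has the `merge_mode` option removed before parsing.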
651pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
652    let append_mode_enabled = options
653        .get("append_mode")
654        .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));
655
656    if append_mode_enabled && options.remove("merge_mode").is_some() {
657        common_telemetry::warn!(
658            "Ignoring merge_mode in open request options because append_mode is enabled"
659        );
660    }
661}
662
663/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
664pub fn get_object_store(
665    name: &Option<String>,
666    object_store_manager: &ObjectStoreManagerRef,
667) -> Result<object_store::ObjectStore> {
668    if let Some(name) = name {
669        Ok(object_store_manager
670            .find(name)
671            .with_context(|| ObjectStoreNotFoundSnafu {
672                object_store: name.clone(),
673            })?
674            .clone())
675    } else {
676        Ok(object_store_manager.default_object_store().clone())
677    }
678}
679
/// Checks whether the recovered region has the same schema as the region to create.
681pub(crate) fn check_recovered_region(
682    recovered: &RegionMetadata,
683    region_id: RegionId,
684    column_metadatas: &[ColumnMetadata],
685    primary_key: &[ColumnId],
686) -> Result<()> {
687    if recovered.region_id != region_id {
688        error!(
689            "Recovered region {}, expect region {}",
690            recovered.region_id, region_id
691        );
692        return RegionCorruptedSnafu {
693            region_id,
694            reason: format!(
695                "recovered metadata has different region id {}",
696                recovered.region_id
697            ),
698        }
699        .fail();
700    }
701    if recovered.column_metadatas != column_metadatas {
702        error!(
703            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
704            recovered.region_id, recovered.column_metadatas, column_metadatas
705        );
706
707        return RegionCorruptedSnafu {
708            region_id,
709            reason: "recovered metadata has different schema",
710        }
711        .fail();
712    }
713    if recovered.primary_key != primary_key {
714        error!(
715            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
716            recovered.region_id, recovered.primary_key, primary_key
717        );
718
719        return RegionCorruptedSnafu {
720            region_id,
721            reason: "recovered metadata has different primary key",
722        }
723        .fail();
724    }
725
726    Ok(())
727}
728
/// Replays the mutations from the WAL and inserts them into the memtable of the given region.
730pub(crate) async fn replay_memtable<F>(
731    provider: &Provider,
732    mut wal_entry_reader: Box<dyn WalEntryReader>,
733    region_id: RegionId,
734    flushed_entry_id: EntryId,
735    version_control: &VersionControlRef,
736    allow_stale_entries: bool,
737    on_region_opened: F,
738) -> Result<EntryId>
739where
740    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
741{
742    let now = Instant::now();
743    let mut rows_replayed = 0;
744    // Last entry id should start from flushed entry id since there might be no
745    // data in the WAL.
746    let mut last_entry_id = flushed_entry_id;
747    let replay_from_entry_id = flushed_entry_id + 1;
748
749    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
750    while let Some(res) = wal_stream.next().await {
751        let (entry_id, entry) = res?;
752        if entry_id <= flushed_entry_id {
753            warn!(
754                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
755                region_id, flushed_entry_id, entry_id
756            );
757            ensure!(
758                allow_stale_entries,
759                StaleLogEntrySnafu {
760                    region_id,
761                    flushed_entry_id,
762                    unexpected_entry_id: entry_id,
763                }
764            );
765        }
766        last_entry_id = last_entry_id.max(entry_id);
767
768        let mut region_write_ctx = RegionWriteCtx::new(
769            region_id,
770            version_control,
771            provider.clone(),
772            // For WAL replay, we don't need to track the write bytes rate.
773            None,
774        );
775        for mutation in entry.mutations {
776            rows_replayed += mutation
777                .rows
778                .as_ref()
779                .map(|rows| rows.rows.len())
780                .unwrap_or(0);
781            region_write_ctx.push_mutation(
782                mutation.op_type,
783                mutation.rows,
784                mutation.write_hint,
785                OptionOutputTx::none(),
786                // We should respect the sequence in WAL during replay.
787                Some(mutation.sequence),
788            );
789        }
790
791        for bulk_entry in entry.bulk_entries {
792            let part = BulkPart::try_from(bulk_entry)?;
793            rows_replayed += part.num_rows();
794            // During replay, we should adopt the sequence from WAL.
795            let bulk_sequence_from_wal = part.sequence;
796            ensure!(
797                region_write_ctx.push_bulk(
798                    OptionOutputTx::none(),
799                    part,
800                    Some(bulk_sequence_from_wal)
801                ),
802                RegionCorruptedSnafu {
803                    region_id,
804                    reason: "unable to replay memtable with bulk entries",
805                }
806            );
807        }
808
809        // set next_entry_id and write to memtable.
810        region_write_ctx.set_next_entry_id(last_entry_id + 1);
811        region_write_ctx.write_memtable().await;
812        region_write_ctx.write_bulk().await;
813    }
814
815    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
816    // to avoid reading potentially incomplete entries in the future.
817    (on_region_opened)(region_id, flushed_entry_id, provider).await?;
818
819    let series_count = version_control.current().series_count();
820    info!(
821        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
822        region_id,
823        provider,
824        rows_replayed,
825        replay_from_entry_id,
826        last_entry_id,
827        series_count,
828        now.elapsed()
829    );
830    Ok(last_entry_id)
831}
832
833/// A task to load and fill the region file cache.
834pub(crate) struct RegionLoadCacheTask {
835    region: MitoRegionRef,
836}
837
838impl RegionLoadCacheTask {
839    pub(crate) fn new(region: MitoRegionRef) -> Self {
840        Self { region }
841    }
842
843    /// Fills the file cache with index files from the region.
844    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
845        let region_id = self.region.region_id;
846        let table_dir = self.region.access_layer.table_dir();
847        let path_type = self.region.access_layer.path_type();
848        let object_store = self.region.access_layer.object_store();
849        let version_control = &self.region.version_control;
850
851        // Collects IndexKeys, file sizes, and max timestamps for files that need to be downloaded
852        let mut files_to_download = Vec::new();
853        let mut files_already_cached = 0;
854
855        {
856            let version = version_control.current().version;
857            for level in version.ssts.levels() {
858                for file_handle in level.files.values() {
859                    let file_meta = file_handle.meta_ref();
860                    if file_meta.exists_index() {
861                        let puffin_key = IndexKey::new(
862                            file_meta.region_id,
863                            file_meta.file_id,
864                            FileType::Puffin(file_meta.index_version),
865                        );
866
867                        if !file_cache.contains_key(&puffin_key) {
868                            files_to_download.push((
869                                puffin_key,
870                                file_meta.index_file_size,
871                                file_meta.time_range.1, // max timestamp
872                            ));
873                        } else {
874                            files_already_cached += 1;
875                        }
876                    }
877                }
878            }
            // The `Version` is dropped at the end of this scope to avoid holding the
            // memtables and file handles for a long time.
881        }
882
        // Sorts files by max timestamp in descending order to load the latest files first.
884        files_to_download.sort_by(|a, b| b.2.cmp(&a.2));
885
886        let total_files = files_to_download.len() as i64;
887
888        info!(
889            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
890            region_id, total_files, files_already_cached
891        );
892
893        CACHE_FILL_PENDING_FILES.add(total_files);
894
895        let mut files_downloaded = 0;
896        let mut files_skipped = 0;
897
898        for (puffin_key, file_size, max_timestamp) in files_to_download {
899            let current_size = file_cache.puffin_cache_size();
900            let capacity = file_cache.puffin_cache_capacity();
901            let region_state = self.region.state();
902            if !can_load_cache(region_state) {
903                info!(
                    "Stopping index cache preload due to region state: {:?}, region: {}, current_size: {}, capacity: {}",
905                    region_state, region_id, current_size, capacity
906                );
907                break;
908            }
909
910            // Checks if adding this file would exceed capacity
911            if current_size + file_size > capacity {
912                info!(
913                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
914                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
915                );
916                files_skipped = (total_files - files_downloaded) as usize;
917                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
918                break;
919            }
920
921            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
922                version
923            } else {
                unreachable!("`files_to_download` should only contain Puffin files");
925            };
926            let index_id = RegionIndexId::new(
927                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
928                index_version,
929            );
930
931            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
932
933            match file_cache
934                .download(puffin_key, &index_remote_path, object_store, file_size)
935                .await
936            {
937                Ok(_) => {
938                    debug!(
939                        "Downloaded index file to write cache, region: {}, file_id: {}",
940                        region_id, puffin_key.file_id
941                    );
942                    files_downloaded += 1;
943                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
944                    CACHE_FILL_PENDING_FILES.dec();
945                }
946                Err(e) => {
947                    warn!(
948                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
949                        region_id, puffin_key.file_id
950                    );
951                    CACHE_FILL_PENDING_FILES.dec();
952                }
953            }
954        }
955
956        info!(
957            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
958            region_id, total_files, files_downloaded, files_already_cached, files_skipped
959        );
960    }
961}
962
/// Schedules a background task that loads the region's index (Puffin) files into the
/// write cache when index preloading is enabled.
964fn maybe_load_cache(
965    region: &MitoRegionRef,
966    config: &MitoConfig,
967    cache_manager: &Option<CacheManagerRef>,
968) {
969    let Some(cache_manager) = cache_manager else {
970        return;
971    };
972    let Some(write_cache) = cache_manager.write_cache() else {
973        return;
974    };
975
976    let preload_enabled = config.preload_index_cache;
977    if !preload_enabled {
978        return;
979    }
980
981    let task = RegionLoadCacheTask::new(region.clone());
982    write_cache.load_region_cache(task);
983}
984
985/// Preloads Parquet metadata into the in-memory SST meta cache on region open.
986///
987/// This improves the latency of the first query after server start by avoiding large Parquet
988/// metadata reads on demand.
989///
990/// The preload is best-effort:
991/// - Always tries to warm from the local write cache (file cache) first.
992/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata
993///   directly from the local store.
994/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...).
995async fn preload_parquet_meta_cache_for_files(
996    region_id: RegionId,
997    cache_manager: CacheManagerRef,
998    sst_meta_cache_capacity: u64,
999    table_dir: String,
1000    path_type: PathType,
1001    object_store: object_store::ObjectStore,
1002    mut files: Vec<FileHandle>,
1003) -> usize {
1004    if !cache_manager.sst_meta_cache_enabled()
1005        || sst_meta_cache_capacity == 0
1006        || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
1007    {
1008        return 0;
1009    }
1010
1011    let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);
1012
1013    // Sort by time range so we can prefer preloading newer files first.
1014    files.sort_by(|a, b| b.meta_ref().time_range.1.cmp(&a.meta_ref().time_range.1));
1015
1016    let mut loaded = 0usize;
1017    for file_handle in files {
1018        // Stop when the shared SST meta cache is full.
1019        if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
1020            break;
1021        }
1022
1023        let file_id = file_handle.file_id();
1024        let mut cache_metrics = MetadataCacheMetrics::default();
1025        if cache_manager
1026            .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
1027            .await
1028            .is_some()
1029        {
1030            // Metadata is either already in memory or loaded from file cache.
1031            if cache_metrics.mem_cache_hit == 0 {
1032                loaded += 1;
1033            }
1034            continue;
1035        }
1036
1037        if !allow_direct_load {
1038            continue;
1039        }
1040
1041        let file_size = file_handle.meta_ref().file_size;
1042        let file_path = file_handle.file_path(&table_dir, path_type);
1043        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
1044        match loader.load(&mut cache_metrics).await {
1045            Ok(metadata) => {
1046                cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
1047                loaded += 1;
1048            }
1049            Err(err) => {
1050                // Preloading is best-effort. Failure shouldn't affect region open.
1051                warn!(
1052                    err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
1053                    region_id, file_path
1054                );
1055            }
1056        }
1057    }
1058
1059    loaded
1060}
1061
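/// Spawns a best-effort background task that preloads Parquet metadata for the region's
/// SST files, bounded by [PARQUET_META_PRELOAD_SEMAPHORE]. Does nothing when the SST meta
/// cache or index preloading is disabled.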
1062fn maybe_preload_parquet_meta_cache(
1063    region: &MitoRegionRef,
1064    config: &MitoConfig,
1065    cache_manager: &Option<CacheManagerRef>,
1066) {
1067    let Some(cache_manager) = cache_manager else {
1068        return;
1069    };
1070    if !cache_manager.sst_meta_cache_enabled() {
1071        return;
1072    }
1073
    // Skip if the configured SST meta cache size is zero.
1075    if config.sst_meta_cache_size.as_bytes() == 0 {
1076        return;
1077    }
1078    if !config.preload_index_cache {
1079        return;
1080    }
1081
1082    let region = region.clone();
1083    let cache_manager = cache_manager.clone();
1084    let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();
1085
1086    tokio::spawn(async move {
        // Safety: the semaphore is never closed, so acquiring a permit cannot fail.
1088        let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();
1089
1090        let region_id = region.region_id;
1091        let table_dir = region.access_layer.table_dir().to_string();
1092        let path_type = region.access_layer.path_type();
1093        let object_store = region.access_layer.object_store().clone();
1094
1095        // Collect SST files. Do not hold the version longer than needed.
1096        let mut files = Vec::new();
1097        {
1098            let version = region.version_control.current().version;
1099            for level in version.ssts.levels() {
1100                for file_handle in level.files.values() {
1101                    files.push(file_handle.clone());
1102                }
1103            }
1104        }
1105        let preloading_start = Instant::now();
1106        let loaded = preload_parquet_meta_cache_for_files(
1107            region_id,
1108            cache_manager,
1109            sst_meta_cache_capacity,
1110            table_dir,
1111            path_type,
1112            object_store,
1113            files,
1114        )
1115        .await;
1116        let preloading_cost = preloading_start.elapsed();
1117
1118        if loaded > 0 {
1119            info!(
1120                "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
1121                region_id,
1122                loaded,
1123                preloading_cost.as_millis()
1124            );
1125        }
1126    });
1127}
1128
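/// Returns whether the region is in a state where filling the file cache is still useful.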
1129fn can_load_cache(state: RegionRoleState) -> bool {
1130    match state {
1131        RegionRoleState::Leader(RegionLeaderState::Writable)
1132        | RegionRoleState::Leader(RegionLeaderState::Staging)
1133        | RegionRoleState::Leader(RegionLeaderState::Altering)
1134        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
1135        | RegionRoleState::Leader(RegionLeaderState::Editing)
1136        | RegionRoleState::Follower => true,
1137        // The region will be closed soon if it is downgrading.
1138        RegionRoleState::Leader(RegionLeaderState::Downgrading)
1139        | RegionRoleState::Leader(RegionLeaderState::Dropping)
1140        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
1141    }
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146    use std::sync::Arc;
1147
1148    use common_base::readable_size::ReadableSize;
1149    use common_test_util::temp_dir::create_temp_dir;
1150    use common_time::Timestamp;
1151    use datatypes::arrow::array::{ArrayRef, Int64Array};
1152    use datatypes::arrow::record_batch::RecordBatch;
1153    use object_store::ObjectStore;
1154    use object_store::services::{Fs, Memory};
1155    use parquet::arrow::ArrowWriter;
1156    use parquet::file::metadata::KeyValue;
1157    use parquet::file::properties::WriterProperties;
1158    use store_api::region_request::PathType;
1159    use store_api::storage::{FileId, RegionId};
1160
1161    use super::preload_parquet_meta_cache_for_files;
1162    use crate::cache::CacheManager;
1163    use crate::cache::file_cache::{FileType, IndexKey};
1164    use crate::sst::file::{FileHandle, FileMeta};
1165    use crate::sst::file_purger::NoopFilePurger;
1166    use crate::sst::parquet::PARQUET_METADATA_KEY;
1167    use crate::test_util::TestEnv;
1168    use crate::test_util::sst_util::sst_region_metadata;
1169
1170    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
1171        let key_value_meta = KeyValue::new(
1172            PARQUET_METADATA_KEY.to_string(),
1173            sst_region_metadata().to_json().unwrap(),
1174        );
1175        let props = WriterProperties::builder()
1176            .set_key_value_metadata(Some(vec![key_value_meta]))
1177            .build();
1178
1179        let mut parquet_bytes = Vec::new();
1180        let mut writer =
1181            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
1182        writer.write(batch).unwrap();
1183        writer.close().unwrap();
1184
1185        parquet_bytes
1186    }
1187
1188    #[tokio::test]
1189    async fn test_preload_parquet_meta_cache_uses_file_cache() {
1190        let env = TestEnv::new().await;
1191
1192        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
1193        let write_cache = env
1194            .create_write_cache(local_store, ReadableSize::mb(1024))
1195            .await;
1196        let cache_manager = Arc::new(
1197            CacheManager::builder()
1198                .sst_meta_cache_size(1024 * 1024)
1199                .write_cache(Some(write_cache.clone()))
1200                .build(),
1201        );
1202
1203        let region_id = RegionId::new(1, 1);
1204        let file_id = FileId::random();
1205
1206        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
1207        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
1208        let parquet_bytes = sst_parquet_bytes(&batch);
1209        let file_size = parquet_bytes.len() as u64;
1210
1211        let file_meta = FileMeta {
1212            region_id,
1213            file_id,
1214            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1215            level: 0,
1216            file_size,
1217            max_row_group_uncompressed_size: 0,
1218            available_indexes: Default::default(),
1219            indexes: vec![],
1220            index_file_size: 0,
1221            index_version: 0,
1222            num_rows: 3,
1223            num_row_groups: 1,
1224            sequence: None,
1225            partition_expr: None,
1226            num_series: 0,
1227        };
1228        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1229
1230        let table_dir = "test_table";
1231        let path_type = PathType::Bare;
1232        let remote_path = file_handle.file_path(table_dir, path_type);
1233
1234        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
1235        source_store
1236            .write(&remote_path, parquet_bytes)
1237            .await
1238            .unwrap();
1239
1240        // Put the parquet file into the write cache, so file cache contains metadata.
1241        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
1242        write_cache
1243            .file_cache()
1244            .download(index_key, &remote_path, &source_store, file_size)
1245            .await
1246            .unwrap();
1247
1248        let region_file_id = file_handle.file_id();
1249        assert!(
1250            cache_manager
1251                .get_parquet_meta_data_from_mem_cache(region_file_id)
1252                .is_none()
1253        );
1254
1255        let loaded = preload_parquet_meta_cache_for_files(
1256            region_id,
1257            cache_manager.clone(),
1258            1024 * 1024,
1259            table_dir.to_string(),
1260            path_type,
1261            source_store.clone(),
1262            vec![file_handle],
1263        )
1264        .await;
1265
1266        // Should warm the in-memory cache from the local file cache.
1267        assert_eq!(loaded, 1);
1268        assert!(
1269            cache_manager
1270                .get_parquet_meta_data_from_mem_cache(region_file_id)
1271                .is_some()
1272        );
1273    }
1274
1275    #[tokio::test]
1276    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
1277        let cache_manager = Arc::new(
1278            CacheManager::builder()
1279                .sst_meta_cache_size(1024 * 1024)
1280                .build(),
1281        );
1282
1283        let region_id = RegionId::new(1, 1);
1284        let file_id = FileId::random();
1285
1286        // Without a local file cache entry, preloading should skip the file.
1287        let file_meta = FileMeta {
1288            region_id,
1289            file_id,
1290            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1291            level: 0,
1292            file_size: 0,
1293            max_row_group_uncompressed_size: 0,
1294            available_indexes: Default::default(),
1295            indexes: vec![],
1296            index_file_size: 0,
1297            index_version: 0,
1298            num_rows: 3,
1299            num_row_groups: 1,
1300            sequence: None,
1301            partition_expr: None,
1302            num_series: 0,
1303        };
1304        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1305
1306        let table_dir = "test_table";
1307        let path_type = PathType::Bare;
1308        let remote_path = file_handle.file_path(table_dir, path_type);
1309
1310        // Even if the remote object store has the file, we should not preload from it.
1311        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
1312        object_store
1313            .write(&remote_path, b"noop".as_slice())
1314            .await
1315            .unwrap();
1316
1317        let region_file_id = file_handle.file_id();
1318        assert!(
1319            cache_manager
1320                .get_parquet_meta_data_from_mem_cache(region_file_id)
1321                .is_none()
1322        );
1323
1324        let loaded = preload_parquet_meta_cache_for_files(
1325            region_id,
1326            cache_manager.clone(),
1327            1024 * 1024,
1328            table_dir.to_string(),
1329            path_type,
1330            object_store,
1331            vec![file_handle],
1332        )
1333        .await;
1334
1335        assert_eq!(loaded, 0);
1336        assert!(
1337            cache_manager
1338                .get_parquet_meta_data_from_mem_cache(region_file_id)
1339                .is_none()
1340        );
1341    }
1342
1343    #[tokio::test]
1344    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
1345        let cache_manager = Arc::new(
1346            CacheManager::builder()
1347                .sst_meta_cache_size(1024 * 1024)
1348                .build(),
1349        );
1350
1351        let region_id = RegionId::new(1, 1);
1352        let file_id = FileId::random();
1353
1354        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
1355        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
1356        let parquet_bytes = sst_parquet_bytes(&batch);
1357
1358        // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
1359        // the local filesystem to retrieve it.
1360        let file_meta = FileMeta {
1361            region_id,
1362            file_id,
1363            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1364            level: 0,
1365            file_size: 0,
1366            max_row_group_uncompressed_size: 0,
1367            available_indexes: Default::default(),
1368            indexes: vec![],
1369            index_file_size: 0,
1370            index_version: 0,
1371            num_rows: 3,
1372            num_row_groups: 1,
1373            sequence: None,
1374            partition_expr: None,
1375            num_series: 0,
1376        };
1377        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1378
1379        let table_dir = "test_table";
1380        let path_type = PathType::Bare;
1381        let file_path = file_handle.file_path(table_dir, path_type);
1382
1383        let root = create_temp_dir("parquet-meta-preload");
1384        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
1385            .unwrap()
1386            .finish();
1387        object_store.write(&file_path, parquet_bytes).await.unwrap();
1388
1389        let region_file_id = file_handle.file_id();
1390        assert!(
1391            cache_manager
1392                .get_parquet_meta_data_from_mem_cache(region_file_id)
1393                .is_none()
1394        );
1395
1396        let loaded = preload_parquet_meta_cache_for_files(
1397            region_id,
1398            cache_manager.clone(),
1399            1024 * 1024,
1400            table_dir.to_string(),
1401            path_type,
1402            object_store,
1403            vec![file_handle],
1404        )
1405        .await;
1406
1407        assert_eq!(loaded, 1);
1408        assert!(
1409            cache_manager
1410                .get_parquet_meta_data_from_mem_cache(region_file_id)
1411                .is_some()
1412        );
1413    }
1414}