
mito2/region/opener.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region opener.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::normalize_dir;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};
use tokio::sync::Semaphore;

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCache, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range};
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

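/// Maximum number of regions that may preload parquet metadata concurrently.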
const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;

static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
    LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));

/// A fetcher to retrieve the partition expression for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
/// while newer ones do. On open, we backfill it via this fetcher and persist it
/// to the manifest so future opens don't need to refetch it.
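///
/// # Example
///
/// A minimal sketch of an implementation (illustrative only; a real fetcher would
/// look the expression up from metadata, and `StaticExprFetcher` is hypothetical):
///
/// ```ignore
/// struct StaticExprFetcher {
///     expr_json: String,
/// }
///
/// #[async_trait::async_trait]
/// impl PartitionExprFetcher for StaticExprFetcher {
///     async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
///         Some(self.expr_json.clone())
///     }
/// }
/// ```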
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

/// Builder to create a new [MitoRegion] or open an existing one.
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
    /// Returns a new opener.
    // TODO(LFC): Reduce the number of arguments.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

    /// Sets metadata builder of the region to create.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Computes the region directory from table_dir and region_id.
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    /// Builds the region metadata.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    /// - Panics if `metadata_builder` is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses and sets options for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// Sets the replay checkpoint for the region.
    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets the `skip_wal_replay`.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

    /// Writes region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
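    ///
    /// # Example
    ///
    /// A sketch of typical usage (illustrative only; the surrounding values are
    /// assumed to be constructed elsewhere):
    ///
    /// ```ignore
    /// let region = RegionOpener::new(
    ///     region_id,
    ///     "data/my_table/",
    ///     PathType::Bare,
    ///     memtable_builder_provider,
    ///     object_store_manager,
    ///     purge_scheduler,
    ///     puffin_manager_factory,
    ///     intermediate_manager,
    ///     time_provider,
    ///     file_ref_manager,
    ///     partition_expr_fetcher,
    /// )
    /// .metadata_builder(metadata_builder)
    /// .parse_options(request_options)?
    /// .cache(Some(cache_manager))
    /// .create_or_open(&config, &wal)
    /// .await?;
    /// ```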
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let mut options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Sets the sst_format based on the options or the `default_flat_format` flag.
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_flat_format {
            options.sst_format = Some(FormatType::Flat);
            FormatType::Flat
        } else {
            // Default to PrimaryKeyParquet for newly created regions.
            options.sst_format = Some(FormatType::PrimaryKey);
            FormatType::PrimaryKey
        };
        // Creates a manifest manager for this region and writes the region metadata to the manifest file.
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &region_dir, &object_store);
        // Set manifest cache if available.
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        // For remote WAL, we need to set flushed_entry_id to the current topic's latest entry id.
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            sst_format,
            &self.stats,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!(
            "Create region {} with options: {:?}, default_flat_format: {}",
            region_id, options, config.default_flat_format
        );

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(Arc::new(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_file_purger(
                config.gc.enable,
                self.path_type,
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
        }))
    }

    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .with_context(|| EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

    /// Tries to open the region and returns `None` if the region directory is empty.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegionRef>> {
        let now = Instant::now();
        let mut region_options = self.options.as_ref().unwrap().clone();
        let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
        // Set manifest cache if available.
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &self.stats).await?
        else {
            return Ok(None);
        };

        // Backfills `partition_expr` if it is missing, and uses the backfilled metadata
        // in memory during this open.
        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };
        // Updates the region options with the manifest.
        sanitize_region_options(&manifest, &mut region_options);

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_file_purger(
            config.gc.enable,
            self.path_type,
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        // We should sanitize the region options before creating a new memtable.
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        // Uses the compaction time window from the manifest if the region doesn't
        // provide the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        // Builds the version with the region options updated from the manifest.
        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            file_purger.clone(),
            mutable,
            region_options,
        );
        let version = version_builder.build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For remote WAL, we need to set topic_latest_entry_id to the current topic's
            // latest entry id. Only set after the WAL replay is completed.
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control: version_control.clone(),
            access_layer: access_layer.clone(),
            // Region is always opened in read-only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats.clone(),
        };

        let region = Arc::new(region);

        maybe_load_cache(&region, config, &self.cache_manager);
        maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);

        Ok(Some(region))
    }
}

/// Creates a version builder from a region manifest.
pub(crate) fn version_builder_from_manifest(
    manifest: &RegionManifest,
    metadata: RegionMetadataRef,
    file_purger: FilePurgerRef,
    mutable: TimePartitionsRef,
    region_options: RegionOptions,
) -> VersionBuilder {
    VersionBuilder::new(metadata, mutable)
        .add_files(file_purger, manifest.files.values().cloned())
        .flushed_entry_id(manifest.flushed_entry_id)
        .flushed_sequence(manifest.flushed_sequence)
        .truncated_entry_id(manifest.truncated_entry_id)
        .compaction_time_window(manifest.compaction_time_window)
        .options(region_options)
}

/// Updates region options with the options persisted in the manifest.
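///
/// # Example (sketch)
///
/// After sanitizing, the SST format always comes from the manifest
/// (illustrative only, not a doctest):
///
/// ```ignore
/// let mut options = RegionOptions::try_from(&request_options)?;
/// sanitize_region_options(&manifest, &mut options);
/// assert_eq!(options.sst_format, Some(manifest.sst_format));
/// ```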
pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
    let option_format = options.sst_format.unwrap_or_default();
    if option_format != manifest.sst_format {
        common_telemetry::warn!(
            "Overriding SST format from {:?} to {:?} for region {}",
            option_format,
            manifest.sst_format,
            manifest.metadata.region_id,
        );
    }
    // Always set sst_format from manifest to ensure it's explicitly stored,
    // even when the default matches the manifest value.
    options.sst_format = Some(manifest.sst_format);
    if let Some(manifest_append_mode) = manifest.append_mode
        && options.append_mode != manifest_append_mode
    {
        common_telemetry::warn!(
            "Overriding append_mode from {} to {} for region {}",
            options.append_mode,
            manifest_append_mode,
            manifest.metadata.region_id,
        );
        options.append_mode = manifest_append_mode;
    }
    if options.append_mode && options.merge_mode.take().is_some() {
        common_telemetry::warn!(
            "Ignoring merge_mode because append_mode is enabled for region {}",
            manifest.metadata.region_id,
        );
    }
}

/// Sanitizes open request options before parsing.
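///
/// # Example (sketch)
///
/// With `append_mode` enabled, any `merge_mode` key is dropped (illustrative only,
/// not a doctest; the option values here are hypothetical):
///
/// ```ignore
/// let mut options = HashMap::from([
///     ("append_mode".to_string(), "true".to_string()),
///     ("merge_mode".to_string(), "last_non_null".to_string()),
/// ]);
/// sanitize_open_request_options(&mut options);
/// assert!(!options.contains_key("merge_mode"));
/// ```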
pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
    let append_mode_enabled = options
        .get("append_mode")
        .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));

    if append_mode_enabled && options.remove("merge_mode").is_some() {
        common_telemetry::warn!(
            "Ignoring merge_mode in open request options because append_mode is enabled"
        );
    }
}

/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns
/// the default object store.
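///
/// # Example (sketch)
///
/// Illustrative only; `"s3_cold"` is a hypothetical storage name that must have been
/// registered in the manager:
///
/// ```ignore
/// let named = get_object_store(&Some("s3_cold".to_string()), &object_store_manager)?;
/// let default = get_object_store(&None, &object_store_manager)?;
/// ```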
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// Checks whether the recovered region has the same schema as the region to create.
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays the mutations from the WAL and inserts them into the memtable of the given region.
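///
/// Replay starts from `flushed_entry_id + 1`; entries at or below `flushed_entry_id` are
/// considered stale and rejected unless `allow_stale_entries` is set. A simplified sketch
/// of the loop in the body below (illustrative only):
///
/// ```ignore
/// let mut stream = wal_entry_reader.read(provider, flushed_entry_id + 1)?;
/// while let Some((entry_id, entry)) = stream.next().await.transpose()? {
///     // Pushes the mutations and bulk entries into a RegionWriteCtx with the
///     // sequences recorded in the WAL, then writes them to the memtable.
/// }
/// ```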
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    // Last entry id should start from the flushed entry id since there might be no
    // data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            // For WAL replay, we don't need to track the write bytes rate.
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                // We should respect the sequence in the WAL during replay.
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            // During replay, we should adopt the sequence from the WAL.
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        // Sets next_entry_id and writes to the memtable.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

/// A task to load and fill the region file cache.
pub(crate) struct RegionLoadCacheTask {
    region: MitoRegionRef,
}

impl RegionLoadCacheTask {
    pub(crate) fn new(region: MitoRegionRef) -> Self {
        Self { region }
    }

    /// Fills the file cache with index files from the region.
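    ///
    /// Index (Puffin) files are downloaded in descending max-timestamp order, so the
    /// newest files are cached first. The task stops early if the region is being
    /// closed or if the next file would exceed the puffin cache capacity.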
    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
        let region_id = self.region.region_id;
        let table_dir = self.region.access_layer.table_dir();
        let path_type = self.region.access_layer.path_type();
        let object_store = self.region.access_layer.object_store();
        let version_control = &self.region.version_control;

        // Collects IndexKeys, file sizes, and max timestamps for files that need to be downloaded.
        let mut files_to_download = Vec::new();
        let mut files_already_cached = 0;

        {
            let version = version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    let file_meta = file_handle.meta_ref();
                    if file_meta.exists_index() {
                        let puffin_key = IndexKey::new(
                            file_meta.region_id,
                            file_meta.file_id,
                            FileType::Puffin(file_meta.index_version),
                        );

                        if !file_cache.contains_key(&puffin_key) {
                            files_to_download.push((
                                puffin_key,
                                file_meta.index_file_size,
                                file_meta.time_range.1, // max timestamp
                            ));
                        } else {
                            files_already_cached += 1;
                        }
                    }
                }
            }
            // The Version is dropped at the end of this scope to avoid holding the
            // memtables and file handles for a long time.
        }

        // Sorts files by max timestamp in descending order to load the latest files first.
        files_to_download.sort_by_key(|b| std::cmp::Reverse(b.2));

        let total_files = files_to_download.len() as i64;

        info!(
            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
            region_id, total_files, files_already_cached
        );

        CACHE_FILL_PENDING_FILES.add(total_files);

        let mut files_downloaded = 0;
        let mut files_skipped = 0;

        for (puffin_key, file_size, max_timestamp) in files_to_download {
            let current_size = file_cache.puffin_cache_size();
            let capacity = file_cache.puffin_cache_capacity();
            let region_state = self.region.state();
            if !can_load_cache(region_state) {
                info!(
                    "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
                    region_state, region_id, current_size, capacity
                );
                break;
            }

            // Checks if adding this file would exceed the capacity.
            if current_size + file_size > capacity {
                info!(
                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
                );
                files_skipped = (total_files - files_downloaded) as usize;
                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
                break;
            }

            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
                version
            } else {
                unreachable!("`files_to_download` should only contain Puffin files");
            };
            let index_id = RegionIndexId::new(
                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
                index_version,
            );

            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);

            match file_cache
                .download(puffin_key, &index_remote_path, object_store, file_size)
                .await
            {
                Ok(_) => {
                    debug!(
                        "Downloaded index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    files_downloaded += 1;
                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
                    CACHE_FILL_PENDING_FILES.dec();
                }
                Err(e) => {
                    warn!(
                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    CACHE_FILL_PENDING_FILES.dec();
                }
            }
        }

        info!(
            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
            region_id, total_files, files_downloaded, files_already_cached, files_skipped
        );
    }
}

/// Schedules a background task that loads all index (Puffin) files from the region's
/// current version into the write cache, if index preloading is enabled.
fn maybe_load_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    let Some(write_cache) = cache_manager.write_cache() else {
        return;
    };

    let preload_enabled = config.preload_index_cache;
    if !preload_enabled {
        return;
    }

    let task = RegionLoadCacheTask::new(region.clone());
    write_cache.load_region_cache(task);
}

/// Preloads Parquet metadata into the in-memory SST meta cache on region open.
///
/// This improves the latency of the first query after server start by avoiding large Parquet
/// metadata reads on demand.
///
/// The preload is best-effort:
/// - Always tries to warm from the local write cache (file cache) first.
/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata
///   directly from the local store.
/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...).
#[allow(clippy::too_many_arguments)]
async fn preload_parquet_meta_cache_for_files(
    region_id: RegionId,
    cache_manager: CacheManagerRef,
    sst_meta_cache_capacity: u64,
    table_dir: String,
    path_type: PathType,
    object_store: object_store::ObjectStore,
    region_metadata: RegionMetadataRef,
    mut files: Vec<FileHandle>,
) -> usize {
    if !cache_manager.sst_meta_cache_enabled()
        || sst_meta_cache_capacity == 0
        || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
    {
        return 0;
    }

    let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);

    // Sort by time range so we can prefer preloading newer files first.
    files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));

    let mut loaded = 0usize;
    for file_handle in files {
        // Stop when the shared SST meta cache is full.
        if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
            break;
        }

        let file_id = file_handle.file_id();
        let mut cache_metrics = MetadataCacheMetrics::default();
        if let Some(metadata) = cache_manager
            .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
            .await
        {
            if file_handle.primary_key_range().is_none()
                && let Some(primary_key_range) =
                    extract_primary_key_range(&metadata, &region_metadata)
            {
                file_handle.set_primary_key_range(primary_key_range);
            }
            // Metadata is either already in memory or loaded from file cache.
            if cache_metrics.mem_cache_hit == 0 {
                loaded += 1;
            }
            continue;
        }

        if !allow_direct_load {
            continue;
        }

        let file_size = file_handle.meta_ref().file_size;
        let file_path = file_handle.file_path(&table_dir, path_type);
        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
        match loader.load(&mut cache_metrics).await {
            Ok(metadata) => {
                if let Some(primary_key_range) =
                    extract_primary_key_range(&metadata, &region_metadata)
                {
                    file_handle.set_primary_key_range(primary_key_range);
                }
                cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
                loaded += 1;
            }
            Err(err) => {
                // Preloading is best-effort. Failure shouldn't affect region open.
                warn!(
                    err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
                    region_id, file_path
                );
            }
        }
    }

    loaded
}

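/// Spawns a background task that preloads Parquet metadata for the region's SST files,
/// throttled across regions by [PARQUET_META_PRELOAD_SEMAPHORE].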
fn maybe_preload_parquet_meta_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    if !cache_manager.sst_meta_cache_enabled() {
        return;
    }

    // Skip if the SST meta cache capacity is zero.
    if config.sst_meta_cache_size.as_bytes() == 0 {
        return;
    }
    if !config.preload_index_cache {
        return;
    }

    let region = region.clone();
    let cache_manager = cache_manager.clone();
    let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();

    tokio::spawn(async move {
        // Safety: the static semaphore is never closed, so `acquire` cannot fail.
        let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();

        let region_id = region.region_id;
        let table_dir = region.access_layer.table_dir().to_string();
        let path_type = region.access_layer.path_type();
        let object_store = region.access_layer.object_store().clone();
        let region_metadata = region.version_control.current().version.metadata.clone();

        // Collect SST files. Do not hold the version longer than needed.
        let mut files = Vec::new();
        {
            let version = region.version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    files.push(file_handle.clone());
                }
            }
        }
        let preloading_start = Instant::now();
        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager,
            sst_meta_cache_capacity,
            table_dir,
            path_type,
            object_store,
            region_metadata,
            files,
        )
        .await;
        let preloading_cost = preloading_start.elapsed();

        if loaded > 0 {
            info!(
                "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
                region_id,
                loaded,
                preloading_cost.as_millis()
            );
        }
    });
}

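/// Returns whether the region's role state still allows the background cache fill
/// task to continue.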
fn can_load_cache(state: RegionRoleState) -> bool {
    match state {
        RegionRoleState::Leader(RegionLeaderState::Writable)
        | RegionRoleState::Leader(RegionLeaderState::Staging)
        | RegionRoleState::Leader(RegionLeaderState::Altering)
        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
        | RegionRoleState::Leader(RegionLeaderState::Editing)
        | RegionRoleState::Follower => true,
        // The region will be closed soon if it is downgrading.
        RegionRoleState::Leader(RegionLeaderState::Downgrading)
        | RegionRoleState::Leader(RegionLeaderState::Dropping)
        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_test_util::temp_dir::create_temp_dir;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use object_store::ObjectStore;
    use object_store::services::{Fs, Memory};
    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::preload_parquet_meta_cache_for_files;
    use crate::cache::CacheManager;
    use crate::cache::file_cache::{FileType, IndexKey};
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::parquet::PARQUET_METADATA_KEY;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::sst_region_metadata;

    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
        let key_value_meta = KeyValue::new(
            PARQUET_METADATA_KEY.to_string(),
            sst_region_metadata().to_json().unwrap(),
        );
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .build();

        let mut parquet_bytes = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();

        parquet_bytes
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_uses_file_cache() {
        let env = TestEnv::new().await;

        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let write_cache = env
            .create_write_cache(local_store, ReadableSize::mb(1024))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([
            ("col", col),
            (
                store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME,
                primary_key,
            ),
        ])
        .unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);
        let file_size = parquet_bytes.len() as u64;

        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
        source_store
            .write(&remote_path, parquet_bytes)
            .await
            .unwrap();

        // Put the parquet file into the write cache, so the file cache contains its metadata.
        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache
            .file_cache()
            .download(index_key, &remote_path, &source_store, file_size)
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            source_store.clone(),
            Arc::new(sst_region_metadata()),
            vec![file_handle.clone()],
        )
        .await;

        // Should warm the in-memory cache from the local file cache.
        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
        assert!(file_handle.primary_key_range().is_some());
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        // Without a local file cache entry, preloading should skip the file.
        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        // Even if the remote object store has the file, we should not preload from it.
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        object_store
            .write(&remote_path, b"noop".as_slice())
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 0);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);

        // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
        // the local filesystem to retrieve it.
        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(table_dir, path_type);

        let root = create_temp_dir("parquet-meta-preload");
        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
            .unwrap()
            .finish();
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
    }
}