Skip to main content

mito2/region/
opener.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region opener.
16
17use std::any::TypeId;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicI64, AtomicU64};
20use std::sync::{Arc, LazyLock};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, warn};
24use common_wal::options::WalOptions;
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use log_store::kafka::log_store::KafkaLogStore;
28use log_store::noop::log_store::NoopLogStore;
29use log_store::raft_engine::log_store::RaftEngineLogStore;
30use object_store::manager::ObjectStoreManagerRef;
31use object_store::util::normalize_dir;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::logstore::LogStore;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::{
36    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
37};
38use store_api::region_engine::RegionRole;
39use store_api::region_request::PathType;
40use store_api::storage::{ColumnId, RegionId};
41use tokio::sync::Semaphore;
42
43use crate::access_layer::AccessLayer;
44use crate::cache::CacheManagerRef;
45use crate::cache::file_cache::{FileCache, FileType, IndexKey};
46use crate::config::MitoConfig;
47use crate::error;
48use crate::error::{
49    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
50    Result, StaleLogEntrySnafu,
51};
52use crate::manifest::action::RegionManifest;
53use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
54use crate::memtable::MemtableBuilderProvider;
55use crate::memtable::bulk::part::BulkPart;
56use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
57use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
58use crate::region::options::RegionOptions;
59use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
60use crate::region::{
61    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
62};
63use crate::region_write_ctx::RegionWriteCtx;
64use crate::request::OptionOutputTx;
65use crate::schedule::scheduler::SchedulerRef;
66use crate::sst::FormatType;
67use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
68use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
69use crate::sst::file_ref::FileReferenceManagerRef;
70use crate::sst::index::intermediate::IntermediateManager;
71use crate::sst::index::puffin_manager::PuffinManagerFactory;
72use crate::sst::location::{self, region_dir_from_table_dir};
73use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range};
74use crate::sst::parquet::reader::MetadataCacheMetrics;
75use crate::time_provider::TimeProviderRef;
76use crate::wal::entry_reader::WalEntryReader;
77use crate::wal::{EntryId, Wal};
78
/// Number of permits [PARQUET_META_PRELOAD_SEMAPHORE] is created with.
const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;

/// Process-wide semaphore, lazily initialized on first access with
/// [PARQUET_META_PRELOAD_CONCURRENCY] permits.
// NOTE(review): presumably bounds concurrent parquet metadata preloading (going by the
// name); the code that acquires permits is not visible here — confirm.
static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
    LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
83
/// A fetcher to retrieve partition expr for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
/// while newer ones do. On open, we backfill it via this fetcher and persist it
/// to the manifest so future opens don't need refetching.
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    /// Fetches the partition expr of `region_id`.
    ///
    /// Returns `None` when no expr is available; the opener then keeps the metadata
    /// recorded in the manifest unchanged. A returned string is handed to
    /// `RegionMetadataBuilder::partition_expr_json` by the opener.
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}
93
/// Shared, reference-counted handle to a [PartitionExprFetcher] that is `Send + Sync`.
pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;
95
/// Builder to create a new [MitoRegion] or open an existing one.
pub(crate) struct RegionOpener {
    /// Id of the region to create or open.
    region_id: RegionId,
    /// Builder for the metadata of the region to create. It is taken when the
    /// metadata is built, which only happens on the create-or-open path.
    metadata_builder: Option<RegionMetadataBuilder>,
    /// Supplies the memtable builder matching the region options.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Resolves the object store selected by the `storage` region option.
    object_store_manager: ObjectStoreManagerRef,
    /// Table directory, normalized by `normalize_dir` when the opener is constructed.
    table_dir: String,
    /// Path type used together with `table_dir` to derive the region directory.
    path_type: PathType,
    /// Scheduler handed to the file purger of the region.
    purge_scheduler: SchedulerRef,
    /// Region options. Must be set before creating or opening the region.
    options: Option<RegionOptions>,
    /// Optional cache manager; its write cache also provides the manifest cache.
    cache_manager: Option<CacheManagerRef>,
    /// Whether to skip replaying the WAL while opening the region.
    skip_wal_replay: bool,
    /// Passed to the region's [AccessLayer].
    puffin_manager_factory: PuffinManagerFactory,
    /// Passed to the region's [AccessLayer].
    intermediate_manager: IntermediateManager,
    /// Clock used to initialize the region's last flush / compaction timestamps.
    time_provider: TimeProviderRef,
    /// Manifest statistics shared with the manifest manager and the region.
    stats: ManifestStats,
    /// Optional pre-built WAL entry reader. When `None`, a reader is obtained from
    /// the WAL while opening the region.
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    /// Optional entry id; replay starts from the larger of this value and the
    /// flushed entry id of the recovered version.
    replay_checkpoint: Option<u64>,
    /// File reference manager handed to the file purger of the region.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetches the partition expr when the manifest metadata does not carry one.
    partition_expr_fetcher: PartitionExprFetcherRef,
}
117
118impl RegionOpener {
119    /// Returns a new opener.
120    // TODO(LFC): Reduce the number of arguments.
121    #[allow(clippy::too_many_arguments)]
122    pub(crate) fn new(
123        region_id: RegionId,
124        table_dir: &str,
125        path_type: PathType,
126        memtable_builder_provider: MemtableBuilderProvider,
127        object_store_manager: ObjectStoreManagerRef,
128        purge_scheduler: SchedulerRef,
129        puffin_manager_factory: PuffinManagerFactory,
130        intermediate_manager: IntermediateManager,
131        time_provider: TimeProviderRef,
132        file_ref_manager: FileReferenceManagerRef,
133        partition_expr_fetcher: PartitionExprFetcherRef,
134    ) -> RegionOpener {
135        RegionOpener {
136            region_id,
137            metadata_builder: None,
138            memtable_builder_provider,
139            object_store_manager,
140            table_dir: normalize_dir(table_dir),
141            path_type,
142            purge_scheduler,
143            options: None,
144            cache_manager: None,
145            skip_wal_replay: false,
146            puffin_manager_factory,
147            intermediate_manager,
148            time_provider,
149            stats: Default::default(),
150            wal_entry_reader: None,
151            replay_checkpoint: None,
152            file_ref_manager,
153            partition_expr_fetcher,
154        }
155    }
156
157    /// Sets metadata builder of the region to create.
158    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
159        self.metadata_builder = Some(builder);
160        self
161    }
162
163    /// Computes the region directory from table_dir and region_id.
164    fn region_dir(&self) -> String {
165        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
166    }
167
168    /// Builds the region metadata.
169    ///
170    /// # Panics
171    /// - Panics if `options` is not set.
172    /// - Panics if `metadata_builder` is not set.
173    fn build_metadata(&mut self) -> Result<RegionMetadata> {
174        let options = self.options.as_ref().unwrap();
175        let mut metadata_builder = self.metadata_builder.take().unwrap();
176        metadata_builder.primary_key_encoding(options.primary_key_encoding());
177        metadata_builder.build().context(InvalidMetadataSnafu)
178    }
179
180    /// Parses and sets options for the region.
181    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
182        let region_id = self.region_id;
183        self.options(RegionOptions::try_from_options(region_id, &options)?)
184    }
185
186    /// Sets the replay checkpoint for the region.
187    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
188        self.replay_checkpoint = replay_checkpoint;
189        self
190    }
191
192    /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
193    /// constructing a new one from scratch.
194    pub(crate) fn wal_entry_reader(
195        mut self,
196        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
197    ) -> Self {
198        self.wal_entry_reader = wal_entry_reader;
199        self
200    }
201
202    /// Sets options for the region.
203    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
204        options.validate()?;
205        self.options = Some(options);
206        Ok(self)
207    }
208
209    /// Sets the cache manager for the region.
210    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
211        self.cache_manager = cache_manager;
212        self
213    }
214
215    /// Sets the `skip_wal_replay`.
216    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
217        self.skip_wal_replay = skip;
218        self
219    }
220
221    /// Writes region manifest and creates a new region if it does not exist.
222    /// Opens the region if it already exists.
223    ///
224    /// # Panics
225    /// - Panics if `metadata_builder` is not set.
226    /// - Panics if `options` is not set.
227    pub(crate) async fn create_or_open<S: LogStore>(
228        mut self,
229        config: &MitoConfig,
230        wal: &Wal<S>,
231    ) -> Result<MitoRegionRef> {
232        let region_id = self.region_id;
233        let region_dir = self.region_dir();
234        let metadata = self.build_metadata()?;
235        // Tries to open the region.
236        match self.maybe_open(config, wal).await {
237            Ok(Some(region)) => {
238                let recovered = region.metadata();
239                // Checks the schema of the region.
240                let expect = &metadata;
241                check_recovered_region(
242                    &recovered,
243                    expect.region_id,
244                    &expect.column_metadatas,
245                    &expect.primary_key,
246                )?;
247                // To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
248                region.set_role(RegionRole::Leader);
249
250                return Ok(region);
251            }
252            Ok(None) => {
253                debug!(
254                    "No data under directory {}, region_id: {}",
255                    region_dir, self.region_id
256                );
257            }
258            Err(e) => {
259                warn!(e;
260                    "Failed to open region {} before creating it, region_dir: {}",
261                    self.region_id, region_dir
262                );
263            }
264        }
265        // Safety: must be set before calling this method.
266        let mut options = self.options.take().unwrap();
267        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
268        let provider = self.provider::<S>(&options.wal_options)?;
269        let metadata = Arc::new(metadata);
270        // Sets the sst_format based on options or flat_format flag
271        let sst_format = if let Some(format) = options.sst_format {
272            format
273        } else if config.default_flat_format {
274            options.sst_format = Some(FormatType::Flat);
275            FormatType::Flat
276        } else {
277            // Default to PrimaryKeyParquet for newly created regions
278            options.sst_format = Some(FormatType::PrimaryKey);
279            FormatType::PrimaryKey
280        };
281        // Create a manifest manager for this region and writes regions to the manifest file.
282        let mut region_manifest_options =
283            RegionManifestOptions::new(config, &region_dir, &object_store);
284        // Set manifest cache if available
285        region_manifest_options.manifest_cache = self
286            .cache_manager
287            .as_ref()
288            .and_then(|cm| cm.write_cache())
289            .and_then(|wc| wc.manifest_cache());
290        // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
291        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
292        let manifest_manager = RegionManifestManager::new(
293            metadata.clone(),
294            flushed_entry_id,
295            region_manifest_options,
296            sst_format,
297            &self.stats,
298        )
299        .await?;
300
301        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
302        let part_duration = options.compaction.time_window();
303        // Initial memtable id is 0.
304        let mutable = Arc::new(TimePartitions::new(
305            metadata.clone(),
306            memtable_builder.clone(),
307            0,
308            part_duration,
309        ));
310
311        debug!(
312            "Create region {} with options: {:?}, default_flat_format: {}",
313            region_id, options, config.default_flat_format
314        );
315
316        let version = VersionBuilder::new(metadata, mutable)
317            .options(options)
318            .build();
319        let version_control = Arc::new(VersionControl::new(version));
320        let access_layer = Arc::new(AccessLayer::new(
321            self.table_dir.clone(),
322            self.path_type,
323            object_store,
324            self.puffin_manager_factory,
325            self.intermediate_manager,
326        ));
327        let now = self.time_provider.current_time_millis();
328
329        Ok(Arc::new(MitoRegion {
330            region_id,
331            version_control,
332            access_layer: access_layer.clone(),
333            // Region is writable after it is created.
334            manifest_ctx: Arc::new(ManifestContext::new(
335                manifest_manager,
336                RegionRoleState::Leader(RegionLeaderState::Writable),
337            )),
338            file_purger: create_file_purger(
339                config.gc.enable,
340                self.path_type,
341                self.purge_scheduler,
342                access_layer,
343                self.cache_manager,
344                self.file_ref_manager.clone(),
345            ),
346            provider,
347            last_flush_millis: AtomicI64::new(now),
348            last_schedule_compaction_millis: AtomicI64::new(now),
349            time_provider: self.time_provider.clone(),
350            topic_latest_entry_id: AtomicU64::new(0),
351            written_bytes: Arc::new(AtomicU64::new(0)),
352            stats: self.stats,
353        }))
354    }
355
356    /// Opens an existing region in read only mode.
357    ///
358    /// Returns error if the region doesn't exist.
359    pub(crate) async fn open<S: LogStore>(
360        mut self,
361        config: &MitoConfig,
362        wal: &Wal<S>,
363    ) -> Result<MitoRegionRef> {
364        let region_id = self.region_id;
365        let region_dir = self.region_dir();
366        let region = self
367            .maybe_open(config, wal)
368            .await?
369            .with_context(|| EmptyRegionDirSnafu {
370                region_id,
371                region_dir: &region_dir,
372            })?;
373
374        ensure!(
375            region.region_id == self.region_id,
376            RegionCorruptedSnafu {
377                region_id: self.region_id,
378                reason: format!(
379                    "recovered region has different region id {}",
380                    region.region_id
381                ),
382            }
383        );
384
385        Ok(region)
386    }
387
388    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
389        match wal_options {
390            WalOptions::RaftEngine => {
391                ensure!(
392                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
393                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
394                    error::IncompatibleWalProviderChangeSnafu {
395                        global: "`kafka`",
396                        region: "`raft_engine`",
397                    }
398                );
399                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
400            }
401            WalOptions::Kafka(options) => {
402                ensure!(
403                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
404                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
405                    error::IncompatibleWalProviderChangeSnafu {
406                        global: "`raft_engine`",
407                        region: "`kafka`",
408                    }
409                );
410                Ok(Provider::kafka_provider(options.topic.clone()))
411            }
412            WalOptions::Noop => Ok(Provider::noop_provider()),
413        }
414    }
415
416    /// Tries to open the region and returns `None` if the region directory is empty.
417    async fn maybe_open<S: LogStore>(
418        &mut self,
419        config: &MitoConfig,
420        wal: &Wal<S>,
421    ) -> Result<Option<MitoRegionRef>> {
422        let now = Instant::now();
423        let mut region_options = self.options.as_ref().unwrap().clone();
424        let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
425        let mut region_manifest_options =
426            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
427        // Set manifest cache if available
428        region_manifest_options.manifest_cache = self
429            .cache_manager
430            .as_ref()
431            .and_then(|cm| cm.write_cache())
432            .and_then(|wc| wc.manifest_cache());
433        let Some(manifest_manager) =
434            RegionManifestManager::open(region_manifest_options, &self.stats).await?
435        else {
436            return Ok(None);
437        };
438
439        // Backfill `partition_expr` if missing. Use the backfilled metadata in-memory during this open.
440        let manifest = manifest_manager.manifest();
441        let metadata = if manifest.metadata.partition_expr.is_none()
442            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
443        {
444            let metadata = manifest.metadata.as_ref().clone();
445            let mut builder = RegionMetadataBuilder::from_existing(metadata);
446            builder.partition_expr_json(Some(expr_json));
447            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
448        } else {
449            manifest.metadata.clone()
450        };
451        // Updates the region options with the manifest.
452        sanitize_region_options(&manifest, &mut region_options);
453
454        let region_id = self.region_id;
455        let provider = self.provider::<S>(&region_options.wal_options)?;
456        let wal_entry_reader = self
457            .wal_entry_reader
458            .take()
459            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
460        let on_region_opened = wal.on_region_opened();
461        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
462
463        debug!(
464            "Open region {} at {} with options: {:?}",
465            region_id, self.table_dir, self.options
466        );
467
468        let access_layer = Arc::new(AccessLayer::new(
469            self.table_dir.clone(),
470            self.path_type,
471            object_store,
472            self.puffin_manager_factory.clone(),
473            self.intermediate_manager.clone(),
474        ));
475        let file_purger = create_file_purger(
476            config.gc.enable,
477            self.path_type,
478            self.purge_scheduler.clone(),
479            access_layer.clone(),
480            self.cache_manager.clone(),
481            self.file_ref_manager.clone(),
482        );
483        // We should sanitize the region options before creating a new memtable.
484        let memtable_builder = self
485            .memtable_builder_provider
486            .builder_for_options(&region_options);
487        // Use compaction time window in the manifest if region doesn't provide
488        // the time window option.
489        let part_duration = region_options
490            .compaction
491            .time_window()
492            .or(manifest.compaction_time_window);
493        // Initial memtable id is 0.
494        let mutable = Arc::new(TimePartitions::new(
495            metadata.clone(),
496            memtable_builder.clone(),
497            0,
498            part_duration,
499        ));
500
501        // Updates region options by manifest before creating version.
502        let version_builder = version_builder_from_manifest(
503            &manifest,
504            metadata,
505            file_purger.clone(),
506            mutable,
507            region_options,
508        );
509        let version = version_builder.build();
510        let flushed_entry_id = version.flushed_entry_id;
511        let version_control = Arc::new(VersionControl::new(version));
512
513        let topic_latest_entry_id = if !self.skip_wal_replay {
514            let replay_from_entry_id = self
515                .replay_checkpoint
516                .unwrap_or_default()
517                .max(flushed_entry_id);
518            info!(
519                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
520                replay_from_entry_id,
521                region_id,
522                manifest.manifest_version,
523                flushed_entry_id,
524                now.elapsed()
525            );
526            replay_memtable(
527                &provider,
528                wal_entry_reader,
529                region_id,
530                replay_from_entry_id,
531                &version_control,
532                config.allow_stale_entries,
533                on_region_opened,
534            )
535            .await?;
536            // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
537            // Only set after the WAL replay is completed.
538
539            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
540                wal.store().latest_entry_id(&provider).unwrap_or(0)
541            } else {
542                0
543            }
544        } else {
545            info!(
546                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
547                region_id,
548                manifest.manifest_version,
549                flushed_entry_id,
550                now.elapsed()
551            );
552
553            0
554        };
555
556        if let Some(committed_in_manifest) = manifest.committed_sequence {
557            let committed_after_replay = version_control.committed_sequence();
558            if committed_in_manifest > committed_after_replay {
559                info!(
560                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
561                    self.region_id,
562                    version_control.current().version.flushed_sequence,
563                    version_control.committed_sequence(),
564                    committed_in_manifest
565                );
566                version_control.set_committed_sequence(committed_in_manifest);
567            }
568        }
569
570        let now = self.time_provider.current_time_millis();
571
572        let region = MitoRegion {
573            region_id: self.region_id,
574            version_control: version_control.clone(),
575            access_layer: access_layer.clone(),
576            // Region is always opened in read only mode.
577            manifest_ctx: Arc::new(ManifestContext::new(
578                manifest_manager,
579                RegionRoleState::Follower,
580            )),
581            file_purger,
582            provider: provider.clone(),
583            last_flush_millis: AtomicI64::new(now),
584            last_schedule_compaction_millis: AtomicI64::new(now),
585            time_provider: self.time_provider.clone(),
586            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
587            written_bytes: Arc::new(AtomicU64::new(0)),
588            stats: self.stats.clone(),
589        };
590
591        let region = Arc::new(region);
592
593        maybe_load_cache(&region, config, &self.cache_manager);
594        maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);
595
596        Ok(Some(region))
597    }
598}
599
600/// Creates a version builder from a region manifest.
601pub(crate) fn version_builder_from_manifest(
602    manifest: &RegionManifest,
603    metadata: RegionMetadataRef,
604    file_purger: FilePurgerRef,
605    mutable: TimePartitionsRef,
606    region_options: RegionOptions,
607) -> VersionBuilder {
608    VersionBuilder::new(metadata, mutable)
609        .add_files(file_purger, manifest.files.values().cloned())
610        .flushed_entry_id(manifest.flushed_entry_id)
611        .flushed_sequence(manifest.flushed_sequence)
612        .truncated_entry_id(manifest.truncated_entry_id)
613        .compaction_time_window(manifest.compaction_time_window)
614        .options(region_options)
615}
616
617/// Updates region options by persistent options.
618pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
619    // The caller-supplied options win when they specify an explicit `sst_format`,
620    // so re-parsed options (e.g. bulk memtable forcing flat format) take effect on
621    // the running region instead of being clobbered by the persisted manifest value.
622    // Fall back to the manifest only when the caller did not provide a value.
623    match options.sst_format {
624        Some(format) if format != manifest.sst_format => {
625            common_telemetry::warn!(
626                "Overriding SST format from {:?} (manifest) to {:?} (options) for region {}",
627                manifest.sst_format,
628                format,
629                manifest.metadata.region_id,
630            );
631        }
632        Some(_) => {}
633        None => {
634            options.sst_format = Some(manifest.sst_format);
635        }
636    }
637    if let Some(manifest_append_mode) = manifest.append_mode
638        && options.append_mode != manifest_append_mode
639    {
640        common_telemetry::warn!(
641            "Overriding append_mode from {} to {} for region {}",
642            options.append_mode,
643            manifest_append_mode,
644            manifest.metadata.region_id,
645        );
646        options.append_mode = manifest_append_mode;
647    }
648    if options.append_mode && options.merge_mode.take().is_some() {
649        common_telemetry::warn!(
650            "Ignoring merge_mode because append_mode is enabled for region {}",
651            manifest.metadata.region_id,
652        );
653    }
654}
655
656/// Sanitizes open request options before parsing.
657pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
658    let append_mode_enabled = options
659        .get("append_mode")
660        .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));
661
662    if append_mode_enabled && options.remove("merge_mode").is_some() {
663        common_telemetry::warn!(
664            "Ignoring merge_mode in open request options because append_mode is enabled"
665        );
666    }
667}
668
669/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
670pub fn get_object_store(
671    name: &Option<String>,
672    object_store_manager: &ObjectStoreManagerRef,
673) -> Result<object_store::ObjectStore> {
674    if let Some(name) = name {
675        Ok(object_store_manager
676            .find(name)
677            .with_context(|| ObjectStoreNotFoundSnafu {
678                object_store: name.clone(),
679            })?
680            .clone())
681    } else {
682        Ok(object_store_manager.default_object_store().clone())
683    }
684}
685
686/// Checks whether the recovered region has the same schema as region to create.
687pub(crate) fn check_recovered_region(
688    recovered: &RegionMetadata,
689    region_id: RegionId,
690    column_metadatas: &[ColumnMetadata],
691    primary_key: &[ColumnId],
692) -> Result<()> {
693    if recovered.region_id != region_id {
694        error!(
695            "Recovered region {}, expect region {}",
696            recovered.region_id, region_id
697        );
698        return RegionCorruptedSnafu {
699            region_id,
700            reason: format!(
701                "recovered metadata has different region id {}",
702                recovered.region_id
703            ),
704        }
705        .fail();
706    }
707    if recovered.column_metadatas != column_metadatas {
708        error!(
709            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
710            recovered.region_id, recovered.column_metadatas, column_metadatas
711        );
712
713        return RegionCorruptedSnafu {
714            region_id,
715            reason: "recovered metadata has different schema",
716        }
717        .fail();
718    }
719    if recovered.primary_key != primary_key {
720        error!(
721            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
722            recovered.region_id, recovered.primary_key, primary_key
723        );
724
725        return RegionCorruptedSnafu {
726            region_id,
727            reason: "recovered metadata has different primary key",
728        }
729        .fail();
730    }
731
732    Ok(())
733}
734
735/// Replays the mutations from WAL and inserts mutations to memtable of given region.
736pub(crate) async fn replay_memtable<F>(
737    provider: &Provider,
738    mut wal_entry_reader: Box<dyn WalEntryReader>,
739    region_id: RegionId,
740    flushed_entry_id: EntryId,
741    version_control: &VersionControlRef,
742    allow_stale_entries: bool,
743    on_region_opened: F,
744) -> Result<EntryId>
745where
746    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
747{
748    let now = Instant::now();
749    let mut rows_replayed = 0;
750    // Last entry id should start from flushed entry id since there might be no
751    // data in the WAL.
752    let mut last_entry_id = flushed_entry_id;
753    let replay_from_entry_id = flushed_entry_id + 1;
754
755    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
756    while let Some(res) = wal_stream.next().await {
757        let (entry_id, entry) = res?;
758        if entry_id <= flushed_entry_id {
759            warn!(
760                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
761                region_id, flushed_entry_id, entry_id
762            );
763            ensure!(
764                allow_stale_entries,
765                StaleLogEntrySnafu {
766                    region_id,
767                    flushed_entry_id,
768                    unexpected_entry_id: entry_id,
769                }
770            );
771        }
772        last_entry_id = last_entry_id.max(entry_id);
773
774        let mut region_write_ctx = RegionWriteCtx::new(
775            region_id,
776            version_control,
777            provider.clone(),
778            // For WAL replay, we don't need to track the write bytes rate.
779            None,
780        );
781        for mutation in entry.mutations {
782            rows_replayed += mutation
783                .rows
784                .as_ref()
785                .map(|rows| rows.rows.len())
786                .unwrap_or(0);
787            region_write_ctx.push_mutation(
788                mutation.op_type,
789                mutation.rows,
790                mutation.write_hint,
791                OptionOutputTx::none(),
792                // We should respect the sequence in WAL during replay.
793                Some(mutation.sequence),
794            );
795        }
796
797        for bulk_entry in entry.bulk_entries {
798            let part = BulkPart::try_from(bulk_entry)?;
799            rows_replayed += part.num_rows();
800            // During replay, we should adopt the sequence from WAL.
801            let bulk_sequence_from_wal = part.sequence;
802            ensure!(
803                region_write_ctx.push_bulk(
804                    OptionOutputTx::none(),
805                    part,
806                    Some(bulk_sequence_from_wal)
807                ),
808                RegionCorruptedSnafu {
809                    region_id,
810                    reason: "unable to replay memtable with bulk entries",
811                }
812            );
813        }
814
815        // set next_entry_id and write to memtable.
816        region_write_ctx.set_next_entry_id(last_entry_id + 1);
817        region_write_ctx.write_memtable().await;
818        region_write_ctx.write_bulk().await;
819    }
820
821    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
822    // to avoid reading potentially incomplete entries in the future.
823    (on_region_opened)(region_id, flushed_entry_id, provider).await?;
824
825    let series_count = version_control.current().series_count();
826    info!(
827        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
828        region_id,
829        provider,
830        rows_replayed,
831        replay_from_entry_id,
832        last_entry_id,
833        series_count,
834        now.elapsed()
835    );
836    Ok(last_entry_id)
837}
838
839/// A task to load and fill the region file cache.
840pub(crate) struct RegionLoadCacheTask {
841    region: MitoRegionRef,
842}
843
844impl RegionLoadCacheTask {
845    pub(crate) fn new(region: MitoRegionRef) -> Self {
846        Self { region }
847    }
848
849    /// Fills the file cache with index files from the region.
850    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
851        let region_id = self.region.region_id;
852        let table_dir = self.region.access_layer.table_dir();
853        let path_type = self.region.access_layer.path_type();
854        let object_store = self.region.access_layer.object_store();
855        let version_control = &self.region.version_control;
856
857        // Collects IndexKeys, file sizes, and max timestamps for files that need to be downloaded
858        let mut files_to_download = Vec::new();
859        let mut files_already_cached = 0;
860
861        {
862            let version = version_control.current().version;
863            for level in version.ssts.levels() {
864                for file_handle in level.files.values() {
865                    let file_meta = file_handle.meta_ref();
866                    if file_meta.exists_index() {
867                        let puffin_key = IndexKey::new(
868                            file_meta.region_id,
869                            file_meta.file_id,
870                            FileType::Puffin(file_meta.index_version),
871                        );
872
873                        if !file_cache.contains_key(&puffin_key) {
874                            files_to_download.push((
875                                puffin_key,
876                                file_meta.index_file_size,
877                                file_meta.time_range.1, // max timestamp
878                            ));
879                        } else {
880                            files_already_cached += 1;
881                        }
882                    }
883                }
884            }
885            // Releases the Version after the scope to avoid holding the memtables and file handles
886            // for a long time.
887        }
888
889        // Sorts files by max timestamp in descending order to loads latest files first
890        files_to_download.sort_by_key(|b| std::cmp::Reverse(b.2));
891
892        let total_files = files_to_download.len() as i64;
893
894        info!(
895            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
896            region_id, total_files, files_already_cached
897        );
898
899        CACHE_FILL_PENDING_FILES.add(total_files);
900
901        let mut files_downloaded = 0;
902        let mut files_skipped = 0;
903
904        for (puffin_key, file_size, max_timestamp) in files_to_download {
905            let current_size = file_cache.puffin_cache_size();
906            let capacity = file_cache.puffin_cache_capacity();
907            let region_state = self.region.state();
908            if !can_load_cache(region_state) {
909                info!(
910                    "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
911                    region_state, region_id, current_size, capacity
912                );
913                break;
914            }
915
916            // Checks if adding this file would exceed capacity
917            if current_size + file_size > capacity {
918                info!(
919                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
920                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
921                );
922                files_skipped = (total_files - files_downloaded) as usize;
923                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
924                break;
925            }
926
927            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
928                version
929            } else {
930                unreachable!("`files_to_download` should only contains Puffin files");
931            };
932            let index_id = RegionIndexId::new(
933                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
934                index_version,
935            );
936
937            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
938
939            match file_cache
940                .download(puffin_key, &index_remote_path, object_store, file_size)
941                .await
942            {
943                Ok(_) => {
944                    debug!(
945                        "Downloaded index file to write cache, region: {}, file_id: {}",
946                        region_id, puffin_key.file_id
947                    );
948                    files_downloaded += 1;
949                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
950                    CACHE_FILL_PENDING_FILES.dec();
951                }
952                Err(e) => {
953                    warn!(
954                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
955                        region_id, puffin_key.file_id
956                    );
957                    CACHE_FILL_PENDING_FILES.dec();
958                }
959            }
960        }
961
962        info!(
963            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
964            region_id, total_files, files_downloaded, files_already_cached, files_skipped
965        );
966    }
967}
968
969/// Loads all index (Puffin) files from the version into the write cache.
970fn maybe_load_cache(
971    region: &MitoRegionRef,
972    config: &MitoConfig,
973    cache_manager: &Option<CacheManagerRef>,
974) {
975    let Some(cache_manager) = cache_manager else {
976        return;
977    };
978    let Some(write_cache) = cache_manager.write_cache() else {
979        return;
980    };
981
982    let preload_enabled = config.preload_index_cache;
983    if !preload_enabled {
984        return;
985    }
986
987    let task = RegionLoadCacheTask::new(region.clone());
988    write_cache.load_region_cache(task);
989}
990
991/// Preloads Parquet metadata into the in-memory SST meta cache on region open.
992///
993/// This improves the latency of the first query after server start by avoiding large Parquet
994/// metadata reads on demand.
995///
996/// The preload is best-effort:
997/// - Always tries to warm from the local write cache (file cache) first.
998/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata
999///   directly from the local store.
1000/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...).
1001#[allow(clippy::too_many_arguments)]
1002async fn preload_parquet_meta_cache_for_files(
1003    region_id: RegionId,
1004    cache_manager: CacheManagerRef,
1005    sst_meta_cache_capacity: u64,
1006    table_dir: String,
1007    path_type: PathType,
1008    object_store: object_store::ObjectStore,
1009    region_metadata: RegionMetadataRef,
1010    mut files: Vec<FileHandle>,
1011) -> usize {
1012    if !cache_manager.sst_meta_cache_enabled()
1013        || sst_meta_cache_capacity == 0
1014        || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
1015    {
1016        return 0;
1017    }
1018
1019    let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME;
1020
1021    // Sort by time range so we can prefer preloading newer files first.
1022    files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));
1023
1024    let mut loaded = 0usize;
1025    for file_handle in files {
1026        // Stop when the shared SST meta cache is full.
1027        if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
1028            break;
1029        }
1030
1031        let file_id = file_handle.file_id();
1032        let mut cache_metrics = MetadataCacheMetrics::default();
1033        if let Some(metadata) = cache_manager
1034            .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
1035            .await
1036        {
1037            if file_handle.primary_key_range().is_none()
1038                && let Some(primary_key_range) =
1039                    extract_primary_key_range(&metadata, &region_metadata)
1040            {
1041                file_handle.set_primary_key_range(primary_key_range);
1042            }
1043            // Metadata is either already in memory or loaded from file cache.
1044            if cache_metrics.mem_cache_hit == 0 {
1045                loaded += 1;
1046            }
1047            continue;
1048        }
1049
1050        if !allow_direct_load {
1051            continue;
1052        }
1053
1054        let file_size = file_handle.meta_ref().file_size;
1055        let file_path = file_handle.file_path(&table_dir, path_type);
1056        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
1057        match loader.load(&mut cache_metrics).await {
1058            Ok(metadata) => {
1059                if let Some(primary_key_range) =
1060                    extract_primary_key_range(&metadata, &region_metadata)
1061                {
1062                    file_handle.set_primary_key_range(primary_key_range);
1063                }
1064                cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
1065                loaded += 1;
1066            }
1067            Err(err) => {
1068                // Preloading is best-effort. Failure shouldn't affect region open.
1069                warn!(
1070                    err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
1071                    region_id, file_path
1072                );
1073            }
1074        }
1075    }
1076
1077    loaded
1078}
1079
1080fn maybe_preload_parquet_meta_cache(
1081    region: &MitoRegionRef,
1082    config: &MitoConfig,
1083    cache_manager: &Option<CacheManagerRef>,
1084) {
1085    let Some(cache_manager) = cache_manager else {
1086        return;
1087    };
1088    if !cache_manager.sst_meta_cache_enabled() {
1089        return;
1090    }
1091
1092    // Skip if SST meta cache is disabled.
1093    if config.sst_meta_cache_size.as_bytes() == 0 {
1094        return;
1095    }
1096    if !config.preload_index_cache {
1097        return;
1098    }
1099
1100    let region = region.clone();
1101    let cache_manager = cache_manager.clone();
1102    let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();
1103
1104    tokio::spawn(async move {
1105        // Safety: semaphore must exist.
1106        let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();
1107
1108        let region_id = region.region_id;
1109        let table_dir = region.access_layer.table_dir().to_string();
1110        let path_type = region.access_layer.path_type();
1111        let object_store = region.access_layer.object_store().clone();
1112        let region_metadata = region.version_control.current().version.metadata.clone();
1113
1114        // Collect SST files. Do not hold the version longer than needed.
1115        let mut files = Vec::new();
1116        {
1117            let version = region.version_control.current().version;
1118            for level in version.ssts.levels() {
1119                for file_handle in level.files.values() {
1120                    files.push(file_handle.clone());
1121                }
1122            }
1123        }
1124        let preloading_start = Instant::now();
1125        let loaded = preload_parquet_meta_cache_for_files(
1126            region_id,
1127            cache_manager,
1128            sst_meta_cache_capacity,
1129            table_dir,
1130            path_type,
1131            object_store,
1132            region_metadata,
1133            files,
1134        )
1135        .await;
1136        let preloading_cost = preloading_start.elapsed();
1137
1138        if loaded > 0 {
1139            info!(
1140                "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
1141                region_id,
1142                loaded,
1143                preloading_cost.as_millis()
1144            );
1145        }
1146    });
1147}
1148
1149fn can_load_cache(state: RegionRoleState) -> bool {
1150    match state {
1151        RegionRoleState::Leader(RegionLeaderState::Writable)
1152        | RegionRoleState::Leader(RegionLeaderState::Staging)
1153        | RegionRoleState::Leader(RegionLeaderState::Altering)
1154        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
1155        | RegionRoleState::Leader(RegionLeaderState::Editing)
1156        | RegionRoleState::Follower => true,
1157        // The region will be closed soon if it is downgrading.
1158        RegionRoleState::Leader(RegionLeaderState::Downgrading)
1159        | RegionRoleState::Leader(RegionLeaderState::Dropping)
1160        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
1161    }
1162}
1163
1164#[cfg(test)]
1165mod tests {
1166    use std::collections::HashMap;
1167    use std::sync::Arc;
1168
1169    use common_base::readable_size::ReadableSize;
1170    use common_test_util::temp_dir::create_temp_dir;
1171    use common_time::Timestamp;
1172    use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
1173    use datatypes::arrow::record_batch::RecordBatch;
1174    use object_store::ObjectStore;
1175    use object_store::services::{Fs, Memory};
1176    use parquet::arrow::ArrowWriter;
1177    use parquet::file::metadata::KeyValue;
1178    use parquet::file::properties::WriterProperties;
1179    use store_api::region_request::PathType;
1180    use store_api::storage::{FileId, RegionId};
1181
1182    use super::{preload_parquet_meta_cache_for_files, sanitize_region_options};
1183    use crate::cache::CacheManager;
1184    use crate::cache::file_cache::{FileType, IndexKey};
1185    use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
1186    use crate::region::options::RegionOptions;
1187    use crate::sst::FormatType;
1188    use crate::sst::file::{FileHandle, FileMeta};
1189    use crate::sst::file_purger::NoopFilePurger;
1190    use crate::sst::parquet::PARQUET_METADATA_KEY;
1191    use crate::test_util::TestEnv;
1192    use crate::test_util::sst_util::sst_region_metadata;
1193
1194    fn build_test_manifest(sst_format: FormatType) -> RegionManifest {
1195        RegionManifest {
1196            metadata: Arc::new(sst_region_metadata()),
1197            files: HashMap::new(),
1198            removed_files: RemovedFilesRecord::default(),
1199            flushed_entry_id: 0,
1200            flushed_sequence: 0,
1201            committed_sequence: None,
1202            manifest_version: 0,
1203            truncated_entry_id: None,
1204            compaction_time_window: None,
1205            sst_format,
1206            append_mode: None,
1207        }
1208    }
1209
1210    #[test]
1211    fn test_sanitize_region_options_options_format_wins() {
1212        // Manifest persisted PrimaryKey, but the re-parsed options now request Flat
1213        // (e.g., bulk memtable). The options' value must win.
1214        let manifest = build_test_manifest(FormatType::PrimaryKey);
1215        let mut options = RegionOptions {
1216            sst_format: Some(FormatType::Flat),
1217            ..Default::default()
1218        };
1219        sanitize_region_options(&manifest, &mut options);
1220        assert_eq!(options.sst_format, Some(FormatType::Flat));
1221    }
1222
1223    #[test]
1224    fn test_sanitize_region_options_fills_from_manifest_when_unset() {
1225        // When the caller didn't specify sst_format, fall back to the manifest value
1226        // so downstream code always sees a concrete format.
1227        let manifest = build_test_manifest(FormatType::Flat);
1228        let mut options = RegionOptions {
1229            sst_format: None,
1230            ..Default::default()
1231        };
1232        sanitize_region_options(&manifest, &mut options);
1233        assert_eq!(options.sst_format, Some(FormatType::Flat));
1234    }
1235
1236    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
1237        let key_value_meta = KeyValue::new(
1238            PARQUET_METADATA_KEY.to_string(),
1239            sst_region_metadata().to_json().unwrap(),
1240        );
1241        let props = WriterProperties::builder()
1242            .set_key_value_metadata(Some(vec![key_value_meta]))
1243            .build();
1244
1245        let mut parquet_bytes = Vec::new();
1246        let mut writer =
1247            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
1248        writer.write(batch).unwrap();
1249        writer.close().unwrap();
1250
1251        parquet_bytes
1252    }
1253
1254    #[tokio::test]
1255    async fn test_preload_parquet_meta_cache_uses_file_cache() {
1256        let env = TestEnv::new().await;
1257
1258        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
1259        let write_cache = env
1260            .create_write_cache(local_store, ReadableSize::mb(1024))
1261            .await;
1262        let cache_manager = Arc::new(
1263            CacheManager::builder()
1264                .sst_meta_cache_size(1024 * 1024)
1265                .write_cache(Some(write_cache.clone()))
1266                .build(),
1267        );
1268
1269        let region_id = RegionId::new(1, 1);
1270        let file_id = FileId::random();
1271
1272        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
1273        let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef;
1274        let batch = RecordBatch::try_from_iter([
1275            ("col", col),
1276            (
1277                store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME,
1278                primary_key,
1279            ),
1280        ])
1281        .unwrap();
1282        let parquet_bytes = sst_parquet_bytes(&batch);
1283        let file_size = parquet_bytes.len() as u64;
1284
1285        let file_meta = FileMeta {
1286            region_id,
1287            file_id,
1288            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1289            level: 0,
1290            file_size,
1291            max_row_group_uncompressed_size: 0,
1292            available_indexes: Default::default(),
1293            indexes: vec![],
1294            index_file_size: 0,
1295            index_version: 0,
1296            num_rows: 3,
1297            num_row_groups: 1,
1298            sequence: None,
1299            partition_expr: None,
1300            num_series: 0,
1301            ..Default::default()
1302        };
1303        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1304
1305        let table_dir = "test_table";
1306        let path_type = PathType::Bare;
1307        let remote_path = file_handle.file_path(table_dir, path_type);
1308
1309        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
1310        source_store
1311            .write(&remote_path, parquet_bytes)
1312            .await
1313            .unwrap();
1314
1315        // Put the parquet file into the write cache, so file cache contains metadata.
1316        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
1317        write_cache
1318            .file_cache()
1319            .download(index_key, &remote_path, &source_store, file_size)
1320            .await
1321            .unwrap();
1322
1323        let region_file_id = file_handle.file_id();
1324        assert!(
1325            cache_manager
1326                .get_parquet_meta_data_from_mem_cache(region_file_id)
1327                .is_none()
1328        );
1329
1330        let loaded = preload_parquet_meta_cache_for_files(
1331            region_id,
1332            cache_manager.clone(),
1333            1024 * 1024,
1334            table_dir.to_string(),
1335            path_type,
1336            source_store.clone(),
1337            Arc::new(sst_region_metadata()),
1338            vec![file_handle.clone()],
1339        )
1340        .await;
1341
1342        // Should warm the in-memory cache from the local file cache.
1343        assert_eq!(loaded, 1);
1344        assert!(
1345            cache_manager
1346                .get_parquet_meta_data_from_mem_cache(region_file_id)
1347                .is_some()
1348        );
1349        assert!(file_handle.primary_key_range().is_some());
1350    }
1351
1352    #[tokio::test]
1353    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
1354        let cache_manager = Arc::new(
1355            CacheManager::builder()
1356                .sst_meta_cache_size(1024 * 1024)
1357                .build(),
1358        );
1359
1360        let region_id = RegionId::new(1, 1);
1361        let file_id = FileId::random();
1362
1363        // Without a local file cache entry, preloading should skip the file.
1364        let file_meta = FileMeta {
1365            region_id,
1366            file_id,
1367            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1368            level: 0,
1369            file_size: 0,
1370            max_row_group_uncompressed_size: 0,
1371            available_indexes: Default::default(),
1372            indexes: vec![],
1373            index_file_size: 0,
1374            index_version: 0,
1375            num_rows: 3,
1376            num_row_groups: 1,
1377            sequence: None,
1378            partition_expr: None,
1379            num_series: 0,
1380            ..Default::default()
1381        };
1382        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1383
1384        let table_dir = "test_table";
1385        let path_type = PathType::Bare;
1386        let remote_path = file_handle.file_path(table_dir, path_type);
1387
1388        // Even if the remote object store has the file, we should not preload from it.
1389        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
1390        object_store
1391            .write(&remote_path, b"noop".as_slice())
1392            .await
1393            .unwrap();
1394
1395        let region_file_id = file_handle.file_id();
1396        assert!(
1397            cache_manager
1398                .get_parquet_meta_data_from_mem_cache(region_file_id)
1399                .is_none()
1400        );
1401
1402        let loaded = preload_parquet_meta_cache_for_files(
1403            region_id,
1404            cache_manager.clone(),
1405            1024 * 1024,
1406            table_dir.to_string(),
1407            path_type,
1408            object_store,
1409            Arc::new(sst_region_metadata()),
1410            vec![file_handle],
1411        )
1412        .await;
1413
1414        assert_eq!(loaded, 0);
1415        assert!(
1416            cache_manager
1417                .get_parquet_meta_data_from_mem_cache(region_file_id)
1418                .is_none()
1419        );
1420    }
1421
1422    #[tokio::test]
1423    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
1424        let cache_manager = Arc::new(
1425            CacheManager::builder()
1426                .sst_meta_cache_size(1024 * 1024)
1427                .build(),
1428        );
1429
1430        let region_id = RegionId::new(1, 1);
1431        let file_id = FileId::random();
1432
1433        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
1434        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
1435        let parquet_bytes = sst_parquet_bytes(&batch);
1436
1437        // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
1438        // the local filesystem to retrieve it.
1439        let file_meta = FileMeta {
1440            region_id,
1441            file_id,
1442            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1443            level: 0,
1444            file_size: 0,
1445            max_row_group_uncompressed_size: 0,
1446            available_indexes: Default::default(),
1447            indexes: vec![],
1448            index_file_size: 0,
1449            index_version: 0,
1450            num_rows: 3,
1451            num_row_groups: 1,
1452            sequence: None,
1453            partition_expr: None,
1454            num_series: 0,
1455            ..Default::default()
1456        };
1457        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1458
1459        let table_dir = "test_table";
1460        let path_type = PathType::Bare;
1461        let file_path = file_handle.file_path(table_dir, path_type);
1462
1463        let root = create_temp_dir("parquet-meta-preload");
1464        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
1465            .unwrap()
1466            .finish();
1467        object_store.write(&file_path, parquet_bytes).await.unwrap();
1468
1469        let region_file_id = file_handle.file_id();
1470        assert!(
1471            cache_manager
1472                .get_parquet_meta_data_from_mem_cache(region_file_id)
1473                .is_none()
1474        );
1475
1476        let loaded = preload_parquet_meta_cache_for_files(
1477            region_id,
1478            cache_manager.clone(),
1479            1024 * 1024,
1480            table_dir.to_string(),
1481            path_type,
1482            object_store,
1483            Arc::new(sst_region_metadata()),
1484            vec![file_handle],
1485        )
1486        .await;
1487
1488        assert_eq!(loaded, 1);
1489        assert!(
1490            cache_manager
1491                .get_parquet_meta_data_from_mem_cache(region_file_id)
1492                .is_some()
1493        );
1494    }
1495}