Skip to main content

mito2/region/
opener.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region opener.
16
17use std::any::TypeId;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicI64, AtomicU64};
20use std::sync::{Arc, LazyLock};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, warn};
24use common_wal::options::WalOptions;
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use log_store::kafka::log_store::KafkaLogStore;
28use log_store::noop::log_store::NoopLogStore;
29use log_store::raft_engine::log_store::RaftEngineLogStore;
30use object_store::ObjectStore;
31use object_store::manager::ObjectStoreManagerRef;
32use object_store::util::{is_object_storage, normalize_dir};
33use parquet::file::metadata::PageIndexPolicy;
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::logstore::LogStore;
36use store_api::logstore::provider::Provider;
37use store_api::metadata::{
38    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
39};
40use store_api::region_engine::RegionRole;
41use store_api::region_request::{PathType, RegionRequirements};
42use store_api::storage::{ColumnId, RegionId};
43use tokio::sync::Semaphore;
44
45use crate::access_layer::AccessLayer;
46use crate::cache::CacheManagerRef;
47use crate::cache::file_cache::{FileCache, FileType, IndexKey};
48use crate::config::MitoConfig;
49use crate::engine::region_hook::RegionHookRef;
50use crate::error;
51use crate::error::{
52    EmptyRegionDirSnafu, InvalidMetadataSnafu, InvalidRegionOptionsSnafu, ObjectStoreNotFoundSnafu,
53    RegionCorruptedSnafu, Result, StaleLogEntrySnafu,
54};
55use crate::manifest::action::RegionManifest;
56use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
57use crate::memtable::MemtableBuilderProvider;
58use crate::memtable::bulk::part::BulkPart;
59use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
60use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
61use crate::region::options::RegionOptions;
62use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
63use crate::region::{
64    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
65};
66use crate::region_write_ctx::RegionWriteCtx;
67use crate::request::OptionOutputTx;
68use crate::schedule::scheduler::SchedulerRef;
69use crate::sst::FormatType;
70use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
71use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
72use crate::sst::file_ref::FileReferenceManagerRef;
73use crate::sst::index::intermediate::IntermediateManager;
74use crate::sst::index::puffin_manager::PuffinManagerFactory;
75use crate::sst::location::{self, region_dir_from_table_dir};
76use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range};
77use crate::sst::parquet::reader::MetadataCacheMetrics;
78use crate::time_provider::TimeProviderRef;
79use crate::wal::entry_reader::WalEntryReader;
80use crate::wal::{EntryId, Wal};
81
/// Number of permits handed to [`PARQUET_META_PRELOAD_SEMAPHORE`].
const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;

/// Process-wide semaphore, lazily initialized on first use with
/// `PARQUET_META_PRELOAD_CONCURRENCY` permits.
// NOTE(review): presumably bounds how many parquet metadata preloads run at once;
// the code acquiring permits is not visible here — confirm.
static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
    LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
86
87fn initial_pruned_entry_id(wal_options: &WalOptions) -> EntryId {
88    match wal_options {
89        WalOptions::Kafka(options) => options.initial_pruned_entry_id.unwrap_or(0),
90        WalOptions::RaftEngine | WalOptions::Noop => 0,
91    }
92}
93
/// A fetcher to retrieve partition expr for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
/// while newer ones do. On open, we backfill it via this fetcher and persist it
/// to the manifest so future opens don't need refetching.
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    /// Fetches the partition expression of the region identified by `region_id`.
    ///
    /// Returns `None` when no expression can be provided; the opener then keeps the
    /// metadata recovered from the manifest as is. A returned string is handed to
    /// `RegionMetadataBuilder::partition_expr_json`, so it is expected to be the JSON
    /// representation of the expression.
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}
103
/// Shared, thread-safe (`Send + Sync`) handle to a [PartitionExprFetcher].
pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;
105
/// Builder to create a new [MitoRegion] or open an existing one.
///
/// Mandatory components are passed to [RegionOpener::new]; the optional ones
/// (`Option` fields and `skip_wal_replay`) are configured through the setter methods.
pub(crate) struct RegionOpener {
    /// Id of the region to create or open.
    region_id: RegionId,
    /// Builder of the metadata for a region to create. Consumed by `build_metadata()`.
    metadata_builder: Option<RegionMetadataBuilder>,
    /// Provides the memtable builder matching the region options.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Resolves the object store named by the region's `storage` option.
    object_store_manager: ObjectStoreManagerRef,
    /// Normalized table directory.
    table_dir: String,
    /// Path type used together with `table_dir` to compute the region directory.
    path_type: PathType,
    /// Scheduler handed to the file purger.
    purge_scheduler: SchedulerRef,
    /// Region options. Must be set before creating or opening the region.
    options: Option<RegionOptions>,
    /// Optional cache manager; supplies the manifest cache (through its write cache)
    /// and is handed to the file purger.
    cache_manager: Option<CacheManagerRef>,
    /// Whether to skip replaying the WAL when opening the region.
    skip_wal_replay: bool,
    /// Passed to the region's [AccessLayer].
    puffin_manager_factory: PuffinManagerFactory,
    /// Passed to the region's [AccessLayer].
    intermediate_manager: IntermediateManager,
    /// Supplies the current time used to initialize the region's flush and
    /// compaction scheduling timestamps.
    time_provider: TimeProviderRef,
    /// Manifest statistics shared with the manifest manager and the region.
    stats: ManifestStats,
    /// Optional reader to use for WAL replay instead of creating one from the [Wal].
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    /// Optional entry id; replay starts from the larger of this value and the
    /// flushed entry id of the recovered version.
    replay_checkpoint: Option<u64>,
    /// Handed to the file purger.
    file_ref_manager: FileReferenceManagerRef,
    /// Used to backfill `partition_expr` when the manifest metadata lacks it.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Optional hook passed to the region's [ManifestContext].
    hook: Option<RegionHookRef>,
}
128
129impl RegionOpener {
130    /// Returns a new opener.
131    // TODO(LFC): Reduce the number of arguments.
132    #[allow(clippy::too_many_arguments)]
133    pub(crate) fn new(
134        region_id: RegionId,
135        table_dir: &str,
136        path_type: PathType,
137        memtable_builder_provider: MemtableBuilderProvider,
138        object_store_manager: ObjectStoreManagerRef,
139        purge_scheduler: SchedulerRef,
140        puffin_manager_factory: PuffinManagerFactory,
141        intermediate_manager: IntermediateManager,
142        time_provider: TimeProviderRef,
143        file_ref_manager: FileReferenceManagerRef,
144        partition_expr_fetcher: PartitionExprFetcherRef,
145    ) -> RegionOpener {
146        RegionOpener {
147            region_id,
148            metadata_builder: None,
149            memtable_builder_provider,
150            object_store_manager,
151            table_dir: normalize_dir(table_dir),
152            path_type,
153            purge_scheduler,
154            options: None,
155            cache_manager: None,
156            skip_wal_replay: false,
157            puffin_manager_factory,
158            intermediate_manager,
159            time_provider,
160            stats: Default::default(),
161            wal_entry_reader: None,
162            replay_checkpoint: None,
163            file_ref_manager,
164            partition_expr_fetcher,
165            hook: None,
166        }
167    }
168
169    /// Sets the region hook for observing manifest mutations.
170    pub(crate) fn hook(mut self, hook: Option<RegionHookRef>) -> Self {
171        self.hook = hook;
172        self
173    }
174
175    /// Sets metadata builder of the region to create.
176    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
177        self.metadata_builder = Some(builder);
178        self
179    }
180
181    /// Computes the region directory from table_dir and region_id.
182    fn region_dir(&self) -> String {
183        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
184    }
185
186    /// Builds the region metadata.
187    ///
188    /// # Panics
189    /// - Panics if `options` is not set.
190    /// - Panics if `metadata_builder` is not set.
191    fn build_metadata(&mut self) -> Result<RegionMetadata> {
192        let options = self.options.as_ref().unwrap();
193        let mut metadata_builder = self.metadata_builder.take().unwrap();
194        metadata_builder.primary_key_encoding(options.primary_key_encoding());
195        metadata_builder.build().context(InvalidMetadataSnafu)
196    }
197
198    /// Parses and sets options for the region.
199    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
200        let region_id = self.region_id;
201        self.options(RegionOptions::try_from_options(region_id, &options)?)
202    }
203
204    /// Sets the replay checkpoint for the region.
205    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
206        self.replay_checkpoint = replay_checkpoint;
207        self
208    }
209
210    /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
211    /// constructing a new one from scratch.
212    pub(crate) fn wal_entry_reader(
213        mut self,
214        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
215    ) -> Self {
216        self.wal_entry_reader = wal_entry_reader;
217        self
218    }
219
220    /// Sets options for the region.
221    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
222        options.validate()?;
223        self.options = Some(options);
224        Ok(self)
225    }
226
227    /// Ensures the current region request satisfies its requirements.
228    pub(crate) fn ensure_region_requirements(
229        &self,
230        requirements: RegionRequirements,
231    ) -> Result<()> {
232        if !requirements.object_storage {
233            return Ok(());
234        }
235
236        let options = self.options.as_ref().context(InvalidRegionOptionsSnafu {
237            reason: "missing region options before requirement check".to_string(),
238        })?;
239        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
240
241        ensure!(
242            supports_open_region_object_storage_requirement(&object_store),
243            error::RegionRequirementSnafu {
244                region_id: self.region_id,
245                requirement: "object storage",
246                reason: "region data must be accessible from another datanode",
247            }
248        );
249
250        Ok(())
251    }
252
253    /// Sets the cache manager for the region.
254    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
255        self.cache_manager = cache_manager;
256        self
257    }
258
259    /// Sets the `skip_wal_replay`.
260    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
261        self.skip_wal_replay = skip;
262        self
263    }
264
    /// Writes region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// When an existing region is opened, its recovered metadata must match the
    /// metadata built from `metadata_builder` (region id, columns and primary key);
    /// the region is then switched to the leader role. If opening fails with an
    /// error, the error is only logged and the region is created from scratch.
    ///
    /// A newly created region is writable and starts with an empty mutable memtable.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        // Builds the expected metadata first; this consumes the metadata builder.
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                // Nothing to open: fall through and create the region.
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                // The failure is logged but not propagated: the region is created below.
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let mut options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Sets the sst_format based on options or flat_format flag
        // The chosen format is also written back to the options when they did not specify one.
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_flat_format {
            options.sst_format = Some(FormatType::Flat);
            FormatType::Flat
        } else {
            // Default to PrimaryKeyParquet for newly created regions
            options.sst_format = Some(FormatType::PrimaryKey);
            FormatType::PrimaryKey
        };
        // Create a manifest manager for this region and writes regions to the manifest file.
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &region_dir, &object_store);
        // Set manifest cache if available
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
        // Kafka WAL allocation also carries the topic's pruned entry id as a create-time hint.
        // The larger of the two values is used.
        let flushed_entry_id = provider
            .initial_flushed_entry_id::<S>(wal.store())
            .max(initial_pruned_entry_id(&options.wal_options));
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            sst_format,
            &self.stats,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!(
            "Create region {} with options: {:?}, default_flat_format: {}",
            region_id, options, config.default_flat_format
        );

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .flushed_entry_id(flushed_entry_id)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        // Both the last flush time and the last compaction scheduling time start at "now".
        let now = self.time_provider.current_time_millis();

        Ok(Arc::new(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
                self.hook.clone(),
            )),
            file_purger: create_file_purger(
                config.gc.enable,
                self.path_type,
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_schedule_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(flushed_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
        }))
    }
404
405    /// Opens an existing region in read only mode.
406    ///
407    /// Returns error if the region doesn't exist.
408    pub(crate) async fn open<S: LogStore>(
409        mut self,
410        config: &MitoConfig,
411        wal: &Wal<S>,
412    ) -> Result<MitoRegionRef> {
413        let region_id = self.region_id;
414        let region_dir = self.region_dir();
415        let region = self
416            .maybe_open(config, wal)
417            .await?
418            .with_context(|| EmptyRegionDirSnafu {
419                region_id,
420                region_dir: &region_dir,
421            })?;
422
423        ensure!(
424            region.region_id == self.region_id,
425            RegionCorruptedSnafu {
426                region_id: self.region_id,
427                reason: format!(
428                    "recovered region has different region id {}",
429                    region.region_id
430                ),
431            }
432        );
433
434        Ok(region)
435    }
436
437    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
438        match wal_options {
439            WalOptions::RaftEngine => {
440                ensure!(
441                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
442                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
443                    error::IncompatibleWalProviderChangeSnafu {
444                        global: "`kafka`",
445                        region: "`raft_engine`",
446                    }
447                );
448                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
449            }
450            WalOptions::Kafka(options) => {
451                ensure!(
452                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
453                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
454                    error::IncompatibleWalProviderChangeSnafu {
455                        global: "`raft_engine`",
456                        region: "`kafka`",
457                    }
458                );
459                Ok(Provider::kafka_provider(options.topic.clone()))
460            }
461            WalOptions::Noop => Ok(Provider::noop_provider()),
462        }
463    }
464
    /// Tries to open the region and returns `None` if the region directory is empty.
    ///
    /// On success the region is returned in the follower role. The steps are:
    /// 1. Open the manifest; return `Ok(None)` if the manifest manager finds nothing to open.
    /// 2. Backfill `partition_expr` into the in-memory metadata if the manifest lacks it.
    /// 3. Sanitize the region options against the manifest.
    /// 4. Rebuild the version from the manifest and replay the WAL unless
    ///    `skip_wal_replay` is set.
    /// 5. Raise the committed sequence to the one recorded in the manifest if needed.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegionRef>> {
        // Used to report the elapsed time in the logs below.
        let now = Instant::now();
        // Works on a copy of the options; `self.options` itself is left untouched.
        let mut region_options = self.options.as_ref().unwrap().clone();
        let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
        // Set manifest cache if available
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &self.stats).await?
        else {
            return Ok(None);
        };

        // Backfill `partition_expr` if missing. Use the backfilled metadata in-memory during this open.
        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };
        // Updates the region options with the manifest.
        sanitize_region_options(&manifest, &mut region_options);

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        // Prefers the reader supplied by the caller; otherwise creates one from the WAL.
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        // Note: this logs the options as passed to the opener, not the sanitized copy.
        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_file_purger(
            config.gc.enable,
            self.path_type,
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        // We should sanitize the region options before creating a new memtable.
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        // Use compaction time window in the manifest if region doesn't provide
        // the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        // Updates region options by manifest before creating version.
        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            file_purger.clone(),
            mutable,
            region_options,
        );
        let version = version_builder.build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        // Replay starts from the larger of the replay checkpoint (0 if unset) and
        // the flushed entry id of the recovered version.
        let replay_from_entry_id = self
            .replay_checkpoint
            .unwrap_or_default()
            .max(flushed_entry_id);
        let topic_latest_entry_id = if !self.skip_wal_replay {
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
            // Only set after the WAL replay is completed.

            // Remote WAL with empty memtables after replay: use the store's latest
            // entry id, falling back to `replay_from_entry_id` if it is unavailable.
            // Remote WAL with replayed data: use `replay_from_entry_id`.
            // Any other WAL: 0.
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store()
                    .latest_entry_id(&provider)
                    .unwrap_or(replay_from_entry_id)
            } else if provider.is_remote_wal() {
                replay_from_entry_id
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );

            if provider.is_remote_wal() {
                replay_from_entry_id
            } else {
                0
            }
        };

        // The committed sequence never goes below the one recorded in the manifest.
        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        // Shadows the `Instant` above: from here on `now` is the current time in
        // milliseconds from the time provider.
        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control: version_control.clone(),
            access_layer: access_layer.clone(),
            // Region is always opened in read only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
                self.hook.clone(),
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_schedule_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats.clone(),
        };

        let region = Arc::new(region);

        // NOTE(review): both helpers are defined outside this view; presumably they
        // warm up caches for the opened region — confirm.
        maybe_load_cache(&region, config, &self.cache_manager);
        maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);

        Ok(Some(region))
    }
656}
657
/// Returns whether `object_store` satisfies the "object storage" region requirement.
///
/// Regular builds: only stores recognized by `is_object_storage` qualify.
#[cfg(not(feature = "test-shared-fs-region-migration"))]
fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool {
    is_object_storage(object_store)
}

/// Returns whether `object_store` satisfies the "object storage" region requirement.
///
/// Builds with the `test-shared-fs-region-migration` feature: stores using the
/// file system scheme qualify in addition to real object storage.
#[cfg(feature = "test-shared-fs-region-migration")]
fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool {
    // Integration tests can configure multiple datanodes to share the same
    // temporary home dir. That makes file storage accessible to all test
    // datanodes, but production file storage still does not satisfy this
    // requirement.
    is_object_storage(object_store)
        || object_store.info().scheme() == object_store::services::FS_SCHEME
}
672
673/// Creates a version builder from a region manifest.
674pub(crate) fn version_builder_from_manifest(
675    manifest: &RegionManifest,
676    metadata: RegionMetadataRef,
677    file_purger: FilePurgerRef,
678    mutable: TimePartitionsRef,
679    region_options: RegionOptions,
680) -> VersionBuilder {
681    VersionBuilder::new(metadata, mutable)
682        .add_files(file_purger, manifest.files.values().cloned())
683        .flushed_entry_id(manifest.flushed_entry_id)
684        .flushed_sequence(manifest.flushed_sequence)
685        .truncated_entry_id(manifest.truncated_entry_id)
686        .compaction_time_window(manifest.compaction_time_window)
687        .options(region_options)
688}
689
690/// Updates region options by persistent options.
691pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
692    // The caller-supplied options win when they specify an explicit `sst_format`,
693    // so re-parsed options (e.g. bulk memtable forcing flat format) take effect on
694    // the running region instead of being clobbered by the persisted manifest value.
695    // Fall back to the manifest only when the caller did not provide a value.
696    match options.sst_format {
697        Some(format) if format != manifest.sst_format => {
698            common_telemetry::warn!(
699                "Overriding SST format from {:?} (manifest) to {:?} (options) for region {}",
700                manifest.sst_format,
701                format,
702                manifest.metadata.region_id,
703            );
704        }
705        Some(_) => {}
706        None => {
707            options.sst_format = Some(manifest.sst_format);
708        }
709    }
710    if let Some(manifest_append_mode) = manifest.append_mode
711        && options.append_mode != manifest_append_mode
712    {
713        common_telemetry::warn!(
714            "Overriding append_mode from {} to {} for region {}",
715            options.append_mode,
716            manifest_append_mode,
717            manifest.metadata.region_id,
718        );
719        options.append_mode = manifest_append_mode;
720    }
721    if options.append_mode && options.merge_mode.take().is_some() {
722        common_telemetry::warn!(
723            "Ignoring merge_mode because append_mode is enabled for region {}",
724            manifest.metadata.region_id,
725        );
726    }
727}
728
729/// Sanitizes open request options before parsing.
730pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
731    let append_mode_enabled = options
732        .get("append_mode")
733        .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));
734
735    if append_mode_enabled && options.remove("merge_mode").is_some() {
736        common_telemetry::warn!(
737            "Ignoring merge_mode in open request options because append_mode is enabled"
738        );
739    }
740}
741
742/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
743pub fn get_object_store(
744    name: &Option<String>,
745    object_store_manager: &ObjectStoreManagerRef,
746) -> Result<object_store::ObjectStore> {
747    if let Some(name) = name {
748        Ok(object_store_manager
749            .find(name)
750            .with_context(|| ObjectStoreNotFoundSnafu {
751                object_store: name.clone(),
752            })?
753            .clone())
754    } else {
755        Ok(object_store_manager.default_object_store().clone())
756    }
757}
758
759/// Checks whether the recovered region has the same schema as region to create.
760pub(crate) fn check_recovered_region(
761    recovered: &RegionMetadata,
762    region_id: RegionId,
763    column_metadatas: &[ColumnMetadata],
764    primary_key: &[ColumnId],
765) -> Result<()> {
766    if recovered.region_id != region_id {
767        error!(
768            "Recovered region {}, expect region {}",
769            recovered.region_id, region_id
770        );
771        return RegionCorruptedSnafu {
772            region_id,
773            reason: format!(
774                "recovered metadata has different region id {}",
775                recovered.region_id
776            ),
777        }
778        .fail();
779    }
780    if recovered.column_metadatas != column_metadatas {
781        error!(
782            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
783            recovered.region_id, recovered.column_metadatas, column_metadatas
784        );
785
786        return RegionCorruptedSnafu {
787            region_id,
788            reason: "recovered metadata has different schema",
789        }
790        .fail();
791    }
792    if recovered.primary_key != primary_key {
793        error!(
794            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
795            recovered.region_id, recovered.primary_key, primary_key
796        );
797
798        return RegionCorruptedSnafu {
799            region_id,
800            reason: "recovered metadata has different primary key",
801        }
802        .fail();
803    }
804
805    Ok(())
806}
807
808/// Replays the mutations from WAL and inserts mutations to memtable of given region.
809pub(crate) async fn replay_memtable<F>(
810    provider: &Provider,
811    mut wal_entry_reader: Box<dyn WalEntryReader>,
812    region_id: RegionId,
813    flushed_entry_id: EntryId,
814    version_control: &VersionControlRef,
815    allow_stale_entries: bool,
816    on_region_opened: F,
817) -> Result<EntryId>
818where
819    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
820{
821    let now = Instant::now();
822    let mut rows_replayed = 0;
823    // Last entry id should start from flushed entry id since there might be no
824    // data in the WAL.
825    let mut last_entry_id = flushed_entry_id;
826    let replay_from_entry_id = flushed_entry_id + 1;
827
828    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
829    while let Some(res) = wal_stream.next().await {
830        let (entry_id, entry) = res?;
831        if entry_id <= flushed_entry_id {
832            warn!(
833                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
834                region_id, flushed_entry_id, entry_id
835            );
836            ensure!(
837                allow_stale_entries,
838                StaleLogEntrySnafu {
839                    region_id,
840                    flushed_entry_id,
841                    unexpected_entry_id: entry_id,
842                }
843            );
844        }
845        last_entry_id = last_entry_id.max(entry_id);
846
847        let mut region_write_ctx = RegionWriteCtx::new(
848            region_id,
849            version_control,
850            provider.clone(),
851            // For WAL replay, we don't need to track the write bytes rate.
852            None,
853        );
854        for mutation in entry.mutations {
855            rows_replayed += mutation
856                .rows
857                .as_ref()
858                .map(|rows| rows.rows.len())
859                .unwrap_or(0);
860            region_write_ctx.push_mutation(
861                mutation.op_type,
862                mutation.rows,
863                mutation.write_hint,
864                OptionOutputTx::none(),
865                // We should respect the sequence in WAL during replay.
866                Some(mutation.sequence),
867            );
868        }
869
870        for bulk_entry in entry.bulk_entries {
871            let part = BulkPart::try_from(bulk_entry)?;
872            rows_replayed += part.num_rows();
873            // During replay, we should adopt the sequence from WAL.
874            let bulk_sequence_from_wal = part.sequence;
875            ensure!(
876                region_write_ctx.push_bulk(
877                    OptionOutputTx::none(),
878                    part,
879                    Some(bulk_sequence_from_wal)
880                ),
881                RegionCorruptedSnafu {
882                    region_id,
883                    reason: "unable to replay memtable with bulk entries",
884                }
885            );
886        }
887
888        // set next_entry_id and write to memtable.
889        region_write_ctx.set_next_entry_id(last_entry_id + 1);
890        region_write_ctx.write_memtable().await;
891        region_write_ctx.write_bulk().await;
892    }
893
894    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
895    // to avoid reading potentially incomplete entries in the future.
896    (on_region_opened)(region_id, flushed_entry_id, provider).await?;
897
898    let series_count = version_control.current().series_count();
899    info!(
900        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
901        region_id,
902        provider,
903        rows_replayed,
904        replay_from_entry_id,
905        last_entry_id,
906        series_count,
907        now.elapsed()
908    );
909    Ok(last_entry_id)
910}
911
912/// A task to load and fill the region file cache.
913pub(crate) struct RegionLoadCacheTask {
914    region: MitoRegionRef,
915}
916
917impl RegionLoadCacheTask {
918    pub(crate) fn new(region: MitoRegionRef) -> Self {
919        Self { region }
920    }
921
922    /// Fills the file cache with index files from the region.
923    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
924        let region_id = self.region.region_id;
925        let table_dir = self.region.access_layer.table_dir();
926        let path_type = self.region.access_layer.path_type();
927        let object_store = self.region.access_layer.object_store();
928        let version_control = &self.region.version_control;
929
930        // Collects IndexKeys, file sizes, and max timestamps for files that need to be downloaded
931        let mut files_to_download = Vec::new();
932        let mut files_already_cached = 0;
933
934        {
935            let version = version_control.current().version;
936            for level in version.ssts.levels() {
937                for file_handle in level.files.values() {
938                    let file_meta = file_handle.meta_ref();
939                    if file_meta.exists_index() {
940                        let puffin_key = IndexKey::new(
941                            file_meta.region_id,
942                            file_meta.file_id,
943                            FileType::Puffin(file_meta.index_version),
944                        );
945
946                        if !file_cache.contains_key(&puffin_key) {
947                            files_to_download.push((
948                                puffin_key,
949                                file_meta.index_file_size,
950                                file_meta.time_range.1, // max timestamp
951                            ));
952                        } else {
953                            files_already_cached += 1;
954                        }
955                    }
956                }
957            }
958            // Releases the Version after the scope to avoid holding the memtables and file handles
959            // for a long time.
960        }
961
962        // Sorts files by max timestamp in descending order to loads latest files first
963        files_to_download.sort_by_key(|b| std::cmp::Reverse(b.2));
964
965        let total_files = files_to_download.len() as i64;
966
967        info!(
968            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
969            region_id, total_files, files_already_cached
970        );
971
972        CACHE_FILL_PENDING_FILES.add(total_files);
973
974        let mut files_downloaded = 0;
975        let mut files_skipped = 0;
976
977        for (puffin_key, file_size, max_timestamp) in files_to_download {
978            let current_size = file_cache.puffin_cache_size();
979            let capacity = file_cache.puffin_cache_capacity();
980            let region_state = self.region.state();
981            if !can_load_cache(region_state) {
982                info!(
983                    "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
984                    region_state, region_id, current_size, capacity
985                );
986                break;
987            }
988
989            // Checks if adding this file would exceed capacity
990            if current_size + file_size > capacity {
991                info!(
992                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
993                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
994                );
995                files_skipped = (total_files - files_downloaded) as usize;
996                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
997                break;
998            }
999
1000            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
1001                version
1002            } else {
1003                unreachable!("`files_to_download` should only contains Puffin files");
1004            };
1005            let index_id = RegionIndexId::new(
1006                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
1007                index_version,
1008            );
1009
1010            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
1011
1012            match file_cache
1013                .download(puffin_key, &index_remote_path, object_store, file_size)
1014                .await
1015            {
1016                Ok(_) => {
1017                    debug!(
1018                        "Downloaded index file to write cache, region: {}, file_id: {}",
1019                        region_id, puffin_key.file_id
1020                    );
1021                    files_downloaded += 1;
1022                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
1023                    CACHE_FILL_PENDING_FILES.dec();
1024                }
1025                Err(e) => {
1026                    warn!(
1027                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
1028                        region_id, puffin_key.file_id
1029                    );
1030                    CACHE_FILL_PENDING_FILES.dec();
1031                }
1032            }
1033        }
1034
1035        info!(
1036            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
1037            region_id, total_files, files_downloaded, files_already_cached, files_skipped
1038        );
1039    }
1040}
1041
1042/// Loads all index (Puffin) files from the version into the write cache.
1043fn maybe_load_cache(
1044    region: &MitoRegionRef,
1045    config: &MitoConfig,
1046    cache_manager: &Option<CacheManagerRef>,
1047) {
1048    let Some(cache_manager) = cache_manager else {
1049        return;
1050    };
1051    let Some(write_cache) = cache_manager.write_cache() else {
1052        return;
1053    };
1054
1055    let preload_enabled = config.preload_index_cache;
1056    if !preload_enabled {
1057        return;
1058    }
1059
1060    let task = RegionLoadCacheTask::new(region.clone());
1061    write_cache.load_region_cache(task);
1062}
1063
1064/// Preloads Parquet metadata into the in-memory SST meta cache on region open.
1065///
1066/// This improves the latency of the first query after server start by avoiding large Parquet
1067/// metadata reads on demand.
1068///
1069/// The preload is best-effort:
1070/// - Always tries to warm from the local write cache (file cache) first.
1071/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata
1072///   directly from the local store.
1073/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...).
1074#[allow(clippy::too_many_arguments)]
1075async fn preload_parquet_meta_cache_for_files(
1076    region_id: RegionId,
1077    cache_manager: CacheManagerRef,
1078    sst_meta_cache_capacity: u64,
1079    table_dir: String,
1080    path_type: PathType,
1081    object_store: object_store::ObjectStore,
1082    region_metadata: RegionMetadataRef,
1083    mut files: Vec<FileHandle>,
1084) -> usize {
1085    if !cache_manager.sst_meta_cache_enabled()
1086        || sst_meta_cache_capacity == 0
1087        || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
1088    {
1089        return 0;
1090    }
1091
1092    let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME;
1093
1094    // Sort by time range so we can prefer preloading newer files first.
1095    files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));
1096
1097    let mut loaded = 0usize;
1098    for file_handle in files {
1099        // Stop when the shared SST meta cache is full.
1100        if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
1101            break;
1102        }
1103
1104        let file_id = file_handle.file_id();
1105        let mut cache_metrics = MetadataCacheMetrics::default();
1106        if let Some(metadata) = cache_manager
1107            .get_parquet_meta_data(file_id, &mut cache_metrics, PageIndexPolicy::Optional)
1108            .await
1109        {
1110            if file_handle.primary_key_range().is_none()
1111                && let Some(primary_key_range) =
1112                    extract_primary_key_range(&metadata, &region_metadata)
1113            {
1114                file_handle.set_primary_key_range(primary_key_range);
1115            }
1116            // Metadata is either already in memory or loaded from file cache.
1117            if cache_metrics.mem_cache_hit == 0 {
1118                loaded += 1;
1119            }
1120            continue;
1121        }
1122
1123        if !allow_direct_load {
1124            continue;
1125        }
1126
1127        let file_size = file_handle.meta_ref().file_size;
1128        let file_path = file_handle.file_path(&table_dir, path_type);
1129        let mut loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
1130        loader.with_page_index_policy(PageIndexPolicy::Optional);
1131        match loader.load(&mut cache_metrics).await {
1132            Ok(metadata) => {
1133                if let Some(primary_key_range) =
1134                    extract_primary_key_range(&metadata, &region_metadata)
1135                {
1136                    file_handle.set_primary_key_range(primary_key_range);
1137                }
1138                cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
1139                loaded += 1;
1140            }
1141            Err(err) => {
1142                // Preloading is best-effort. Failure shouldn't affect region open.
1143                warn!(
1144                    err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
1145                    region_id, file_path
1146                );
1147            }
1148        }
1149    }
1150
1151    loaded
1152}
1153
1154fn maybe_preload_parquet_meta_cache(
1155    region: &MitoRegionRef,
1156    config: &MitoConfig,
1157    cache_manager: &Option<CacheManagerRef>,
1158) {
1159    let Some(cache_manager) = cache_manager else {
1160        return;
1161    };
1162    if !cache_manager.sst_meta_cache_enabled() {
1163        return;
1164    }
1165
1166    // Skip if SST meta cache is disabled.
1167    if config.sst_meta_cache_size.as_bytes() == 0 {
1168        return;
1169    }
1170    if !config.preload_index_cache {
1171        return;
1172    }
1173
1174    let region = region.clone();
1175    let cache_manager = cache_manager.clone();
1176    let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();
1177
1178    tokio::spawn(async move {
1179        // Safety: semaphore must exist.
1180        let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();
1181
1182        let region_id = region.region_id;
1183        let table_dir = region.access_layer.table_dir().to_string();
1184        let path_type = region.access_layer.path_type();
1185        let object_store = region.access_layer.object_store().clone();
1186        let region_metadata = region.version_control.current().version.metadata.clone();
1187
1188        // Collect SST files. Do not hold the version longer than needed.
1189        let mut files = Vec::new();
1190        {
1191            let version = region.version_control.current().version;
1192            for level in version.ssts.levels() {
1193                for file_handle in level.files.values() {
1194                    files.push(file_handle.clone());
1195                }
1196            }
1197        }
1198        let preloading_start = Instant::now();
1199        let loaded = preload_parquet_meta_cache_for_files(
1200            region_id,
1201            cache_manager,
1202            sst_meta_cache_capacity,
1203            table_dir,
1204            path_type,
1205            object_store,
1206            region_metadata,
1207            files,
1208        )
1209        .await;
1210        let preloading_cost = preloading_start.elapsed();
1211
1212        if loaded > 0 {
1213            info!(
1214                "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
1215                region_id,
1216                loaded,
1217                preloading_cost.as_millis()
1218            );
1219        }
1220    });
1221}
1222
1223fn can_load_cache(state: RegionRoleState) -> bool {
1224    match state {
1225        RegionRoleState::Leader(RegionLeaderState::Writable)
1226        | RegionRoleState::Leader(RegionLeaderState::Staging)
1227        | RegionRoleState::Leader(RegionLeaderState::Altering)
1228        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
1229        | RegionRoleState::Leader(RegionLeaderState::Editing)
1230        | RegionRoleState::Follower => true,
1231        // The region will be closed soon if it is downgrading.
1232        RegionRoleState::Leader(RegionLeaderState::Downgrading)
1233        | RegionRoleState::Leader(RegionLeaderState::Dropping)
1234        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
1235    }
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240    use std::collections::HashMap;
1241    use std::sync::Arc;
1242
1243    use common_base::readable_size::ReadableSize;
1244    use common_test_util::temp_dir::create_temp_dir;
1245    use common_time::Timestamp;
1246    use common_wal::options::{KafkaWalOptions, WalOptions};
1247    use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
1248    use datatypes::arrow::record_batch::RecordBatch;
1249    use object_store::ObjectStore;
1250    use object_store::services::{Fs, Memory, S3};
1251    use parquet::arrow::ArrowWriter;
1252    use parquet::file::metadata::KeyValue;
1253    use parquet::file::properties::WriterProperties;
1254    use store_api::region_request::PathType;
1255    use store_api::storage::{FileId, RegionId};
1256
1257    use super::{
1258        initial_pruned_entry_id, preload_parquet_meta_cache_for_files, sanitize_region_options,
1259        supports_open_region_object_storage_requirement,
1260    };
1261    use crate::cache::CacheManager;
1262    use crate::cache::file_cache::{FileType, IndexKey};
1263    use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
1264    use crate::region::options::RegionOptions;
1265    use crate::sst::FormatType;
1266    use crate::sst::file::{FileHandle, FileMeta};
1267    use crate::sst::file_purger::NoopFilePurger;
1268    use crate::sst::parquet::PARQUET_METADATA_KEY;
1269    use crate::test_util::TestEnv;
1270    use crate::test_util::sst_util::sst_region_metadata;
1271
1272    fn build_test_manifest(sst_format: FormatType) -> RegionManifest {
1273        RegionManifest {
1274            metadata: Arc::new(sst_region_metadata()),
1275            files: HashMap::new(),
1276            removed_files: RemovedFilesRecord::default(),
1277            flushed_entry_id: 0,
1278            flushed_sequence: 0,
1279            committed_sequence: None,
1280            manifest_version: 0,
1281            truncated_entry_id: None,
1282            compaction_time_window: None,
1283            sst_format,
1284            append_mode: None,
1285        }
1286    }
1287
1288    fn build_fs_object_store() -> ObjectStore {
1289        ObjectStore::new(Fs::default().root("/tmp"))
1290            .unwrap()
1291            .finish()
1292    }
1293
1294    #[test]
1295    fn test_initial_pruned_entry_id() {
1296        assert_eq!(0, initial_pruned_entry_id(&WalOptions::RaftEngine));
1297        assert_eq!(0, initial_pruned_entry_id(&WalOptions::Noop));
1298        assert_eq!(
1299            0,
1300            initial_pruned_entry_id(&WalOptions::Kafka(KafkaWalOptions::new(
1301                "test_topic".to_string()
1302            )))
1303        );
1304        assert_eq!(
1305            42,
1306            initial_pruned_entry_id(&WalOptions::Kafka(KafkaWalOptions {
1307                topic: "test_topic".to_string(),
1308                initial_pruned_entry_id: Some(42),
1309            }))
1310        );
1311    }
1312
1313    #[test]
1314    #[cfg(not(feature = "test-shared-fs-region-migration"))]
1315    fn test_open_requirement_rejects_fs_object_store() {
1316        let object_store = build_fs_object_store();
1317
1318        assert!(!supports_open_region_object_storage_requirement(
1319            &object_store
1320        ));
1321    }
1322
1323    #[test]
1324    #[cfg(feature = "test-shared-fs-region-migration")]
1325    fn test_open_requirement_accepts_shared_fs_object_store_for_tests() {
1326        let object_store = build_fs_object_store();
1327
1328        assert!(supports_open_region_object_storage_requirement(
1329            &object_store
1330        ));
1331    }
1332
1333    #[test]
1334    fn test_open_requirement_accepts_s3_object_store() {
1335        let object_store = ObjectStore::new(
1336            S3::default()
1337                .bucket("test-bucket")
1338                .region("us-east-1")
1339                .disable_ec2_metadata(),
1340        )
1341        .unwrap()
1342        .finish();
1343
1344        assert!(supports_open_region_object_storage_requirement(
1345            &object_store
1346        ));
1347    }
1348
1349    #[test]
1350    fn test_sanitize_region_options_options_format_wins() {
1351        // Manifest persisted PrimaryKey, but the re-parsed options now request Flat
1352        // (e.g., bulk memtable). The options' value must win.
1353        let manifest = build_test_manifest(FormatType::PrimaryKey);
1354        let mut options = RegionOptions {
1355            sst_format: Some(FormatType::Flat),
1356            ..Default::default()
1357        };
1358        sanitize_region_options(&manifest, &mut options);
1359        assert_eq!(options.sst_format, Some(FormatType::Flat));
1360    }
1361
1362    #[test]
1363    fn test_sanitize_region_options_fills_from_manifest_when_unset() {
1364        // When the caller didn't specify sst_format, fall back to the manifest value
1365        // so downstream code always sees a concrete format.
1366        let manifest = build_test_manifest(FormatType::Flat);
1367        let mut options = RegionOptions {
1368            sst_format: None,
1369            ..Default::default()
1370        };
1371        sanitize_region_options(&manifest, &mut options);
1372        assert_eq!(options.sst_format, Some(FormatType::Flat));
1373    }
1374
1375    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
1376        let key_value_meta = KeyValue::new(
1377            PARQUET_METADATA_KEY.to_string(),
1378            sst_region_metadata().to_json().unwrap(),
1379        );
1380        let props = WriterProperties::builder()
1381            .set_key_value_metadata(Some(vec![key_value_meta]))
1382            .build();
1383
1384        let mut parquet_bytes = Vec::new();
1385        let mut writer =
1386            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
1387        writer.write(batch).unwrap();
1388        writer.close().unwrap();
1389
1390        parquet_bytes
1391    }
1392
1393    #[tokio::test]
1394    async fn test_preload_parquet_meta_cache_uses_file_cache() {
1395        let env = TestEnv::new().await;
1396
1397        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
1398        let write_cache = env
1399            .create_write_cache(local_store, ReadableSize::mb(1024))
1400            .await;
1401        let cache_manager = Arc::new(
1402            CacheManager::builder()
1403                .sst_meta_cache_size(1024 * 1024)
1404                .write_cache(Some(write_cache.clone()))
1405                .build(),
1406        );
1407
1408        let region_id = RegionId::new(1, 1);
1409        let file_id = FileId::random();
1410
1411        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
1412        let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef;
1413        let batch = RecordBatch::try_from_iter([
1414            ("col", col),
1415            (
1416                store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME,
1417                primary_key,
1418            ),
1419        ])
1420        .unwrap();
1421        let parquet_bytes = sst_parquet_bytes(&batch);
1422        let file_size = parquet_bytes.len() as u64;
1423
1424        let file_meta = FileMeta {
1425            region_id,
1426            file_id,
1427            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1428            level: 0,
1429            file_size,
1430            max_row_group_uncompressed_size: 0,
1431            available_indexes: Default::default(),
1432            indexes: vec![],
1433            index_file_size: 0,
1434            index_version: 0,
1435            num_rows: 3,
1436            num_row_groups: 1,
1437            sequence: None,
1438            partition_expr: None,
1439            num_series: 0,
1440            ..Default::default()
1441        };
1442        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1443
1444        let table_dir = "test_table";
1445        let path_type = PathType::Bare;
1446        let remote_path = file_handle.file_path(table_dir, path_type);
1447
1448        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
1449        source_store
1450            .write(&remote_path, parquet_bytes)
1451            .await
1452            .unwrap();
1453
1454        // Put the parquet file into the write cache, so file cache contains metadata.
1455        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
1456        write_cache
1457            .file_cache()
1458            .download(index_key, &remote_path, &source_store, file_size)
1459            .await
1460            .unwrap();
1461
1462        let region_file_id = file_handle.file_id();
1463        assert!(
1464            cache_manager
1465                .get_parquet_meta_data_from_mem_cache(region_file_id)
1466                .is_none()
1467        );
1468
1469        let loaded = preload_parquet_meta_cache_for_files(
1470            region_id,
1471            cache_manager.clone(),
1472            1024 * 1024,
1473            table_dir.to_string(),
1474            path_type,
1475            source_store.clone(),
1476            Arc::new(sst_region_metadata()),
1477            vec![file_handle.clone()],
1478        )
1479        .await;
1480
1481        // Should warm the in-memory cache from the local file cache.
1482        assert_eq!(loaded, 1);
1483        assert!(
1484            cache_manager
1485                .get_parquet_meta_data_from_mem_cache(region_file_id)
1486                .is_some()
1487        );
1488        // The cached entry must carry the page index so that later `Optional` queries hit
1489        // the in-memory cache instead of reloading metadata on demand.
1490        assert!(
1491            cache_manager
1492                .get_sst_meta_data_from_mem_cache(
1493                    region_file_id,
1494                    parquet::file::metadata::PageIndexPolicy::Optional,
1495                )
1496                .is_some()
1497        );
1498        assert!(file_handle.primary_key_range().is_some());
1499    }
1500
1501    #[tokio::test]
1502    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
1503        let cache_manager = Arc::new(
1504            CacheManager::builder()
1505                .sst_meta_cache_size(1024 * 1024)
1506                .build(),
1507        );
1508
1509        let region_id = RegionId::new(1, 1);
1510        let file_id = FileId::random();
1511
1512        // Without a local file cache entry, preloading should skip the file.
1513        let file_meta = FileMeta {
1514            region_id,
1515            file_id,
1516            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1517            level: 0,
1518            file_size: 0,
1519            max_row_group_uncompressed_size: 0,
1520            available_indexes: Default::default(),
1521            indexes: vec![],
1522            index_file_size: 0,
1523            index_version: 0,
1524            num_rows: 3,
1525            num_row_groups: 1,
1526            sequence: None,
1527            partition_expr: None,
1528            num_series: 0,
1529            ..Default::default()
1530        };
1531        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1532
1533        let table_dir = "test_table";
1534        let path_type = PathType::Bare;
1535        let remote_path = file_handle.file_path(table_dir, path_type);
1536
1537        // Even if the remote object store has the file, we should not preload from it.
1538        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
1539        object_store
1540            .write(&remote_path, b"noop".as_slice())
1541            .await
1542            .unwrap();
1543
1544        let region_file_id = file_handle.file_id();
1545        assert!(
1546            cache_manager
1547                .get_parquet_meta_data_from_mem_cache(region_file_id)
1548                .is_none()
1549        );
1550
1551        let loaded = preload_parquet_meta_cache_for_files(
1552            region_id,
1553            cache_manager.clone(),
1554            1024 * 1024,
1555            table_dir.to_string(),
1556            path_type,
1557            object_store,
1558            Arc::new(sst_region_metadata()),
1559            vec![file_handle],
1560        )
1561        .await;
1562
1563        assert_eq!(loaded, 0);
1564        assert!(
1565            cache_manager
1566                .get_parquet_meta_data_from_mem_cache(region_file_id)
1567                .is_none()
1568        );
1569    }
1570
1571    #[tokio::test]
1572    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
1573        let cache_manager = Arc::new(
1574            CacheManager::builder()
1575                .sst_meta_cache_size(1024 * 1024)
1576                .build(),
1577        );
1578
1579        let region_id = RegionId::new(1, 1);
1580        let file_id = FileId::random();
1581
1582        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
1583        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
1584        let parquet_bytes = sst_parquet_bytes(&batch);
1585
1586        // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
1587        // the local filesystem to retrieve it.
1588        let file_meta = FileMeta {
1589            region_id,
1590            file_id,
1591            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
1592            level: 0,
1593            file_size: 0,
1594            max_row_group_uncompressed_size: 0,
1595            available_indexes: Default::default(),
1596            indexes: vec![],
1597            index_file_size: 0,
1598            index_version: 0,
1599            num_rows: 3,
1600            num_row_groups: 1,
1601            sequence: None,
1602            partition_expr: None,
1603            num_series: 0,
1604            ..Default::default()
1605        };
1606        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
1607
1608        let table_dir = "test_table";
1609        let path_type = PathType::Bare;
1610        let file_path = file_handle.file_path(table_dir, path_type);
1611
1612        let root = create_temp_dir("parquet-meta-preload");
1613        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
1614            .unwrap()
1615            .finish();
1616        object_store.write(&file_path, parquet_bytes).await.unwrap();
1617
1618        let region_file_id = file_handle.file_id();
1619        assert!(
1620            cache_manager
1621                .get_parquet_meta_data_from_mem_cache(region_file_id)
1622                .is_none()
1623        );
1624
1625        let loaded = preload_parquet_meta_cache_for_files(
1626            region_id,
1627            cache_manager.clone(),
1628            1024 * 1024,
1629            table_dir.to_string(),
1630            path_type,
1631            object_store,
1632            Arc::new(sst_region_metadata()),
1633            vec![file_handle],
1634        )
1635        .await;
1636
1637        assert_eq!(loaded, 1);
1638        assert!(
1639            cache_manager
1640                .get_parquet_meta_data_from_mem_cache(region_file_id)
1641                .is_some()
1642        );
1643    }
1644}