mito2/region/opener.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region opener.
16
17use std::any::TypeId;
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicI64, AtomicU64};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, warn};
24use common_wal::options::WalOptions;
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use log_store::kafka::log_store::KafkaLogStore;
28use log_store::noop::log_store::NoopLogStore;
29use log_store::raft_engine::log_store::RaftEngineLogStore;
30use object_store::manager::ObjectStoreManagerRef;
31use object_store::util::{join_dir, normalize_dir};
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::logstore::LogStore;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::{
36    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
37};
38use store_api::region_engine::RegionRole;
39use store_api::region_request::PathType;
40use store_api::storage::{ColumnId, RegionId};
41
42use crate::access_layer::AccessLayer;
43use crate::cache::CacheManagerRef;
44use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
45use crate::config::MitoConfig;
46use crate::error;
47use crate::error::{
48    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
49    Result, StaleLogEntrySnafu,
50};
51use crate::manifest::action::RegionManifest;
52use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
53use crate::manifest::storage::manifest_compress_type;
54use crate::memtable::MemtableBuilderProvider;
55use crate::memtable::bulk::part::BulkPart;
56use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
57use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
58use crate::region::options::RegionOptions;
59use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
60use crate::region::{
61    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
62};
63use crate::region_write_ctx::RegionWriteCtx;
64use crate::request::OptionOutputTx;
65use crate::schedule::scheduler::SchedulerRef;
66use crate::sst::FormatType;
67use crate::sst::file::RegionFileId;
68use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
69use crate::sst::file_ref::FileReferenceManagerRef;
70use crate::sst::index::intermediate::IntermediateManager;
71use crate::sst::index::puffin_manager::PuffinManagerFactory;
72use crate::sst::location::{self, region_dir_from_table_dir};
73use crate::time_provider::TimeProviderRef;
74use crate::wal::entry_reader::WalEntryReader;
75use crate::wal::{EntryId, Wal};
76
/// A fetcher to retrieve the partition expression for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in the engine metadata,
/// while newer ones do. On open, we backfill it via this fetcher and persist it
/// to the manifest so future opens don't need to refetch it.
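///
/// # Example
///
/// A minimal sketch of an implementor; the stored string is a hypothetical
/// placeholder, not a real serialized partition expression:
///
/// ```ignore
/// struct StaticFetcher(String);
///
/// #[async_trait::async_trait]
/// impl PartitionExprFetcher for StaticFetcher {
///     async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
///         // Returns the same serialized expression for every region.
///         Some(self.0.clone())
///     }
/// }
/// ```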
82#[async_trait::async_trait]
83pub trait PartitionExprFetcher {
84    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
85}
86
87pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;
88
89/// Builder to create a new [MitoRegion] or open an existing one.
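///
/// # Usage sketch
///
/// Construction goes through [RegionOpener::new] (arguments elided here) followed by the
/// builder-style setters below; the concrete values are assumed to be owned by the engine:
///
/// ```ignore
/// let region = RegionOpener::new(/* region_id, table_dir, ... */)
///     .metadata_builder(metadata_builder)
///     .parse_options(options_map)?
///     .cache(Some(cache_manager))
///     .skip_wal_replay(false)
///     .create_or_open(&config, &wal)
///     .await?;
/// ```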
90pub(crate) struct RegionOpener {
91    region_id: RegionId,
92    metadata_builder: Option<RegionMetadataBuilder>,
93    memtable_builder_provider: MemtableBuilderProvider,
94    object_store_manager: ObjectStoreManagerRef,
95    table_dir: String,
96    path_type: PathType,
97    purge_scheduler: SchedulerRef,
98    options: Option<RegionOptions>,
99    cache_manager: Option<CacheManagerRef>,
100    skip_wal_replay: bool,
101    puffin_manager_factory: PuffinManagerFactory,
102    intermediate_manager: IntermediateManager,
103    time_provider: TimeProviderRef,
104    stats: ManifestStats,
105    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
106    replay_checkpoint: Option<u64>,
107    file_ref_manager: FileReferenceManagerRef,
108    partition_expr_fetcher: PartitionExprFetcherRef,
109}
110
111impl RegionOpener {
112    /// Returns a new opener.
113    // TODO(LFC): Reduce the number of arguments.
114    #[allow(clippy::too_many_arguments)]
115    pub(crate) fn new(
116        region_id: RegionId,
117        table_dir: &str,
118        path_type: PathType,
119        memtable_builder_provider: MemtableBuilderProvider,
120        object_store_manager: ObjectStoreManagerRef,
121        purge_scheduler: SchedulerRef,
122        puffin_manager_factory: PuffinManagerFactory,
123        intermediate_manager: IntermediateManager,
124        time_provider: TimeProviderRef,
125        file_ref_manager: FileReferenceManagerRef,
126        partition_expr_fetcher: PartitionExprFetcherRef,
127    ) -> RegionOpener {
128        RegionOpener {
129            region_id,
130            metadata_builder: None,
131            memtable_builder_provider,
132            object_store_manager,
133            table_dir: normalize_dir(table_dir),
134            path_type,
135            purge_scheduler,
136            options: None,
137            cache_manager: None,
138            skip_wal_replay: false,
139            puffin_manager_factory,
140            intermediate_manager,
141            time_provider,
142            stats: Default::default(),
143            wal_entry_reader: None,
144            replay_checkpoint: None,
145            file_ref_manager,
146            partition_expr_fetcher,
147        }
148    }
149
150    /// Sets metadata builder of the region to create.
151    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
152        self.metadata_builder = Some(builder);
153        self
154    }
155
156    /// Computes the region directory from table_dir and region_id.
157    fn region_dir(&self) -> String {
158        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
159    }
160
161    /// Builds the region metadata.
162    ///
163    /// # Panics
164    /// - Panics if `options` is not set.
165    /// - Panics if `metadata_builder` is not set.
166    fn build_metadata(&mut self) -> Result<RegionMetadata> {
167        let options = self.options.as_ref().unwrap();
168        let mut metadata_builder = self.metadata_builder.take().unwrap();
169        metadata_builder.primary_key_encoding(options.primary_key_encoding());
170        metadata_builder.build().context(InvalidMetadataSnafu)
171    }
172
173    /// Parses and sets options for the region.
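    ///
    /// A sketch of the expected input; the option key shown here is illustrative and
    /// the supported set is defined by [RegionOptions]:
    ///
    /// ```ignore
    /// let mut map = HashMap::new();
    /// map.insert("ttl".to_string(), "7d".to_string());
    /// let opener = opener.parse_options(map)?;
    /// ```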
174    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
175        self.options(RegionOptions::try_from(&options)?)
176    }
177
178    /// Sets the replay checkpoint for the region.
179    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
180        self.replay_checkpoint = replay_checkpoint;
181        self
182    }
183
    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
186    pub(crate) fn wal_entry_reader(
187        mut self,
188        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
189    ) -> Self {
190        self.wal_entry_reader = wal_entry_reader;
191        self
192    }
193
194    /// Sets options for the region.
195    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
196        options.validate()?;
197        self.options = Some(options);
198        Ok(self)
199    }
200
201    /// Sets the cache manager for the region.
202    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
203        self.cache_manager = cache_manager;
204        self
205    }
206
    /// Sets the `skip_wal_replay` flag.
208    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
209        self.skip_wal_replay = skip;
210        self
211    }
212
    /// Writes the region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
215    ///
216    /// # Panics
217    /// - Panics if `metadata_builder` is not set.
218    /// - Panics if `options` is not set.
219    pub(crate) async fn create_or_open<S: LogStore>(
220        mut self,
221        config: &MitoConfig,
222        wal: &Wal<S>,
223    ) -> Result<MitoRegionRef> {
224        let region_id = self.region_id;
225        let region_dir = self.region_dir();
226        let metadata = self.build_metadata()?;
227        // Tries to open the region.
228        match self.maybe_open(config, wal).await {
229            Ok(Some(region)) => {
230                let recovered = region.metadata();
231                // Checks the schema of the region.
232                let expect = &metadata;
233                check_recovered_region(
234                    &recovered,
235                    expect.region_id,
236                    &expect.column_metadatas,
237                    &expect.primary_key,
238                )?;
                // To stay consistent with the create behavior, set the opened region's role to RegionRole::Leader.
240                region.set_role(RegionRole::Leader);
241
242                return Ok(region);
243            }
244            Ok(None) => {
245                debug!(
246                    "No data under directory {}, region_id: {}",
247                    region_dir, self.region_id
248                );
249            }
250            Err(e) => {
251                warn!(e;
252                    "Failed to open region {} before creating it, region_dir: {}",
253                    self.region_id, region_dir
254                );
255            }
256        }
257        // Safety: must be set before calling this method.
258        let mut options = self.options.take().unwrap();
259        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
260        let provider = self.provider::<S>(&options.wal_options)?;
261        let metadata = Arc::new(metadata);
        // Sets the SST format based on the region options or the `default_experimental_flat_format` flag.
263        let sst_format = if let Some(format) = options.sst_format {
264            format
265        } else if config.default_experimental_flat_format {
266            options.sst_format = Some(FormatType::Flat);
267            FormatType::Flat
268        } else {
269            // Default to PrimaryKeyParquet for newly created regions
270            options.sst_format = Some(FormatType::PrimaryKey);
271            FormatType::PrimaryKey
272        };
        // Creates a manifest manager for this region and writes the initial manifest to the manifest file.
274        let region_manifest_options =
275            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        // For remote WAL, we need to set flushed_entry_id to the current topic's latest entry id.
277        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
278        let manifest_manager = RegionManifestManager::new(
279            metadata.clone(),
280            flushed_entry_id,
281            region_manifest_options,
282            sst_format,
283            &self.stats,
284        )
285        .await?;
286
287        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
288        let part_duration = options.compaction.time_window();
289        // Initial memtable id is 0.
290        let mutable = Arc::new(TimePartitions::new(
291            metadata.clone(),
292            memtable_builder.clone(),
293            0,
294            part_duration,
295        ));
296
297        debug!(
298            "Create region {} with options: {:?}, default_flat_format: {}",
299            region_id, options, config.default_experimental_flat_format
300        );
301
302        let version = VersionBuilder::new(metadata, mutable)
303            .options(options)
304            .build();
305        let version_control = Arc::new(VersionControl::new(version));
306        let access_layer = Arc::new(AccessLayer::new(
307            self.table_dir.clone(),
308            self.path_type,
309            object_store,
310            self.puffin_manager_factory,
311            self.intermediate_manager,
312        ));
313        let now = self.time_provider.current_time_millis();
314
315        Ok(Arc::new(MitoRegion {
316            region_id,
317            version_control,
318            access_layer: access_layer.clone(),
319            // Region is writable after it is created.
320            manifest_ctx: Arc::new(ManifestContext::new(
321                manifest_manager,
322                RegionRoleState::Leader(RegionLeaderState::Writable),
323            )),
324            file_purger: create_file_purger(
325                config.gc.enable,
326                self.purge_scheduler,
327                access_layer,
328                self.cache_manager,
329                self.file_ref_manager.clone(),
330            ),
331            provider,
332            last_flush_millis: AtomicI64::new(now),
333            last_compaction_millis: AtomicI64::new(now),
334            time_provider: self.time_provider.clone(),
335            topic_latest_entry_id: AtomicU64::new(0),
336            written_bytes: Arc::new(AtomicU64::new(0)),
337            stats: self.stats,
338        }))
339    }
340
    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
344    pub(crate) async fn open<S: LogStore>(
345        mut self,
346        config: &MitoConfig,
347        wal: &Wal<S>,
348    ) -> Result<MitoRegionRef> {
349        let region_id = self.region_id;
350        let region_dir = self.region_dir();
351        let region = self
352            .maybe_open(config, wal)
353            .await?
354            .with_context(|| EmptyRegionDirSnafu {
355                region_id,
356                region_dir: &region_dir,
357            })?;
358
359        ensure!(
360            region.region_id == self.region_id,
361            RegionCorruptedSnafu {
362                region_id: self.region_id,
363                reason: format!(
364                    "recovered region has different region id {}",
365                    region.region_id
366                ),
367            }
368        );
369
370        Ok(region)
371    }
372
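    /// Builds the WAL [Provider] for this region from its `wal_options`.
    ///
    /// Also guards against an incompatible WAL provider change: a region whose options
    /// say `raft_engine` (or `kafka`) can only be opened on a log store of the matching
    /// type, or on the no-op log store.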
373    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
374        match wal_options {
375            WalOptions::RaftEngine => {
376                ensure!(
377                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
378                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
379                    error::IncompatibleWalProviderChangeSnafu {
380                        global: "`kafka`",
381                        region: "`raft_engine`",
382                    }
383                );
384                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
385            }
386            WalOptions::Kafka(options) => {
387                ensure!(
388                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
389                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
390                    error::IncompatibleWalProviderChangeSnafu {
391                        global: "`raft_engine`",
392                        region: "`kafka`",
393                    }
394                );
395                Ok(Provider::kafka_provider(options.topic.clone()))
396            }
397            WalOptions::Noop => Ok(Provider::noop_provider()),
398        }
399    }
400
401    /// Tries to open the region and returns `None` if the region directory is empty.
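    ///
    /// On success, this also backfills a missing `partition_expr` via the
    /// [PartitionExprFetcher], sanitizes the region options against the manifest,
    /// and replays the WAL into the new memtable unless `skip_wal_replay` is set.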
402    async fn maybe_open<S: LogStore>(
403        &mut self,
404        config: &MitoConfig,
405        wal: &Wal<S>,
406    ) -> Result<Option<MitoRegionRef>> {
407        let now = Instant::now();
408        let mut region_options = self.options.as_ref().unwrap().clone();
409
410        let region_manifest_options = Self::manifest_options(
411            config,
412            &region_options,
413            &self.region_dir(),
414            &self.object_store_manager,
415        )?;
416        let Some(manifest_manager) =
417            RegionManifestManager::open(region_manifest_options, &self.stats).await?
418        else {
419            return Ok(None);
420        };
421
        // Backfills `partition_expr` if it is missing and uses the backfilled metadata in memory during this open.
423        let manifest = manifest_manager.manifest();
424        let metadata = if manifest.metadata.partition_expr.is_none()
425            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
426        {
427            let metadata = manifest.metadata.as_ref().clone();
428            let mut builder = RegionMetadataBuilder::from_existing(metadata);
429            builder.partition_expr_json(Some(expr_json));
430            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
431        } else {
432            manifest.metadata.clone()
433        };
434        // Updates the region options with the manifest.
435        sanitize_region_options(&manifest, &mut region_options);
436
437        let region_id = self.region_id;
438        let provider = self.provider::<S>(&region_options.wal_options)?;
439        let wal_entry_reader = self
440            .wal_entry_reader
441            .take()
442            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
443        let on_region_opened = wal.on_region_opened();
444        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
445
446        debug!(
447            "Open region {} at {} with options: {:?}",
448            region_id, self.table_dir, self.options
449        );
450
451        let access_layer = Arc::new(AccessLayer::new(
452            self.table_dir.clone(),
453            self.path_type,
454            object_store,
455            self.puffin_manager_factory.clone(),
456            self.intermediate_manager.clone(),
457        ));
458        let file_purger = create_file_purger(
459            config.gc.enable,
460            self.purge_scheduler.clone(),
461            access_layer.clone(),
462            self.cache_manager.clone(),
463            self.file_ref_manager.clone(),
464        );
465        // We should sanitize the region options before creating a new memtable.
466        let memtable_builder = self
467            .memtable_builder_provider
468            .builder_for_options(&region_options);
        // Uses the compaction time window in the manifest if the region doesn't provide
        // the time window option.
471        let part_duration = region_options
472            .compaction
473            .time_window()
474            .or(manifest.compaction_time_window);
475        // Initial memtable id is 0.
476        let mutable = Arc::new(TimePartitions::new(
477            metadata.clone(),
478            memtable_builder.clone(),
479            0,
480            part_duration,
481        ));
482
        // Builds the version from the manifest and the sanitized region options.
484        let version_builder = version_builder_from_manifest(
485            &manifest,
486            metadata,
487            file_purger.clone(),
488            mutable,
489            region_options,
490        );
491        let version = version_builder.build();
492        let flushed_entry_id = version.flushed_entry_id;
493        let version_control = Arc::new(VersionControl::new(version));
494
495        let topic_latest_entry_id = if !self.skip_wal_replay {
496            let replay_from_entry_id = self
497                .replay_checkpoint
498                .unwrap_or_default()
499                .max(flushed_entry_id);
500            info!(
501                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
502                replay_from_entry_id,
503                region_id,
504                manifest.manifest_version,
505                flushed_entry_id,
506                now.elapsed()
507            );
508            replay_memtable(
509                &provider,
510                wal_entry_reader,
511                region_id,
512                replay_from_entry_id,
513                &version_control,
514                config.allow_stale_entries,
515                on_region_opened,
516            )
517            .await?;
            // For remote WAL, we need to set topic_latest_entry_id to the current topic's latest entry id.
            // Only set it after the WAL replay is completed.
520
521            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
522                wal.store().latest_entry_id(&provider).unwrap_or(0)
523            } else {
524                0
525            }
526        } else {
527            info!(
528                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
529                region_id,
530                manifest.manifest_version,
531                flushed_entry_id,
532                now.elapsed()
533            );
534
535            0
536        };
537
538        if let Some(committed_in_manifest) = manifest.committed_sequence {
539            let committed_after_replay = version_control.committed_sequence();
540            if committed_in_manifest > committed_after_replay {
541                info!(
542                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
543                    self.region_id,
544                    version_control.current().version.flushed_sequence,
545                    version_control.committed_sequence(),
546                    committed_in_manifest
547                );
548                version_control.set_committed_sequence(committed_in_manifest);
549            }
550        }
551
552        let now = self.time_provider.current_time_millis();
553
554        let region = MitoRegion {
555            region_id: self.region_id,
556            version_control: version_control.clone(),
557            access_layer: access_layer.clone(),
            // Region is always opened in read-only mode.
559            manifest_ctx: Arc::new(ManifestContext::new(
560                manifest_manager,
561                RegionRoleState::Follower,
562            )),
563            file_purger,
564            provider: provider.clone(),
565            last_flush_millis: AtomicI64::new(now),
566            last_compaction_millis: AtomicI64::new(now),
567            time_provider: self.time_provider.clone(),
568            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
569            written_bytes: Arc::new(AtomicU64::new(0)),
570            stats: self.stats.clone(),
571        };
572
573        let region = Arc::new(region);
574
575        maybe_load_cache(&region, config, &self.cache_manager);
576
577        Ok(Some(region))
578    }
579
    /// Returns the manifest options for the region.
581    fn manifest_options(
582        config: &MitoConfig,
583        options: &RegionOptions,
584        region_dir: &str,
585        object_store_manager: &ObjectStoreManagerRef,
586    ) -> Result<RegionManifestOptions> {
587        let object_store = get_object_store(&options.storage, object_store_manager)?;
588        Ok(RegionManifestOptions {
589            manifest_dir: new_manifest_dir(region_dir),
590            object_store,
591            // We don't allow users to set the compression algorithm as we use it as a file suffix.
592            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
593            compress_type: manifest_compress_type(config.compress_manifest),
594            checkpoint_distance: config.manifest_checkpoint_distance,
595            remove_file_options: RemoveFileOptions {
596                enable_gc: config.gc.enable,
597            },
598        })
599    }
600}
601
602/// Creates a version builder from a region manifest.
603pub(crate) fn version_builder_from_manifest(
604    manifest: &RegionManifest,
605    metadata: RegionMetadataRef,
606    file_purger: FilePurgerRef,
607    mutable: TimePartitionsRef,
608    region_options: RegionOptions,
609) -> VersionBuilder {
610    VersionBuilder::new(metadata, mutable)
611        .add_files(file_purger, manifest.files.values().cloned())
612        .flushed_entry_id(manifest.flushed_entry_id)
613        .flushed_sequence(manifest.flushed_sequence)
614        .truncated_entry_id(manifest.truncated_entry_id)
615        .compaction_time_window(manifest.compaction_time_window)
616        .options(region_options)
617}
618
/// Updates the region options using the options persisted in the manifest.
///
/// Currently this only overrides `sst_format` with the format recorded in the manifest.
620pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
621    let option_format = options.sst_format.unwrap_or_default();
622    if option_format != manifest.sst_format {
623        common_telemetry::warn!(
624            "Overriding SST format from {:?} to {:?} for region {}",
625            option_format,
626            manifest.sst_format,
627            manifest.metadata.region_id,
628        );
629        options.sst_format = Some(manifest.sst_format);
630    }
631}
632
633/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
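///
/// A short usage sketch; `object_store_manager` is assumed to come from the engine's context:
///
/// ```ignore
/// // Falls back to the default object store when the region has no dedicated
/// // storage name configured.
/// let store = get_object_store(&None, &object_store_manager)?;
/// ```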
634pub fn get_object_store(
635    name: &Option<String>,
636    object_store_manager: &ObjectStoreManagerRef,
637) -> Result<object_store::ObjectStore> {
638    if let Some(name) = name {
639        Ok(object_store_manager
640            .find(name)
641            .with_context(|| ObjectStoreNotFoundSnafu {
642                object_store: name.clone(),
643            })?
644            .clone())
645    } else {
646        Ok(object_store_manager.default_object_store().clone())
647    }
648}
649
650/// A loader for loading metadata from a region dir.
651pub struct RegionMetadataLoader {
652    config: Arc<MitoConfig>,
653    object_store_manager: ObjectStoreManagerRef,
654}
655
656impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
658    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
659        Self {
660            config,
661            object_store_manager,
662        }
663    }
664
665    /// Loads the metadata of the region from the region dir.
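    ///
    /// A usage sketch; `region_dir` and `region_options` are assumed to come from the
    /// caller's engine state:
    ///
    /// ```ignore
    /// let loader = RegionMetadataLoader::new(config, object_store_manager);
    /// if let Some(metadata) = loader.load(&region_dir, &region_options).await? {
    ///     // The loaded metadata exposes the persisted column metadatas.
    ///     println!("column count: {}", metadata.column_metadatas.len());
    /// }
    /// ```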
666    pub async fn load(
667        &self,
668        region_dir: &str,
669        region_options: &RegionOptions,
670    ) -> Result<Option<RegionMetadataRef>> {
671        let manifest = self.load_manifest(region_dir, region_options).await?;
672        Ok(manifest.map(|m| m.metadata.clone()))
673    }
674
675    /// Loads the manifest of the region from the region dir.
676    pub async fn load_manifest(
677        &self,
678        region_dir: &str,
679        region_options: &RegionOptions,
680    ) -> Result<Option<Arc<RegionManifest>>> {
681        let region_manifest_options = RegionOpener::manifest_options(
682            &self.config,
683            region_options,
684            region_dir,
685            &self.object_store_manager,
686        )?;
687        let Some(manifest_manager) =
688            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
689        else {
690            return Ok(None);
691        };
692
693        let manifest = manifest_manager.manifest();
694        Ok(Some(manifest))
695    }
696}
697
/// Checks whether the recovered region has the same schema as the region to create.
699pub(crate) fn check_recovered_region(
700    recovered: &RegionMetadata,
701    region_id: RegionId,
702    column_metadatas: &[ColumnMetadata],
703    primary_key: &[ColumnId],
704) -> Result<()> {
705    if recovered.region_id != region_id {
706        error!(
707            "Recovered region {}, expect region {}",
708            recovered.region_id, region_id
709        );
710        return RegionCorruptedSnafu {
711            region_id,
712            reason: format!(
713                "recovered metadata has different region id {}",
714                recovered.region_id
715            ),
716        }
717        .fail();
718    }
719    if recovered.column_metadatas != column_metadatas {
720        error!(
721            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
722            recovered.region_id, recovered.column_metadatas, column_metadatas
723        );
724
725        return RegionCorruptedSnafu {
726            region_id,
727            reason: "recovered metadata has different schema",
728        }
729        .fail();
730    }
731    if recovered.primary_key != primary_key {
732        error!(
733            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
734            recovered.region_id, recovered.primary_key, primary_key
735        );
736
737        return RegionCorruptedSnafu {
738            region_id,
739            reason: "recovered metadata has different primary key",
740        }
741        .fail();
742    }
743
744    Ok(())
745}
746
/// Replays the mutations from the WAL and inserts them into the memtable of the given region.
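///
/// The replay loop reads entries starting at `flushed_entry_id + 1`, tolerates entries
/// at or below the flushed entry id only when `allow_stale_entries` is set, applies row
/// mutations and bulk entries with the sequences recorded in the WAL, and finally invokes
/// the `on_region_opened` callback with the flushed entry id.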
748pub(crate) async fn replay_memtable<F>(
749    provider: &Provider,
750    mut wal_entry_reader: Box<dyn WalEntryReader>,
751    region_id: RegionId,
752    flushed_entry_id: EntryId,
753    version_control: &VersionControlRef,
754    allow_stale_entries: bool,
755    on_region_opened: F,
756) -> Result<EntryId>
757where
758    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
759{
760    let now = Instant::now();
761    let mut rows_replayed = 0;
    // The last entry id should start from the flushed entry id since there might be no
    // data in the WAL.
764    let mut last_entry_id = flushed_entry_id;
765    let replay_from_entry_id = flushed_entry_id + 1;
766
767    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
768    while let Some(res) = wal_stream.next().await {
769        let (entry_id, entry) = res?;
770        if entry_id <= flushed_entry_id {
771            warn!(
772                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
773                region_id, flushed_entry_id, entry_id
774            );
775            ensure!(
776                allow_stale_entries,
777                StaleLogEntrySnafu {
778                    region_id,
779                    flushed_entry_id,
780                    unexpected_entry_id: entry_id,
781                }
782            );
783        }
784        last_entry_id = last_entry_id.max(entry_id);
785
786        let mut region_write_ctx = RegionWriteCtx::new(
787            region_id,
788            version_control,
789            provider.clone(),
790            // For WAL replay, we don't need to track the write bytes rate.
791            None,
792        );
793        for mutation in entry.mutations {
794            rows_replayed += mutation
795                .rows
796                .as_ref()
797                .map(|rows| rows.rows.len())
798                .unwrap_or(0);
799            region_write_ctx.push_mutation(
800                mutation.op_type,
801                mutation.rows,
802                mutation.write_hint,
803                OptionOutputTx::none(),
804                // We should respect the sequence in WAL during replay.
805                Some(mutation.sequence),
806            );
807        }
808
809        for bulk_entry in entry.bulk_entries {
810            let part = BulkPart::try_from(bulk_entry)?;
811            rows_replayed += part.num_rows();
812            // During replay, we should adopt the sequence from WAL.
813            let bulk_sequence_from_wal = part.sequence;
814            ensure!(
815                region_write_ctx.push_bulk(
816                    OptionOutputTx::none(),
817                    part,
818                    Some(bulk_sequence_from_wal)
819                ),
820                RegionCorruptedSnafu {
821                    region_id,
822                    reason: "unable to replay memtable with bulk entries",
823                }
824            );
825        }
826
        // Sets next_entry_id and writes to the memtable.
828        region_write_ctx.set_next_entry_id(last_entry_id + 1);
829        region_write_ctx.write_memtable().await;
830        region_write_ctx.write_bulk().await;
831    }
832
833    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
834    // to avoid reading potentially incomplete entries in the future.
835    (on_region_opened)(region_id, flushed_entry_id, provider).await?;
836
837    let series_count = version_control.current().series_count();
838    info!(
839        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
840        region_id,
841        provider,
842        rows_replayed,
843        replay_from_entry_id,
844        last_entry_id,
845        series_count,
846        now.elapsed()
847    );
848    Ok(last_entry_id)
849}
850
851/// Returns the directory to the manifest files.
852pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
853    join_dir(region_dir, "manifest")
854}
855
856/// A task to load and fill the region file cache.
857pub(crate) struct RegionLoadCacheTask {
858    region: MitoRegionRef,
859}
860
861impl RegionLoadCacheTask {
862    pub(crate) fn new(region: MitoRegionRef) -> Self {
863        Self { region }
864    }
865
866    /// Fills the file cache with index files from the region.
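    ///
    /// Walks the current SST levels, collects Puffin index files that are not yet cached,
    /// downloads them newest-first (by max timestamp), and stops early when the region is
    /// about to close or when the puffin cache capacity would be exceeded.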
867    pub(crate) async fn fill_cache(&self, file_cache: FileCacheRef) {
868        let region_id = self.region.region_id;
869        let table_dir = self.region.access_layer.table_dir();
870        let path_type = self.region.access_layer.path_type();
871        let object_store = self.region.access_layer.object_store();
872        let version_control = &self.region.version_control;
873
874        // Collects IndexKeys, file sizes, and max timestamps for files that need to be downloaded
875        let mut files_to_download = Vec::new();
876        let mut files_already_cached = 0;
877
878        {
879            let version = version_control.current().version;
880            for level in version.ssts.levels() {
881                for file_handle in level.files.values() {
882                    let file_meta = file_handle.meta_ref();
883                    if file_meta.exists_index() {
884                        let puffin_key = IndexKey::new(
885                            file_meta.region_id,
886                            file_meta.index_file_id().file_id(),
887                            FileType::Puffin,
888                        );
889
890                        if !file_cache.contains_key(&puffin_key) {
891                            files_to_download.push((
892                                puffin_key,
893                                file_meta.index_file_size,
894                                file_meta.time_range.1, // max timestamp
895                            ));
896                        } else {
897                            files_already_cached += 1;
898                        }
899                    }
900                }
901            }
            // Drops the Version at the end of this scope to avoid holding the memtables and file handles
            // for a long time.
904        }
905
        // Sorts files by max timestamp in descending order to load the latest files first.
907        files_to_download.sort_by(|a, b| b.2.cmp(&a.2));
908
909        let total_files = files_to_download.len() as i64;
910
911        info!(
912            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
913            region_id, total_files, files_already_cached
914        );
915
916        CACHE_FILL_PENDING_FILES.add(total_files);
917
918        let mut files_downloaded = 0;
919        let mut files_skipped = 0;
920
921        for (puffin_key, file_size, max_timestamp) in files_to_download {
922            let current_size = file_cache.puffin_cache_size();
923            let capacity = file_cache.puffin_cache_capacity();
924            let region_state = self.region.state();
925            if !can_load_cache(region_state) {
926                info!(
                    "Stopping index cache preload due to region state: {:?}, region: {}, current_size: {}, capacity: {}",
928                    region_state, region_id, current_size, capacity
929                );
930                break;
931            }
932
933            // Checks if adding this file would exceed capacity
934            if current_size + file_size > capacity {
935                info!(
936                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
937                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
938                );
939                files_skipped = (total_files - files_downloaded) as usize;
940                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
941                break;
942            }
943
944            let index_remote_path = location::index_file_path(
945                table_dir,
946                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
947                path_type,
948            );
949
950            match file_cache
951                .download(puffin_key, &index_remote_path, object_store, file_size)
952                .await
953            {
954                Ok(_) => {
955                    debug!(
956                        "Downloaded index file to write cache, region: {}, file_id: {}",
957                        region_id, puffin_key.file_id
958                    );
959                    files_downloaded += 1;
960                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
961                    CACHE_FILL_PENDING_FILES.dec();
962                }
963                Err(e) => {
964                    warn!(
965                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
966                        region_id, puffin_key.file_id
967                    );
968                    CACHE_FILL_PENDING_FILES.dec();
969                }
970            }
971        }
972
973        info!(
974            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
975            region_id, total_files, files_downloaded, files_already_cached, files_skipped
976        );
977    }
978}
979
/// Schedules a background task that loads the region's index (Puffin) files into the
/// write cache, if the write cache exists and index preloading is enabled.
981fn maybe_load_cache(
982    region: &MitoRegionRef,
983    config: &MitoConfig,
984    cache_manager: &Option<CacheManagerRef>,
985) {
986    let Some(cache_manager) = cache_manager else {
987        return;
988    };
989    let Some(write_cache) = cache_manager.write_cache() else {
990        return;
991    };
992
993    let preload_enabled = config.preload_index_cache;
994    if !preload_enabled {
995        return;
996    }
997
998    let task = RegionLoadCacheTask::new(region.clone());
999    write_cache.load_region_cache(task);
1000}
1001
1002fn can_load_cache(state: RegionRoleState) -> bool {
1003    match state {
1004        RegionRoleState::Leader(RegionLeaderState::Writable)
1005        | RegionRoleState::Leader(RegionLeaderState::Staging)
1006        | RegionRoleState::Leader(RegionLeaderState::Altering)
1007        | RegionRoleState::Leader(RegionLeaderState::Editing)
1008        | RegionRoleState::Follower => true,
1009        // The region will be closed soon if it is downgrading.
1010        RegionRoleState::Leader(RegionLeaderState::Downgrading)
1011        | RegionRoleState::Leader(RegionLeaderState::Dropping)
1012        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
1013    }
1014}