mito2/region/opener.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region opener.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::normalize_dir;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCache, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// A fetcher to retrieve partition expr for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
/// while newer ones do. On open, we backfill it via this fetcher and persist it
/// to the manifest so future opens don't need refetching.
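///
/// # Example
///
/// A minimal sketch of an implementation (illustrative only; the type name below is
/// hypothetical and not part of this crate). It reports that no partition expression is
/// available, so opened regions keep whatever their manifest already records:
///
/// ```ignore
/// struct NoopPartitionExprFetcher;
///
/// #[async_trait::async_trait]
/// impl PartitionExprFetcher for NoopPartitionExprFetcher {
///     async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
///         None
///     }
/// }
/// ```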
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

/// Builder to create a new [MitoRegion] or open an existing one.
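///
/// # Example
///
/// A sketch of how a caller is expected to drive the opener (illustrative only; all
/// variables below are assumed to exist in the caller's context and the path is a
/// placeholder):
///
/// ```ignore
/// let region = RegionOpener::new(
///     region_id,
///     "data/my_table/", // placeholder table_dir
///     path_type,
///     memtable_builder_provider,
///     object_store_manager,
///     purge_scheduler,
///     puffin_manager_factory,
///     intermediate_manager,
///     time_provider,
///     file_ref_manager,
///     partition_expr_fetcher,
/// )
/// .metadata_builder(metadata_builder)
/// .parse_options(options_map)?
/// .cache(Some(cache_manager))
/// .skip_wal_replay(false)
/// .create_or_open(&config, &wal)
/// .await?;
/// ```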
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
    /// Returns a new opener.
    // TODO(LFC): Reduce the number of arguments.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

    /// Sets metadata builder of the region to create.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Computes the region directory from table_dir and region_id.
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    /// Builds the region metadata.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    /// - Panics if `metadata_builder` is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses and sets options for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// Sets the replay checkpoint for the region.
    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets the `skip_wal_replay`.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

    /// Writes region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let mut options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Sets the sst_format based on the options or the `default_experimental_flat_format` flag.
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_experimental_flat_format {
            options.sst_format = Some(FormatType::Flat);
            FormatType::Flat
        } else {
            // Default to PrimaryKeyParquet for newly created regions
            options.sst_format = Some(FormatType::PrimaryKey);
            FormatType::PrimaryKey
        };
        // Creates a manifest manager for this region and writes the region metadata to the manifest file.
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &region_dir, &object_store);
        // Set manifest cache if available
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            sst_format,
            &self.stats,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!(
            "Create region {} with options: {:?}, default_flat_format: {}",
            region_id, options, config.default_experimental_flat_format
        );

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(Arc::new(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_file_purger(
                config.gc.enable,
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
            staging_partition_expr: Mutex::new(None),
        }))
    }

    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .with_context(|| EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

    /// Tries to open the region and returns `None` if the region directory is empty.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegionRef>> {
        let now = Instant::now();
        let mut region_options = self.options.as_ref().unwrap().clone();
        let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
        // Set manifest cache if available
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &self.stats).await?
        else {
            return Ok(None);
        };

        // Backfill `partition_expr` if missing. Use the backfilled metadata in-memory during this open.
        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };
        // Updates the region options with the manifest.
        sanitize_region_options(&manifest, &mut region_options);

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_file_purger(
            config.gc.enable,
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        // We should sanitize the region options before creating a new memtable.
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        // Use compaction time window in the manifest if region doesn't provide
        // the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        // Region options are updated from the manifest before creating the version.
        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            file_purger.clone(),
            mutable,
            region_options,
        );
        let version = version_builder.build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
            // Only set after the WAL replay is completed.

            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control: version_control.clone(),
            access_layer: access_layer.clone(),
            // Region is always opened in read-only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats.clone(),
            // TODO(weny): reload the staging partition expr from the manifest.
            staging_partition_expr: Mutex::new(None),
        };

        let region = Arc::new(region);

        maybe_load_cache(&region, config, &self.cache_manager);

        Ok(Some(region))
    }
}

/// Creates a version builder from a region manifest.
pub(crate) fn version_builder_from_manifest(
    manifest: &RegionManifest,
    metadata: RegionMetadataRef,
    file_purger: FilePurgerRef,
    mutable: TimePartitionsRef,
    region_options: RegionOptions,
) -> VersionBuilder {
    VersionBuilder::new(metadata, mutable)
        .add_files(file_purger, manifest.files.values().cloned())
        .flushed_entry_id(manifest.flushed_entry_id)
        .flushed_sequence(manifest.flushed_sequence)
        .truncated_entry_id(manifest.truncated_entry_id)
        .compaction_time_window(manifest.compaction_time_window)
        .options(region_options)
}

/// Sanitizes the region options using the options persisted in the region manifest.
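///
/// # Example
///
/// A sketch of the override behavior (illustrative only; `manifest` and `region_options`
/// are assumed to exist in the caller's context, and the manifest is assumed to record a
/// different SST format than the caller asked for):
///
/// ```ignore
/// region_options.sst_format = Some(FormatType::Flat);
/// sanitize_region_options(&manifest, &mut region_options);
/// // The persisted format wins and a warning is logged.
/// assert_eq!(region_options.sst_format, Some(manifest.sst_format));
/// ```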
pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
    let option_format = options.sst_format.unwrap_or_default();
    if option_format != manifest.sst_format {
        common_telemetry::warn!(
            "Overriding SST format from {:?} to {:?} for region {}",
            option_format,
            manifest.sst_format,
            manifest.metadata.region_id,
        );
        options.sst_format = Some(manifest.sst_format);
    }
}

/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
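///
/// # Example
///
/// Illustrative only; the store name is a placeholder and `object_store_manager` is
/// assumed to exist in the caller's context:
///
/// ```ignore
/// // `None` falls back to the default object store.
/// let storage: Option<String> = None;
/// let default_store = get_object_store(&storage, &object_store_manager)?;
/// // A name looks up a registered store and fails if it is unknown.
/// let cold_store = get_object_store(&Some("cold_storage".to_string()), &object_store_manager)?;
/// ```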
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// A loader for loading metadata from a region dir.
#[derive(Debug, Clone)]
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region from the region dir.
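    ///
    /// # Example
    ///
    /// A sketch of typical usage (illustrative only; the region dir is a placeholder and
    /// `config`, `object_store_manager`, and `region_options` are assumed to exist):
    ///
    /// ```ignore
    /// let loader = RegionMetadataLoader::new(config.clone(), object_store_manager.clone());
    /// // Returns `None` if the region dir does not contain a manifest yet.
    /// let metadata = loader.load("data/my_table/1024/", &region_options).await?;
    /// ```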
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self
            .load_manifest(region_dir, &region_options.storage)
            .await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region from the region dir.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        storage: &Option<String>,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let object_store = get_object_store(storage, &self.object_store_manager)?;
        let region_manifest_options =
            RegionManifestOptions::new(&self.config, region_dir, &object_store);
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

/// Checks whether the recovered region has the same schema as the region to create.
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays the mutations from the WAL and inserts them into the memtable of the given region.
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    // Last entry id should start from flushed entry id since there might be no
    // data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            // For WAL replay, we don't need to track the write bytes rate.
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                // We should respect the sequence in WAL during replay.
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            // During replay, we should adopt the sequence from WAL.
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        // Sets next_entry_id and writes to the memtable.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

/// A task to load and fill the region file cache.
pub(crate) struct RegionLoadCacheTask {
    region: MitoRegionRef,
}

impl RegionLoadCacheTask {
    pub(crate) fn new(region: MitoRegionRef) -> Self {
        Self { region }
    }

    /// Fills the file cache with index files from the region.
    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
        let region_id = self.region.region_id;
        let table_dir = self.region.access_layer.table_dir();
        let path_type = self.region.access_layer.path_type();
        let object_store = self.region.access_layer.object_store();
        let version_control = &self.region.version_control;

        // Collects IndexKeys, file sizes, and max timestamps for files that need to be downloaded
        let mut files_to_download = Vec::new();
        let mut files_already_cached = 0;

        {
            let version = version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    let file_meta = file_handle.meta_ref();
                    if file_meta.exists_index() {
                        let puffin_key = IndexKey::new(
                            file_meta.region_id,
                            file_meta.file_id,
                            FileType::Puffin(file_meta.index_version),
                        );

                        if !file_cache.contains_key(&puffin_key) {
                            files_to_download.push((
                                puffin_key,
                                file_meta.index_file_size,
                                file_meta.time_range.1, // max timestamp
                            ));
                        } else {
                            files_already_cached += 1;
                        }
                    }
                }
            }
            // The `Version` is released at the end of this scope to avoid holding the memtables
            // and file handles for a long time.
        }

        // Sorts files by max timestamp in descending order to load the latest files first.
        files_to_download.sort_by(|a, b| b.2.cmp(&a.2));

        let total_files = files_to_download.len() as i64;

        info!(
            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
            region_id, total_files, files_already_cached
        );

        CACHE_FILL_PENDING_FILES.add(total_files);

        let mut files_downloaded = 0;
        let mut files_skipped = 0;

        for (puffin_key, file_size, max_timestamp) in files_to_download {
            let current_size = file_cache.puffin_cache_size();
            let capacity = file_cache.puffin_cache_capacity();
            let region_state = self.region.state();
            if !can_load_cache(region_state) {
                info!(
                    "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
                    region_state, region_id, current_size, capacity
                );
                break;
            }

            // Checks if adding this file would exceed capacity
            if current_size + file_size > capacity {
                info!(
                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
                );
                files_skipped = (total_files - files_downloaded) as usize;
                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
                break;
            }

            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
                version
            } else {
                unreachable!("`files_to_download` should only contain Puffin files");
            };
            let index_id = RegionIndexId::new(
                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
                index_version,
            );

            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);

            match file_cache
                .download(puffin_key, &index_remote_path, object_store, file_size)
                .await
            {
                Ok(_) => {
                    debug!(
                        "Downloaded index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    files_downloaded += 1;
                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
                    CACHE_FILL_PENDING_FILES.dec();
                }
                Err(e) => {
                    warn!(
                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    CACHE_FILL_PENDING_FILES.dec();
                }
            }
        }

        info!(
            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
            region_id, total_files, files_downloaded, files_already_cached, files_skipped
        );
    }
}

/// Schedules a background task to load the region's index (Puffin) files into the write cache,
/// if index preloading is enabled and a write cache is configured.
fn maybe_load_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    let Some(write_cache) = cache_manager.write_cache() else {
        return;
    };

    let preload_enabled = config.preload_index_cache;
    if !preload_enabled {
        return;
    }

    let task = RegionLoadCacheTask::new(region.clone());
    write_cache.load_region_cache(task);
}

fn can_load_cache(state: RegionRoleState) -> bool {
    match state {
        RegionRoleState::Leader(RegionLeaderState::Writable)
        | RegionRoleState::Leader(RegionLeaderState::Staging)
        | RegionRoleState::Leader(RegionLeaderState::Altering)
        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
        | RegionRoleState::Leader(RegionLeaderState::Editing)
        | RegionRoleState::Follower => true,
        // The region will be closed soon if it is downgrading.
        RegionRoleState::Leader(RegionLeaderState::Downgrading)
        | RegionRoleState::Leader(RegionLeaderState::Dropping)
        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
    }
}