mito2/region/opener.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region opener.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// A fetcher to retrieve partition expr for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
/// while newer ones do. On open, we backfill it via this fetcher and persist it
/// to the manifest so future opens don't need refetching.
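///
/// # Example
///
/// A minimal sketch of an implementer that serves pre-computed expressions.
/// `StaticExprFetcher` is a hypothetical type for illustration, not part of this crate:
///
/// ```ignore
/// use std::collections::HashMap;
///
/// /// Hypothetical fetcher backed by an in-memory map.
/// struct StaticExprFetcher {
///     exprs: HashMap<RegionId, String>,
/// }
///
/// #[async_trait::async_trait]
/// impl PartitionExprFetcher for StaticExprFetcher {
///     async fn fetch_expr(&self, region_id: RegionId) -> Option<String> {
///         // Returns the serialized partition expression for the region, if known.
///         self.exprs.get(&region_id).cloned()
///     }
/// }
/// ```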
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

/// Builder to create a new [MitoRegion] or open an existing one.
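///
/// # Example
///
/// A sketch of a typical call chain; every argument below (directories, handles,
/// and option values) is a hypothetical placeholder supplied by the caller:
///
/// ```ignore
/// let region = RegionOpener::new(
///     region_id,
///     "data/my_table/", // table_dir
///     path_type,
///     memtable_builder_provider,
///     object_store_manager,
///     purge_scheduler,
///     puffin_manager_factory,
///     intermediate_manager,
///     time_provider,
///     file_ref_manager,
///     partition_expr_fetcher,
/// )
/// .metadata_builder(metadata_builder)
/// .parse_options(options_map)?
/// .cache(Some(cache_manager))
/// .skip_wal_replay(false)
/// .create_or_open(&config, &wal)
/// .await?;
/// ```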
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
    /// Returns a new opener.
    // TODO(LFC): Reduce the number of arguments.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

    /// Sets metadata builder of the region to create.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Computes the region directory from table_dir and region_id.
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    /// Builds the region metadata.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    /// - Panics if `metadata_builder` is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses and sets options for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// Sets the replay checkpoint for the region.
    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }
180
    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets whether to skip the WAL replay.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }
209
    /// Writes the region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with the create behavior, set the opened region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Set the sst_format based on the options or the flat format flag.
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_experimental_flat_format {
            FormatType::Flat
        } else {
            // Default to the primary key format for newly created regions.
            FormatType::PrimaryKey
        };
        // Create a manifest manager for this region and write the region metadata to the manifest file.
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        // For remote WAL, we need to set flushed_entry_id to the current topic's latest entry id.
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
            sst_format,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_local_file_purger(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            written_bytes: Arc::new(AtomicU64::new(0)),
            sst_format,
            stats: self.stats,
        })
    }
334
    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }
366
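    /// Returns the WAL [Provider] for this region according to `wal_options`,
    /// ensuring the per-region WAL option is compatible with the log store type `S`.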
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }
394
    /// Tries to open the region and returns `None` if the region directory is empty.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        // Backfill `partition_expr` if missing. Use the backfilled metadata in-memory during this open.
        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_local_file_purger(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        // Use the compaction time window from the manifest if the region doesn't
        // provide the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
                replay_from_entry_id, region_id, manifest.manifest_version, flushed_entry_id
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For remote WAL, set topic_latest_entry_id to the current topic's latest
            // entry id, but only after the WAL replay has completed.
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();
        // Read sst_format from the manifest.
        let sst_format = manifest.sst_format;

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            // Region is always opened in read-only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder,
            sst_format,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }
562
    /// Returns new manifest options for the region.
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            // We don't allow users to set the compression algorithm as we use it as a file suffix.
            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: config.experimental_manifest_keep_removed_file_count,
                keep_ttl: config.experimental_manifest_keep_removed_file_ttl,
            },
        })
    }
}

/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
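///
/// # Example
///
/// A sketch against an existing `ObjectStoreManagerRef`; the store name `"s3"` is
/// a hypothetical example of a registered custom storage:
///
/// ```ignore
/// // Looks up a named object store registered in the manager.
/// let named = get_object_store(&Some("s3".to_string()), &object_store_manager)?;
/// // Falls back to the default object store when no name is given.
/// let default = get_object_store(&None, &object_store_manager)?;
/// ```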
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// A loader for loading metadata from a region dir.
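///
/// # Example
///
/// A sketch of loading persisted region metadata; the region directory and the
/// options value are hypothetical placeholders:
///
/// ```ignore
/// let loader = RegionMetadataLoader::new(config, object_store_manager);
/// // Returns `Ok(None)` if the region directory contains no manifest.
/// let metadata = loader.load("data/my_table/42_0000000001/", &region_options).await?;
/// ```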
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region from the region dir.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region from the region dir.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

/// Checks whether the recovered region has the same schema as the region to create.
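///
/// The check compares the region id, the column metadatas, and the primary key of
/// the recovered metadata against the expected values, and returns a
/// region-corrupted error on the first mismatch.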
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays mutations from the WAL and inserts them into the memtable of the given region.
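///
/// Replay starts from `flushed_entry_id + 1`. Entries at or below `flushed_entry_id`
/// are considered stale and are only tolerated when `allow_stale_entries` is true;
/// otherwise a stale log entry error is returned. Returns the last replayed entry id,
/// or `flushed_entry_id` if the WAL contains no newer entries.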
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    // The last entry id should start from the flushed entry id since there might be
    // no data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            // For WAL replay, we don't need to track the write bytes rate.
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                // We should respect the sequence in the WAL during replay.
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            // During replay, we should adopt the sequence from the WAL.
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        // Set next_entry_id and write to the memtable.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

/// Returns the directory of the manifest files under `region_dir`.
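///
/// # Example
///
/// A sketch with a hypothetical region directory; the exact trailing-separator
/// handling follows `join_dir`:
///
/// ```ignore
/// let dir = new_manifest_dir("data/my_table/42_0000000001/");
/// // e.g. "data/my_table/42_0000000001/manifest/"
/// ```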
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}