mito2/region/opener.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region opener.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// A fetcher to retrieve partition expr for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
/// while newer ones do. On open, we backfill it via this fetcher and persist it
/// to the manifest so future opens don't need refetching.
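///
/// For illustration only, a minimal fetcher sketch (the `StaticExprFetcher` type and its
/// prefilled JSON map are hypothetical, not part of this crate):
///
/// ```ignore
/// struct StaticExprFetcher {
///     /// Partition expr JSON keyed by region id, assumed to be prefetched elsewhere.
///     exprs: std::collections::HashMap<RegionId, String>,
/// }
///
/// #[async_trait::async_trait]
/// impl PartitionExprFetcher for StaticExprFetcher {
///     async fn fetch_expr(&self, region_id: RegionId) -> Option<String> {
///         // Returns `None` when the expr is unknown so the opener keeps the metadata as is.
///         self.exprs.get(&region_id).cloned()
///     }
/// }
/// ```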
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

/// Builder to create a new [MitoRegion] or open an existing one.
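///
/// A rough usage sketch, assuming the caller already holds the engine-level dependencies
/// passed to [RegionOpener::new] (all variable names below are placeholders):
///
/// ```ignore
/// let region = RegionOpener::new(
///     region_id,
///     table_dir,
///     path_type,
///     memtable_builder_provider,
///     object_store_manager,
///     purge_scheduler,
///     puffin_manager_factory,
///     intermediate_manager,
///     time_provider,
///     file_ref_manager,
///     partition_expr_fetcher,
/// )
/// .metadata_builder(metadata_builder)
/// .parse_options(options_map)?
/// .cache(Some(cache_manager))
/// .skip_wal_replay(false)
/// .create_or_open(&config, &wal)
/// .await?;
/// ```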
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
    /// Returns a new opener.
    // TODO(LFC): Reduce the number of arguments.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

    /// Sets the metadata builder of the region to create.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Computes the region directory from `table_dir` and `region_id`.
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    /// Builds the region metadata.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    /// - Panics if `metadata_builder` is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses and sets options for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// Sets the replay checkpoint for the region.
    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets the `skip_wal_replay` flag.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

    /// Writes the region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // For consistency with the create behavior, set the opened region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Sets the sst_format based on the options or the `default_experimental_flat_format` flag.
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_experimental_flat_format {
            FormatType::Flat
        } else {
            // Default to PrimaryKeyParquet for newly created regions
            FormatType::PrimaryKey
        };
        // Creates a manifest manager for this region and writes the region metadata to the manifest file.
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        // For remote WAL, we need to set flushed_entry_id to the current topic's latest entry id.
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
            sst_format,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // The region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_local_file_purger(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            written_bytes: Arc::new(AtomicU64::new(0)),
            sst_format,
            stats: self.stats,
        })
    }

    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

    /// Tries to open the region and returns `None` if the region directory is empty.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        // Backfill `partition_expr` if missing. Use the backfilled metadata in memory during this open.
        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_local_file_purger(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        // Use the compaction time window in the manifest if the region doesn't provide
        // the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
                replay_from_entry_id, region_id, manifest.manifest_version, flushed_entry_id
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For remote WAL, we need to set topic_latest_entry_id to the current topic's latest
            // entry id. Only set it after the WAL replay is completed.
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();
        // Read the sst_format from the manifest.
        let sst_format = manifest.sst_format;

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            // The region is always opened in read-only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder,
            sst_format,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

    /// Returns new manifest options for the region.
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            // We don't allow users to set the compression algorithm as we use it as a file suffix.
            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: config.experimental_manifest_keep_removed_file_count,
                keep_ttl: config.experimental_manifest_keep_removed_file_ttl,
            },
        })
    }
}

/// Returns an object store corresponding to `name`.
///
/// If `name` is `None`, this method returns the default object store.
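///
/// A usage sketch (the "s3" store name is hypothetical and must have been registered in the
/// object store manager beforehand):
///
/// ```ignore
/// // Looks up a named object store; fails with `ObjectStoreNotFound` if it isn't registered.
/// let s3_store = get_object_store(&Some("s3".to_string()), &object_store_manager)?;
/// // Falls back to the default object store when no name is given.
/// let default_store = get_object_store(&None, &object_store_manager)?;
/// ```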
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// A loader that loads region metadata from a region dir.
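///
/// A rough usage sketch (variable names are placeholders):
///
/// ```ignore
/// let loader = RegionMetadataLoader::new(config.clone(), object_store_manager.clone());
/// // `load` returns `None` when the region dir has no manifest yet.
/// if let Some(metadata) = loader.load(&region_dir, &region_options).await? {
///     println!("region {} has {} columns", metadata.region_id, metadata.column_metadatas.len());
/// }
/// ```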
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region from the region dir.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region from the region dir.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

/// Checks whether the recovered region has the same schema as the region to create.
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays the mutations from the WAL and inserts them into the memtable of the given region.
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    // The last entry id should start from the flushed entry id since there might be no
    // data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            // For WAL replay, we don't need to track the write bytes rate.
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                // We should respect the sequence in the WAL during replay.
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            // During replay, we should adopt the sequence from the WAL.
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        // Sets the next entry id and writes to the memtable.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

/// Returns the directory of the manifest files under the given region dir.
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}