mito2/region/
opener.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region opener.
16
17use std::any::TypeId;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicI64, AtomicU64};
20use std::sync::Arc;
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, warn};
24use common_wal::options::WalOptions;
25use futures::future::BoxFuture;
26use futures::StreamExt;
27use log_store::kafka::log_store::KafkaLogStore;
28use log_store::raft_engine::log_store::RaftEngineLogStore;
29use object_store::manager::ObjectStoreManagerRef;
30use object_store::util::{join_dir, normalize_dir};
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::logstore::provider::Provider;
33use store_api::logstore::LogStore;
34use store_api::metadata::{
35    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
36};
37use store_api::region_engine::RegionRole;
38use store_api::region_request::PathType;
39use store_api::storage::{ColumnId, RegionId};
40
41use crate::access_layer::AccessLayer;
42use crate::cache::CacheManagerRef;
43use crate::config::MitoConfig;
44use crate::error;
45use crate::error::{
46    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
47    Result, StaleLogEntrySnafu,
48};
49use crate::manifest::action::RegionManifest;
50use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
51use crate::manifest::storage::manifest_compress_type;
52use crate::memtable::bulk::part::BulkPart;
53use crate::memtable::time_partition::TimePartitions;
54use crate::memtable::MemtableBuilderProvider;
55use crate::region::options::RegionOptions;
56use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
57use crate::region::{
58    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
59};
60use crate::region_write_ctx::RegionWriteCtx;
61use crate::request::OptionOutputTx;
62use crate::schedule::scheduler::SchedulerRef;
63use crate::sst::file_purger::create_local_file_purger;
64use crate::sst::file_ref::FileReferenceManagerRef;
65use crate::sst::index::intermediate::IntermediateManager;
66use crate::sst::index::puffin_manager::PuffinManagerFactory;
67use crate::sst::location::region_dir_from_table_dir;
68use crate::time_provider::TimeProviderRef;
69use crate::wal::entry_reader::WalEntryReader;
70use crate::wal::{EntryId, Wal};
71
/// Builder to create a new [MitoRegion] or open an existing one.
pub(crate) struct RegionOpener {
    /// Id of the region to create or open.
    region_id: RegionId,
    /// Builder of the metadata of the region to create. It is taken (and
    /// therefore consumed) when the metadata is built.
    metadata_builder: Option<RegionMetadataBuilder>,
    /// Supplies the memtable builder that matches the region options.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Used to look up the object store of the region.
    object_store_manager: ObjectStoreManagerRef,
    /// Table directory, normalized when the opener is constructed.
    table_dir: String,
    /// Path type; together with `table_dir` and `region_id` it determines
    /// the region directory.
    path_type: PathType,
    /// Scheduler handed to the local file purger of the region.
    purge_scheduler: SchedulerRef,
    /// Options of the region. Must be set before creating or opening the region.
    options: Option<RegionOptions>,
    /// Optional cache manager handed to the local file purger of the region.
    cache_manager: Option<CacheManagerRef>,
    /// Whether to skip replaying the WAL when opening an existing region.
    skip_wal_replay: bool,
    /// Passed to the access layer of the region.
    puffin_manager_factory: PuffinManagerFactory,
    /// Passed to the access layer of the region.
    intermediate_manager: IntermediateManager,
    /// Provides the current time used to initialize the last flush and
    /// last compaction timestamps of the region.
    time_provider: TimeProviderRef,
    /// Manifest stats. Its counters are shared with the manifest manager and
    /// the stats are stored in the resulting region.
    stats: ManifestStats,
    /// Optional reader used to replay the WAL instead of one built from the WAL.
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    /// Optional checkpoint entry id. WAL replay starts from the larger of this
    /// value and the flushed entry id recorded in the manifest.
    replay_checkpoint: Option<u64>,
    /// Handed to the local file purger of the region.
    file_ref_manager: FileReferenceManagerRef,
}
92
93impl RegionOpener {
94    /// Returns a new opener.
95    // TODO(LFC): Reduce the number of arguments.
96    #[allow(clippy::too_many_arguments)]
97    pub(crate) fn new(
98        region_id: RegionId,
99        table_dir: &str,
100        path_type: PathType,
101        memtable_builder_provider: MemtableBuilderProvider,
102        object_store_manager: ObjectStoreManagerRef,
103        purge_scheduler: SchedulerRef,
104        puffin_manager_factory: PuffinManagerFactory,
105        intermediate_manager: IntermediateManager,
106        time_provider: TimeProviderRef,
107        file_ref_manager: FileReferenceManagerRef,
108    ) -> RegionOpener {
109        RegionOpener {
110            region_id,
111            metadata_builder: None,
112            memtable_builder_provider,
113            object_store_manager,
114            table_dir: normalize_dir(table_dir),
115            path_type,
116            purge_scheduler,
117            options: None,
118            cache_manager: None,
119            skip_wal_replay: false,
120            puffin_manager_factory,
121            intermediate_manager,
122            time_provider,
123            stats: Default::default(),
124            wal_entry_reader: None,
125            replay_checkpoint: None,
126            file_ref_manager,
127        }
128    }
129
130    /// Sets metadata builder of the region to create.
131    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
132        self.metadata_builder = Some(builder);
133        self
134    }
135
136    /// Computes the region directory from table_dir and region_id.
137    fn region_dir(&self) -> String {
138        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
139    }
140
141    /// Builds the region metadata.
142    ///
143    /// # Panics
144    /// - Panics if `options` is not set.
145    /// - Panics if `metadata_builder` is not set.
146    fn build_metadata(&mut self) -> Result<RegionMetadata> {
147        let options = self.options.as_ref().unwrap();
148        let mut metadata_builder = self.metadata_builder.take().unwrap();
149        metadata_builder.primary_key_encoding(options.primary_key_encoding());
150        metadata_builder.build().context(InvalidMetadataSnafu)
151    }
152
153    /// Parses and sets options for the region.
154    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
155        self.options(RegionOptions::try_from(&options)?)
156    }
157
158    /// Sets the replay checkpoint for the region.
159    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
160        self.replay_checkpoint = replay_checkpoint;
161        self
162    }
163
164    /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
165    /// constructing a new one from scratch.
166    pub(crate) fn wal_entry_reader(
167        mut self,
168        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
169    ) -> Self {
170        self.wal_entry_reader = wal_entry_reader;
171        self
172    }
173
174    /// Sets options for the region.
175    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
176        options.validate()?;
177        self.options = Some(options);
178        Ok(self)
179    }
180
181    /// Sets the cache manager for the region.
182    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
183        self.cache_manager = cache_manager;
184        self
185    }
186
187    /// Sets the `skip_wal_replay`.
188    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
189        self.skip_wal_replay = skip;
190        self
191    }
192
    /// Writes region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// The method first tries to open the region:
    /// - If an existing region is recovered, its metadata is checked against the
    ///   metadata to create (region id, columns and primary key), its role is set
    ///   to [RegionRole::Leader] and it is returned.
    /// - If nothing is found, or if opening fails, a new region is created. Note
    ///   that an error from the open attempt is only logged as a warning and is
    ///   not returned to the caller.
    ///
    /// A newly created region is in the writable leader state.
    ///
    /// # Errors
    /// Returns an error if the metadata is invalid, if a recovered region doesn't
    /// match the expected metadata, or if building the object store, the WAL
    /// provider, the manifest options or the manifest manager fails.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        // Builds the expected metadata up front; this consumes the metadata builder.
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                // The failure is not propagated: we fall through and create the region.
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Create a manifest manager for this region and writes regions to the manifest file.
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        // A new region has no manifest yet, so the partition duration comes
        // from the region options only.
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        // The creation time is used as the initial last flush and last compaction time.
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_local_file_purger(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
        })
    }
310
311    /// Opens an existing region in read only mode.
312    ///
313    /// Returns error if the region doesn't exist.
314    pub(crate) async fn open<S: LogStore>(
315        mut self,
316        config: &MitoConfig,
317        wal: &Wal<S>,
318    ) -> Result<MitoRegion> {
319        let region_id = self.region_id;
320        let region_dir = self.region_dir();
321        let region = self
322            .maybe_open(config, wal)
323            .await?
324            .context(EmptyRegionDirSnafu {
325                region_id,
326                region_dir: &region_dir,
327            })?;
328
329        ensure!(
330            region.region_id == self.region_id,
331            RegionCorruptedSnafu {
332                region_id: self.region_id,
333                reason: format!(
334                    "recovered region has different region id {}",
335                    region.region_id
336                ),
337            }
338        );
339
340        Ok(region)
341    }
342
343    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
344        match wal_options {
345            WalOptions::RaftEngine => {
346                ensure!(
347                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
348                    error::IncompatibleWalProviderChangeSnafu {
349                        global: "`kafka`",
350                        region: "`raft_engine`",
351                    }
352                );
353                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
354            }
355            WalOptions::Kafka(options) => {
356                ensure!(
357                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
358                    error::IncompatibleWalProviderChangeSnafu {
359                        global: "`raft_engine`",
360                        region: "`kafka`",
361                    }
362                );
363                Ok(Provider::kafka_provider(options.topic.to_string()))
364            }
365            WalOptions::Noop => Ok(Provider::noop_provider()),
366        }
367    }
368
    /// Tries to open the region and returns `None` if the region directory is empty.
    ///
    /// The region is rebuilt from its manifest. Unless `skip_wal_replay` is set,
    /// the WAL is then replayed into the memtables, starting from the larger of
    /// the replay checkpoint and the flushed entry id recorded in the manifest.
    /// The returned region is always in the follower state.
    ///
    /// The stored WAL entry reader, if any, is taken out of the opener by this call.
    ///
    /// # Errors
    /// Returns an error if building the manifest options, opening the manifest,
    /// building the WAL provider, looking up the object store or replaying the
    /// WAL fails.
    ///
    /// # Panics
    /// Panics if `options` is not set.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        // Works on a copy of the options; `self.options` is left in place.
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        // No manifest manager means there is nothing to open.
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        // Prefers the reader supplied by the caller; otherwise builds one from the WAL.
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_local_file_purger(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        // Use compaction time window in the manifest if region doesn't provide
        // the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        // Restores the version (files, flushed/truncated positions and
        // compaction time window) from the manifest.
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        let topic_latest_entry_id = if !self.skip_wal_replay {
            // A missing checkpoint counts as 0, so the flushed entry id wins in that case.
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id
            );
            // The returned last entry id is not used here.
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
            // Only set after the WAL replay is completed.
            let topic_latest_entry_id = if provider.is_remote_wal()
                && version_control.current().version.memtables.is_empty()
            {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            };

            topic_latest_entry_id
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );

            0
        };
        // The open time is used as the initial last flush and last compaction time.
        let now = self.time_provider.current_time_millis();
        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            // Region is always opened in read only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }
514
515    /// Returns a new manifest options.
516    fn manifest_options(
517        config: &MitoConfig,
518        options: &RegionOptions,
519        region_dir: &str,
520        object_store_manager: &ObjectStoreManagerRef,
521    ) -> Result<RegionManifestOptions> {
522        let object_store = get_object_store(&options.storage, object_store_manager)?;
523        Ok(RegionManifestOptions {
524            manifest_dir: new_manifest_dir(region_dir),
525            object_store,
526            // We don't allow users to set the compression algorithm as we use it as a file suffix.
527            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
528            compress_type: manifest_compress_type(config.compress_manifest),
529            checkpoint_distance: config.manifest_checkpoint_distance,
530            remove_file_options: RemoveFileOptions {
531                keep_count: config.experimental_manifest_keep_removed_file_count,
532                keep_ttl: config.experimental_manifest_keep_removed_file_ttl,
533            },
534        })
535    }
536}
537
538/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
539pub fn get_object_store(
540    name: &Option<String>,
541    object_store_manager: &ObjectStoreManagerRef,
542) -> Result<object_store::ObjectStore> {
543    if let Some(name) = name {
544        Ok(object_store_manager
545            .find(name)
546            .with_context(|| ObjectStoreNotFoundSnafu {
547                object_store: name.to_string(),
548            })?
549            .clone())
550    } else {
551        Ok(object_store_manager.default_object_store().clone())
552    }
553}
554
/// A loader for loading metadata from a region dir.
pub struct RegionMetadataLoader {
    /// Engine config used to build the manifest options.
    config: Arc<MitoConfig>,
    /// Used to resolve the object store that holds the region manifest.
    object_store_manager: ObjectStoreManagerRef,
}
560
561impl RegionMetadataLoader {
562    /// Creates a new `RegionOpenerBuilder`.
563    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
564        Self {
565            config,
566            object_store_manager,
567        }
568    }
569
570    /// Loads the metadata of the region from the region dir.
571    pub async fn load(
572        &self,
573        region_dir: &str,
574        region_options: &RegionOptions,
575    ) -> Result<Option<RegionMetadataRef>> {
576        let manifest = self.load_manifest(region_dir, region_options).await?;
577        Ok(manifest.map(|m| m.metadata.clone()))
578    }
579
580    /// Loads the manifest of the region from the region dir.
581    pub async fn load_manifest(
582        &self,
583        region_dir: &str,
584        region_options: &RegionOptions,
585    ) -> Result<Option<Arc<RegionManifest>>> {
586        let region_manifest_options = RegionOpener::manifest_options(
587            &self.config,
588            region_options,
589            region_dir,
590            &self.object_store_manager,
591        )?;
592        let Some(manifest_manager) = RegionManifestManager::open(
593            region_manifest_options,
594            Arc::new(AtomicU64::new(0)),
595            Arc::new(AtomicU64::new(0)),
596        )
597        .await?
598        else {
599            return Ok(None);
600        };
601
602        let manifest = manifest_manager.manifest();
603        Ok(Some(manifest))
604    }
605}
606
607/// Checks whether the recovered region has the same schema as region to create.
608pub(crate) fn check_recovered_region(
609    recovered: &RegionMetadata,
610    region_id: RegionId,
611    column_metadatas: &[ColumnMetadata],
612    primary_key: &[ColumnId],
613) -> Result<()> {
614    if recovered.region_id != region_id {
615        error!(
616            "Recovered region {}, expect region {}",
617            recovered.region_id, region_id
618        );
619        return RegionCorruptedSnafu {
620            region_id,
621            reason: format!(
622                "recovered metadata has different region id {}",
623                recovered.region_id
624            ),
625        }
626        .fail();
627    }
628    if recovered.column_metadatas != column_metadatas {
629        error!(
630            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
631            recovered.region_id, recovered.column_metadatas, column_metadatas
632        );
633
634        return RegionCorruptedSnafu {
635            region_id,
636            reason: "recovered metadata has different schema",
637        }
638        .fail();
639    }
640    if recovered.primary_key != primary_key {
641        error!(
642            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
643            recovered.region_id, recovered.primary_key, primary_key
644        );
645
646        return RegionCorruptedSnafu {
647            region_id,
648            reason: "recovered metadata has different primary key",
649        }
650        .fail();
651    }
652
653    Ok(())
654}
655
/// Replays the mutations from WAL and inserts mutations to memtable of given region.
///
/// Entries are read from `wal_entry_reader` starting at `flushed_entry_id + 1`.
/// The mutations and bulk entries of each WAL entry are written to the
/// memtables through a fresh [RegionWriteCtx]. After the stream is exhausted,
/// `on_region_opened` is invoked with `region_id`, `flushed_entry_id` and
/// `provider`.
///
/// An entry whose id is not greater than `flushed_entry_id` is stale: it is
/// logged, and it makes the replay fail unless `allow_stale_entries` is true.
/// When stale entries are allowed they are still applied to the memtables.
///
/// Returns the largest entry id seen, or `flushed_entry_id` if the WAL yields
/// no entry with a greater id.
///
/// # Errors
/// Returns an error if reading the WAL fails, if a stale entry is found while
/// stale entries are not allowed, if a bulk entry cannot be converted or
/// pushed, or if `on_region_opened` fails.
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    // Start time, only used to report the elapsed time in the final log.
    let now = Instant::now();
    let mut rows_replayed = 0;
    // Last entry id should start from flushed entry id since there might be no
    // data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
            // Fails the replay unless stale entries are explicitly tolerated.
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        // One write context per WAL entry.
        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            // For WAL replay, we don't need to track the write bytes rate.
            None,
        );
        for mutation in entry.mutations {
            // A mutation without rows contributes nothing to the row count.
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            // `push_bulk` returning false is reported as a corrupted region.
            ensure!(
                region_write_ctx.push_bulk(OptionOutputTx::none(), part),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        // set next_entry_id and write to memtable.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id, provider, rows_replayed, replay_from_entry_id, last_entry_id, series_count, now.elapsed()
    );
    Ok(last_entry_id)
}
742
743/// Returns the directory to the manifest files.
744pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
745    join_dir(region_dir, "manifest")
746}