mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::cache::manifest_cache::ManifestCache;
27use crate::config::MitoConfig;
28use crate::error::{
29    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
30};
31use crate::manifest::action::{
32    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
33    RegionMetaAction, RegionMetaActionList, RemovedFile,
34};
35use crate::manifest::checkpointer::Checkpointer;
36use crate::manifest::storage::{
37    ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
38    manifest_dir,
39};
40use crate::metrics::MANIFEST_OP_ELAPSED;
41use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
42use crate::sst::FormatType;
43
44/// Options for [RegionManifestManager].
45#[derive(Debug, Clone)]
46pub struct RegionManifestOptions {
47    /// Directory to store manifest.
48    pub manifest_dir: String,
49    pub object_store: ObjectStore,
50    pub compress_type: CompressionType,
51    /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
52    /// Set to 0 to disable checkpoint.
53    pub checkpoint_distance: u64,
54    pub remove_file_options: RemoveFileOptions,
55    /// Optional cache for manifest files.
56    pub manifest_cache: Option<ManifestCache>,
57}
58
59impl RegionManifestOptions {
60    /// Creates a new [RegionManifestOptions] with the given region directory, object store, and configuration.
61    pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
62        RegionManifestOptions {
63            manifest_dir: manifest_dir(region_dir),
64            object_store: object_store.clone(),
65            // We don't allow users to set the compression algorithm as we use it as a file suffix.
66            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
67            compress_type: manifest_compress_type(config.compress_manifest),
68            checkpoint_distance: config.manifest_checkpoint_distance,
69            remove_file_options: RemoveFileOptions {
70                enable_gc: config.gc.enable,
71            },
72            manifest_cache: None,
73        }
74    }
75}
76
/// Options that control how the `removed_files` field in [RegionManifest] is updated.
#[cfg_attr(any(test, feature = "test"), derive(Default))]
#[derive(Clone, Debug)]
pub struct RemoveFileOptions {
    /// Whether GC is enabled. When it is disabled, the removed files should always be
    /// empty when persisting manifest.
    pub enable_gc: bool,
}
84
85// rewrite note:
86// trait Checkpoint -> struct RegionCheckpoint
87// trait MetaAction -> struct RegionMetaActionList
88// trait MetaActionIterator -> struct MetaActionIteratorImpl
89
90#[cfg_attr(doc, aquamarine::aquamarine)]
91/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
92/// metadata.
93///
94/// ```mermaid
95/// classDiagram
96/// class RegionManifestManager {
97///     -ManifestObjectStore store
98///     -RegionManifestOptions options
99///     -RegionManifest manifest
100///     +new() RegionManifestManager
101///     +open() Option~RegionManifestManager~
102///     +stop()
103///     +update(RegionMetaActionList action_list) ManifestVersion
104///     +manifest() RegionManifest
105/// }
106/// class ManifestObjectStore {
107///     -ObjectStore object_store
108/// }
109/// class RegionChange {
110///     -RegionMetadataRef metadata
111/// }
112/// class RegionEdit {
113///     -VersionNumber region_version
114///     -Vec~FileMeta~ files_to_add
115///     -Vec~FileMeta~ files_to_remove
116///     -SequenceNumber flushed_sequence
117/// }
118/// class RegionRemove {
119///     -RegionId region_id
120/// }
121/// RegionManifestManager o-- ManifestObjectStore
122/// RegionManifestManager o-- RegionManifest
123/// RegionManifestManager o-- RegionManifestOptions
124/// RegionManifestManager -- RegionMetaActionList
125/// RegionManifestManager -- RegionCheckpoint
126/// ManifestObjectStore o-- ObjectStore
127/// RegionMetaActionList o-- RegionMetaAction
128/// RegionMetaAction o-- ProtocolAction
129/// RegionMetaAction o-- RegionChange
130/// RegionMetaAction o-- RegionEdit
131/// RegionMetaAction o-- RegionRemove
132/// RegionChange o-- RegionMetadata
133/// RegionEdit o-- FileMeta
134///
135/// class RegionManifest {
136///     -RegionMetadataRef metadata
137///     -HashMap&lt;FileId, FileMeta&gt; files
138///     -ManifestVersion manifest_version
139/// }
140/// class RegionMetadata
141/// class FileMeta
142/// RegionManifest o-- RegionMetadata
143/// RegionManifest o-- FileMeta
144///
145/// class RegionCheckpoint {
146///     -ManifestVersion last_version
147///     -Option~RegionManifest~ checkpoint
148/// }
149/// RegionCheckpoint o-- RegionManifest
150/// ```
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage used to save and fetch manifest files.
    store: ManifestObjectStore,
    /// Last manifest version known to this manager. The counter is the same `Arc` as
    /// the `manifest_version` of the [ManifestStats] passed to `new()`/`open()`.
    last_version: Arc<AtomicU64>,
    /// Checkpointer invoked after each non-staging update.
    checkpointer: Checkpointer,
    /// Current in-memory manifest.
    manifest: Arc<RegionManifest>,
    // Staging manifest is used to store the manifest of the staging region before it becomes available.
    // It is initially inherited from the previous manifest(i.e., `self.manifest`).
    // When the staging manifest becomes available, it will be used to construct the new manifest.
    staging_manifest: Option<Arc<RegionManifest>>,
    /// Stats that receive the removed-file counters whenever the manifest changes.
    stats: ManifestStats,
    /// Set by `stop()`. Checked by `update()` and `install_manifest_to()`.
    stopped: bool,
}
164
165impl RegionManifestManager {
166    /// Constructs a region's manifest and persist it.
167    pub async fn new(
168        metadata: RegionMetadataRef,
169        flushed_entry_id: u64,
170        options: RegionManifestOptions,
171        sst_format: FormatType,
172        stats: &ManifestStats,
173    ) -> Result<Self> {
174        // construct storage
175        let mut store = ManifestObjectStore::new(
176            &options.manifest_dir,
177            options.object_store.clone(),
178            options.compress_type,
179            stats.total_manifest_size.clone(),
180            options.manifest_cache.clone(),
181        );
182        let manifest_version = stats.manifest_version.clone();
183
184        info!(
185            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
186            options.manifest_dir, metadata, flushed_entry_id
187        );
188
189        let version = MIN_VERSION;
190        let mut manifest_builder = RegionManifestBuilder::default();
191        // set the initial metadata.
192        manifest_builder.apply_change(
193            version,
194            RegionChange {
195                metadata: metadata.clone(),
196                sst_format,
197                append_mode: None,
198            },
199        );
200        let manifest = manifest_builder.try_build()?;
201        let region_id = metadata.region_id;
202
203        debug!(
204            "Build region manifest in {}, manifest: {:?}",
205            options.manifest_dir, manifest
206        );
207
208        let mut actions = vec![RegionMetaAction::Change(RegionChange {
209            metadata,
210            sst_format,
211            append_mode: None,
212        })];
213        if flushed_entry_id > 0 {
214            actions.push(RegionMetaAction::Edit(RegionEdit {
215                files_to_add: vec![],
216                files_to_remove: vec![],
217                timestamp_ms: None,
218                compaction_time_window: None,
219                flushed_entry_id: Some(flushed_entry_id),
220                flushed_sequence: None,
221                committed_sequence: None,
222            }));
223        }
224
225        // Persist region change.
226        let action_list = RegionMetaActionList::new(actions);
227
228        // New region is not in staging mode.
229        // TODO(ruihang): add staging mode support if needed.
230        store.save(version, &action_list.encode()?, false).await?;
231
232        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
233        manifest_version.store(version, Ordering::Relaxed);
234        manifest
235            .removed_files
236            .update_file_removed_cnt_to_stats(stats);
237        Ok(Self {
238            store,
239            last_version: manifest_version,
240            checkpointer,
241            manifest: Arc::new(manifest),
242            staging_manifest: None,
243            stats: stats.clone(),
244            stopped: false,
245        })
246    }
247
248    /// Opens an existing manifest.
249    ///
250    /// Returns `Ok(None)` if no such manifest.
251    pub async fn open(
252        options: RegionManifestOptions,
253        stats: &ManifestStats,
254    ) -> Result<Option<Self>> {
255        let _t = MANIFEST_OP_ELAPSED
256            .with_label_values(&["open"])
257            .start_timer();
258
259        // construct storage
260        let mut store = ManifestObjectStore::new(
261            &options.manifest_dir,
262            options.object_store.clone(),
263            options.compress_type,
264            stats.total_manifest_size.clone(),
265            options.manifest_cache.clone(),
266        );
267        let manifest_version = stats.manifest_version.clone();
268
269        // recover from storage
270        // construct manifest builder
271        // calculate the manifest size from the latest checkpoint
272        let mut version = MIN_VERSION;
273        let checkpoint = Self::last_checkpoint(&mut store).await?;
274        let last_checkpoint_version = checkpoint
275            .as_ref()
276            .map(|(checkpoint, _)| checkpoint.last_version)
277            .unwrap_or(MIN_VERSION);
278        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
279            info!(
280                "Recover region manifest {} from checkpoint version {}",
281                options.manifest_dir, checkpoint.last_version
282            );
283            version = version.max(checkpoint.last_version + 1);
284            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
285        } else {
286            info!(
287                "Checkpoint not found in {}, build manifest from scratch",
288                options.manifest_dir
289            );
290            RegionManifestBuilder::default()
291        };
292
293        // apply actions from storage
294        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
295
296        for (manifest_version, raw_action_list) in manifests {
297            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
298            // set manifest size after last checkpoint
299            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
300            for action in action_list.actions {
301                match action {
302                    RegionMetaAction::Change(action) => {
303                        manifest_builder.apply_change(manifest_version, action);
304                    }
305                    RegionMetaAction::PartitionExprChange(action) => {
306                        manifest_builder.apply_partition_expr_change(manifest_version, action);
307                    }
308                    RegionMetaAction::Edit(action) => {
309                        manifest_builder.apply_edit(manifest_version, action);
310                    }
311                    RegionMetaAction::Remove(_) => {
312                        debug!(
313                            "Unhandled action in {}, action: {:?}",
314                            options.manifest_dir, action
315                        );
316                    }
317                    RegionMetaAction::Truncate(action) => {
318                        manifest_builder.apply_truncate(manifest_version, action);
319                    }
320                }
321            }
322        }
323
324        // set the initial metadata if necessary
325        if !manifest_builder.contains_metadata() {
326            debug!("No region manifest in {}", options.manifest_dir);
327            return Ok(None);
328        }
329
330        let manifest = manifest_builder.try_build()?;
331        debug!(
332            "Recovered region manifest from {}, manifest: {:?}",
333            options.manifest_dir, manifest
334        );
335        let version = manifest.manifest_version;
336
337        let checkpointer = Checkpointer::new(
338            manifest.metadata.region_id,
339            options,
340            store.clone(),
341            last_checkpoint_version,
342        );
343        manifest_version.store(version, Ordering::Relaxed);
344        manifest
345            .removed_files
346            .update_file_removed_cnt_to_stats(stats);
347        Ok(Some(Self {
348            store,
349            last_version: manifest_version,
350            checkpointer,
351            manifest: Arc::new(manifest),
352            // TODO(weny): open the staging manifest if exists.
353            staging_manifest: None,
354            stats: stats.clone(),
355            stopped: false,
356        }))
357    }
358
    /// Stops the manager.
    ///
    /// This only sets the `stopped` flag; nothing is awaited. Once the flag is set,
    /// `update()` fails with a region-stopped error, and so does `install_manifest_to()`
    /// when it would have to install a newer version.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
363
364    /// Installs the manifest changes from the current version to the target version (inclusive).
365    ///
366    /// Returns installed version.
367    /// **Note**: This function is not guaranteed to install the target version strictly.
368    /// The installed version may be greater than the target version.
369    pub async fn install_manifest_to(
370        &mut self,
371        target_version: ManifestVersion,
372    ) -> Result<ManifestVersion> {
373        let _t = MANIFEST_OP_ELAPSED
374            .with_label_values(&["install_manifest_to"])
375            .start_timer();
376
377        let last_version = self.last_version();
378        // Case 1: If the target version is less than the current version, return the current version.
379        if last_version >= target_version {
380            debug!(
381                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
382                target_version, last_version, self.manifest.metadata.region_id
383            );
384            return Ok(last_version);
385        }
386
387        ensure!(
388            !self.stopped,
389            RegionStoppedSnafu {
390                region_id: self.manifest.metadata.region_id,
391            }
392        );
393
394        let region_id = self.manifest.metadata.region_id;
395        // Fetches manifests from the last version strictly.
396        let mut manifests = self
397            .store
398            // Invariant: last_version < target_version.
399            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
400            .await?;
401
402        // Case 2: No manifests in range: [current_version+1, target_version+1)
403        //
404        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
405        //                                                                    [Leader region]
406        // [Current Version]......[Target Version]
407        // [Follower region]
408        if manifests.is_empty() {
409            info!(
410                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
411                last_version, self.manifest.metadata.region_id
412            );
413            let last_version = self.install_last_checkpoint().await?;
414            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
415            if last_version >= target_version {
416                return Ok(last_version);
417            }
418
419            // Fetches manifests from the installed version strictly.
420            manifests = self
421                .store
422                // Invariant: last_version < target_version.
423                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
424                .await?;
425        }
426
427        if manifests.is_empty() {
428            return NoManifestsSnafu {
429                region_id: self.manifest.metadata.region_id,
430                start_version: last_version + 1,
431                end_version: target_version + 1,
432                last_version,
433            }
434            .fail();
435        }
436
437        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
438        let mut manifest_builder =
439            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
440
441        for (manifest_version, raw_action_list) in manifests {
442            self.store
443                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
444            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
445            for action in action_list.actions {
446                match action {
447                    RegionMetaAction::Change(action) => {
448                        manifest_builder.apply_change(manifest_version, action);
449                    }
450                    RegionMetaAction::PartitionExprChange(action) => {
451                        manifest_builder.apply_partition_expr_change(manifest_version, action);
452                    }
453                    RegionMetaAction::Edit(action) => {
454                        manifest_builder.apply_edit(manifest_version, action);
455                    }
456                    RegionMetaAction::Remove(_) => {
457                        debug!(
458                            "Unhandled action for region {}, action: {:?}",
459                            self.manifest.metadata.region_id, action
460                        );
461                    }
462                    RegionMetaAction::Truncate(action) => {
463                        manifest_builder.apply_truncate(manifest_version, action);
464                    }
465                }
466            }
467        }
468
469        let new_manifest = manifest_builder.try_build()?;
470        ensure!(
471            new_manifest.manifest_version >= target_version,
472            InstallManifestToSnafu {
473                region_id: self.manifest.metadata.region_id,
474                target_version,
475                available_version: new_manifest.manifest_version,
476                last_version,
477            }
478        );
479
480        let version = self.last_version();
481        new_manifest
482            .removed_files
483            .update_file_removed_cnt_to_stats(&self.stats);
484        self.manifest = Arc::new(new_manifest);
485        let last_version = self.set_version(self.manifest.manifest_version);
486        info!(
487            "Install manifest changes from {} to {}, region: {}",
488            version, last_version, self.manifest.metadata.region_id
489        );
490
491        Ok(last_version)
492    }
493
494    /// Installs the last checkpoint.
495    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
496        let last_version = self.last_version();
497        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
498        else {
499            return NoCheckpointSnafu {
500                region_id: self.manifest.metadata.region_id,
501                last_version,
502            }
503            .fail();
504        };
505        self.store.reset_manifest_size();
506        self.store
507            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
508        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
509        let manifest = builder.try_build()?;
510        let last_version = self.set_version(manifest.manifest_version);
511        manifest
512            .removed_files
513            .update_file_removed_cnt_to_stats(&self.stats);
514        self.manifest = Arc::new(manifest);
515        info!(
516            "Installed region manifest from checkpoint: {}, region: {}",
517            checkpoint.last_version, self.manifest.metadata.region_id
518        );
519
520        Ok(last_version)
521    }
522
523    /// Updates the manifest. Returns the current manifest version number.
524    pub async fn update(
525        &mut self,
526        action_list: RegionMetaActionList,
527        is_staging: bool,
528    ) -> Result<ManifestVersion> {
529        let _t = MANIFEST_OP_ELAPSED
530            .with_label_values(&["update"])
531            .start_timer();
532
533        ensure!(
534            !self.stopped,
535            RegionStoppedSnafu {
536                region_id: self.manifest.metadata.region_id,
537            }
538        );
539
540        let version = self.increase_version();
541        self.store
542            .save(version, &action_list.encode()?, is_staging)
543            .await?;
544
545        // For a staging region, the manifest is initially inherited from the previous manifest(i.e., `self.manifest`).
546        // When the staging manifest becomes available, it will be used to construct the new manifest.
547        let mut manifest_builder =
548            if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
549                RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
550            } else {
551                RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
552            };
553
554        for action in action_list.actions {
555            match action {
556                RegionMetaAction::Change(action) => {
557                    manifest_builder.apply_change(version, action);
558                }
559                RegionMetaAction::PartitionExprChange(action) => {
560                    manifest_builder.apply_partition_expr_change(version, action);
561                }
562                RegionMetaAction::Edit(action) => {
563                    manifest_builder.apply_edit(version, action);
564                }
565                RegionMetaAction::Remove(_) => {
566                    debug!(
567                        "Unhandled action for region {}, action: {:?}",
568                        self.manifest.metadata.region_id, action
569                    );
570                }
571                RegionMetaAction::Truncate(action) => {
572                    manifest_builder.apply_truncate(version, action);
573                }
574            }
575        }
576
577        if is_staging {
578            let new_manifest = manifest_builder.try_build()?;
579            self.staging_manifest = Some(Arc::new(new_manifest));
580
581            info!(
582                "Skipping checkpoint for region {} in staging mode, manifest version: {}",
583                self.manifest.metadata.region_id, self.manifest.manifest_version
584            );
585        } else {
586            let new_manifest = manifest_builder.try_build()?;
587            new_manifest
588                .removed_files
589                .update_file_removed_cnt_to_stats(&self.stats);
590            let updated_manifest = self
591                .checkpointer
592                .update_manifest_removed_files(new_manifest)?;
593            self.manifest = Arc::new(updated_manifest);
594            self.checkpointer
595                .maybe_do_checkpoint(self.manifest.as_ref());
596        }
597
598        Ok(version)
599    }
600
601    /// Clear deleted files from manifest's `removed_files` field without update version. Notice if datanode exit before checkpoint then new manifest by open region may still contain these deleted files, which is acceptable for gc process.
602    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
603        let mut manifest = (*self.manifest()).clone();
604        manifest.removed_files.clear_deleted_files(deleted_files);
605        self.set_manifest(Arc::new(manifest));
606    }
607
    /// Replaces the current in-memory manifest with `manifest`.
    ///
    /// Only the in-memory reference is swapped; the tracked version is not touched.
    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
        self.manifest = manifest;
    }
611
612    /// Retrieves the current [RegionManifest].
613    pub fn manifest(&self) -> Arc<RegionManifest> {
614        self.manifest.clone()
615    }
616
617    /// Retrieves the current [RegionManifest].
618    pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
619        self.staging_manifest.clone()
620    }
621
    /// Returns total manifest size, as reported by the underlying manifest store.
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
626
627    /// Returns true if a newer version manifest file is found.
628    ///
629    /// It is typically used in read-only regions to catch up with manifest.
630    /// It doesn't lock the manifest directory in the object store so the result
631    /// may be inaccurate if there are concurrent writes.
632    pub async fn has_update(&self) -> Result<bool> {
633        let last_version = self.last_version();
634
635        let streamer =
636            self.store
637                .manifest_lister(false)
638                .await?
639                .context(error::EmptyManifestDirSnafu {
640                    manifest_dir: self.store.manifest_dir(),
641                })?;
642
643        let need_update = streamer
644            .try_any(|entry| async move {
645                let file_name = entry.name();
646                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
647                    let version = file_version(file_name);
648                    if version > last_version {
649                        return true;
650                    }
651                }
652                false
653            })
654            .await
655            .context(error::OpenDalSnafu)?;
656
657        Ok(need_update)
658    }
659
660    /// Increases last version and returns the increased version.
661    fn increase_version(&mut self) -> ManifestVersion {
662        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
663        previous + 1
664    }
665
    /// Sets the last version.
    ///
    /// Returns the `version` that was just stored so callers can use it directly.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
671
    /// Returns the last manifest version currently stored in the version counter.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
675
676    /// Fetches the last [RegionCheckpoint] from storage.
677    ///
678    /// If the checkpoint is not found, returns `None`.
679    /// Otherwise, returns the checkpoint and the size of the checkpoint.
680    pub(crate) async fn last_checkpoint(
681        store: &mut ManifestObjectStore,
682    ) -> Result<Option<(RegionCheckpoint, u64)>> {
683        let last_checkpoint = store.load_last_checkpoint().await?;
684
685        if let Some((_, bytes)) = last_checkpoint {
686            let checkpoint = RegionCheckpoint::decode(&bytes)?;
687            Ok(Some((checkpoint, bytes.len() as u64)))
688        } else {
689            Ok(None)
690        }
691    }
692
    /// Returns a clone of the underlying [ManifestObjectStore].
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
696
    /// Returns the checkpointer of this manager. Only available in tests.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
701
702    /// Merge all staged manifest actions into a single action list ready for submission.
703    /// This collects all staging manifests, applies them sequentially, and returns the merged actions.
704    pub(crate) async fn merge_staged_actions(
705        &mut self,
706        region_state: RegionRoleState,
707    ) -> Result<Option<RegionMetaActionList>> {
708        // Only merge if we're in staging mode
709        if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
710            return Ok(None);
711        }
712
713        // Fetch all staging manifests
714        let staging_manifests = self.store.fetch_staging_manifests().await?;
715
716        if staging_manifests.is_empty() {
717            info!(
718                "No staging manifests to merge for region {}",
719                self.manifest.metadata.region_id
720            );
721            return Ok(None);
722        }
723
724        info!(
725            "Merging {} staging manifests for region {}",
726            staging_manifests.len(),
727            self.manifest.metadata.region_id
728        );
729
730        // Start with current manifest state as the base
731        let mut merged_actions = Vec::new();
732        let mut latest_version = self.last_version();
733
734        // Apply all staging actions in order
735        for (manifest_version, raw_action_list) in staging_manifests {
736            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
737
738            for action in action_list.actions {
739                merged_actions.push(action);
740            }
741
742            latest_version = latest_version.max(manifest_version);
743        }
744
745        if merged_actions.is_empty() {
746            return Ok(None);
747        }
748
749        info!(
750            "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
751            merged_actions.len(),
752            self.manifest.metadata.region_id,
753            latest_version
754        );
755
756        Ok(Some(RegionMetaActionList::new(merged_actions)))
757    }
758
    /// Unsets the staging manifest.
    ///
    /// Only the in-memory staging manifest is dropped; the store is not touched.
    pub(crate) fn unset_staging_manifest(&mut self) {
        self.staging_manifest = None;
    }
763
764    /// Clear all staging manifests.
765    pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
766        self.staging_manifest = None;
767        self.store.clear_staging_manifests().await?;
768        info!(
769            "Cleared all staging manifests for region {}",
770            self.manifest.metadata.region_id
771        );
772        Ok(())
773    }
774}
775
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the current manifest carries the `expect` metadata and that both
    /// the manifest and the version counter are at `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.manifest();
        assert_eq!(current.metadata, *expect);
        // The in-memory manifest and the version counter must agree.
        assert_eq!(self.manifest.manifest_version, self.last_version());
        assert_eq!(last_version, self.last_version());
    }
}
785
786#[cfg(test)]
787mod test {
788    use std::time::Duration;
789
790    use api::v1::SemanticType;
791    use common_datasource::compression::CompressionType;
792    use common_test_util::temp_dir::create_temp_dir;
793    use datatypes::prelude::ConcreteDataType;
794    use datatypes::schema::ColumnSchema;
795    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
796
797    use super::*;
798    use crate::manifest::action::{RegionChange, RegionEdit};
799    use crate::manifest::tests::utils::basic_region_metadata;
800    use crate::test_util::TestEnv;
801
802    #[tokio::test]
803    async fn create_manifest_manager() {
804        let metadata = Arc::new(basic_region_metadata());
805        let env = TestEnv::new().await;
806        let manager = env
807            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
808            .await
809            .unwrap()
810            .unwrap();
811
812        manager.validate_manifest(&metadata, 0);
813    }
814
815    #[tokio::test]
816    async fn open_manifest_manager() {
817        let env = TestEnv::new().await;
818        // Try to opens an empty manifest.
819        assert!(
820            env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
821                .await
822                .unwrap()
823                .is_none()
824        );
825
826        // Creates a manifest.
827        let metadata = Arc::new(basic_region_metadata());
828        let mut manager = env
829            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
830            .await
831            .unwrap()
832            .unwrap();
833        // Stops it.
834        manager.stop().await;
835
836        // Open it.
837        let manager = env
838            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
839            .await
840            .unwrap()
841            .unwrap();
842
843        manager.validate_manifest(&metadata, 0);
844    }
845
846    #[tokio::test]
847    async fn manifest_with_partition_expr_roundtrip() {
848        let env = TestEnv::new().await;
849        let expr_json =
850            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
851        let mut metadata = basic_region_metadata();
852        metadata.set_partition_expr(Some(expr_json.to_string()));
853        let metadata = Arc::new(metadata);
854        let mut manager = env
855            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
856            .await
857            .unwrap()
858            .unwrap();
859
860        // persisted manifest should contain the same partition_expr JSON
861        let manifest = manager.manifest();
862        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
863
864        manager.stop().await;
865
866        // Reopen and check again
867        let manager = env
868            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
869            .await
870            .unwrap()
871            .unwrap();
872        let manifest = manager.manifest();
873        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
874    }
875
876    #[tokio::test]
877    async fn region_change_add_column() {
878        let metadata = Arc::new(basic_region_metadata());
879        let env = TestEnv::new().await;
880        let mut manager = env
881            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
882            .await
883            .unwrap()
884            .unwrap();
885
886        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
887        new_metadata_builder.push_column_metadata(ColumnMetadata {
888            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
889            semantic_type: SemanticType::Field,
890            column_id: 252,
891        });
892        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
893
894        let action_list =
895            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
896                metadata: new_metadata.clone(),
897                sst_format: FormatType::PrimaryKey,
898                append_mode: None,
899            }));
900
901        let current_version = manager.update(action_list, false).await.unwrap();
902        assert_eq!(current_version, 1);
903        manager.validate_manifest(&new_metadata, 1);
904
905        // Reopen the manager.
906        manager.stop().await;
907        let manager = env
908            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
909            .await
910            .unwrap()
911            .unwrap();
912        manager.validate_manifest(&new_metadata, 1);
913    }
914
915    /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
916    async fn manifest_dir_usage(path: &str) -> u64 {
917        let mut size = 0;
918        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
919        while let Ok(dir_entry) = read_dir.next_entry().await {
920            let Some(entry) = dir_entry else {
921                break;
922            };
923            if entry.file_type().await.unwrap().is_file() {
924                let file_name = entry.file_name().into_string().unwrap();
925                if file_name.contains(".checkpoint") || file_name.contains(".json") {
926                    let file_size = entry.metadata().await.unwrap().len() as usize;
927                    debug!("File: {file_name:?}, size: {file_size}");
928                    size += file_size;
929                }
930            }
931        }
932        size as u64
933    }
934
935    #[tokio::test]
936    async fn test_manifest_size() {
937        let metadata = Arc::new(basic_region_metadata());
938        let data_home = create_temp_dir("");
939        let data_home_path = data_home.path().to_str().unwrap().to_string();
940        let env = TestEnv::with_data_home(data_home).await;
941
942        let manifest_dir = format!("{}/manifest", data_home_path);
943
944        let mut manager = env
945            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
946            .await
947            .unwrap()
948            .unwrap();
949
950        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
951        new_metadata_builder.push_column_metadata(ColumnMetadata {
952            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
953            semantic_type: SemanticType::Field,
954            column_id: 252,
955        });
956        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
957
958        let action_list =
959            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
960                metadata: new_metadata.clone(),
961                sst_format: FormatType::PrimaryKey,
962                append_mode: None,
963            }));
964
965        let current_version = manager.update(action_list, false).await.unwrap();
966        assert_eq!(current_version, 1);
967        manager.validate_manifest(&new_metadata, 1);
968
969        // get manifest size
970        let manifest_size = manager.manifest_usage();
971        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
972
973        // update 10 times nop_action to trigger checkpoint
974        for _ in 0..10 {
975            manager
976                .update(
977                    RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
978                        files_to_add: vec![],
979                        files_to_remove: vec![],
980                        timestamp_ms: None,
981                        compaction_time_window: None,
982                        flushed_entry_id: None,
983                        flushed_sequence: None,
984                        committed_sequence: None,
985                    })]),
986                    false,
987                )
988                .await
989                .unwrap();
990        }
991
992        while manager.checkpointer.is_doing_checkpoint() {
993            tokio::time::sleep(Duration::from_millis(10)).await;
994        }
995
996        // check manifest size again
997        let manifest_size = manager.manifest_usage();
998        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
999
1000        // Reopen the manager,
1001        // we just calculate the size from the latest checkpoint file
1002        manager.stop().await;
1003        let manager = env
1004            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
1005            .await
1006            .unwrap()
1007            .unwrap();
1008        manager.validate_manifest(&new_metadata, 11);
1009
1010        // get manifest size again
1011        let manifest_size = manager.manifest_usage();
1012        assert_eq!(manifest_size, 1397);
1013    }
1014}