mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::FileId;
25use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
26
27use crate::config::MitoConfig;
28use crate::error::{
29    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
30};
31use crate::manifest::action::{
32    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
33    RegionMetaAction, RegionMetaActionList,
34};
35use crate::manifest::checkpointer::Checkpointer;
36use crate::manifest::storage::{
37    ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
38    manifest_dir,
39};
40use crate::metrics::MANIFEST_OP_ELAPSED;
41use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
42use crate::sst::FormatType;
43
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory to store manifest.
    pub manifest_dir: String,
    /// Object store that holds the manifest's delta and checkpoint files.
    pub object_store: ObjectStore,
    /// Compression type of the manifest files. It is also used as the manifest file suffix,
    /// so it is not user-configurable (see [RegionManifestOptions::new]).
    pub compress_type: CompressionType,
    /// Interval of version ([ManifestVersion]) between two checkpoints.
    /// Set to 0 to disable checkpoint.
    pub checkpoint_distance: u64,
    /// Options controlling how the manifest's `removed_files` field is maintained.
    pub remove_file_options: RemoveFileOptions,
}
56
57impl RegionManifestOptions {
58    /// Creates a new [RegionManifestOptions] with the given region directory, object store, and configuration.
59    pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
60        RegionManifestOptions {
61            manifest_dir: manifest_dir(region_dir),
62            object_store: object_store.clone(),
63            // We don't allow users to set the compression algorithm as we use it as a file suffix.
64            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
65            compress_type: manifest_compress_type(config.compress_manifest),
66            checkpoint_distance: config.manifest_checkpoint_distance,
67            remove_file_options: RemoveFileOptions {
68                enable_gc: config.gc.enable,
69            },
70        }
71    }
72}
73
/// Options for updating the `removed_files` field in [RegionManifest].
///
/// Built from `MitoConfig::gc` by [RegionManifestOptions::new].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
    /// Whether GC is enabled. If not, the removed files should always be empty when persisting manifest.
    pub enable_gc: bool,
}
81
// Rewrite note: the former trait-based abstractions are replaced by concrete types:
// trait Checkpoint -> struct RegionCheckpoint
// trait MetaAction -> struct RegionMetaActionList
// trait MetaActionIterator -> struct MetaActionIteratorImpl
86
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages a region's manifest. Provides APIs to access (create/modify/recover) the region's
/// persisted metadata.
///
/// ```mermaid
/// classDiagram
/// class RegionManifestManager {
///     -ManifestObjectStore store
///     -Arc~AtomicU64~ last_version
///     -Checkpointer checkpointer
///     -RegionManifest manifest
///     -Option~RegionManifest~ staging_manifest
///     -ManifestStats stats
///     -bool stopped
///     +new() RegionManifestManager
///     +open() Option~RegionManifestManager~
///     +stop()
///     +install_manifest_to(ManifestVersion target_version) ManifestVersion
///     +update(RegionMetaActionList action_list, bool is_staging) ManifestVersion
///     +manifest() RegionManifest
/// }
/// class ManifestObjectStore {
///     -ObjectStore object_store
/// }
/// class RegionChange {
///     -RegionMetadataRef metadata
///     -FormatType sst_format
/// }
/// class RegionEdit {
///     -Vec~FileMeta~ files_to_add
///     -Vec~FileMeta~ files_to_remove
///     -timestamp_ms
///     -compaction_time_window
///     -flushed_entry_id
///     -Option~SequenceNumber~ flushed_sequence
///     -committed_sequence
/// }
/// class RegionRemove {
///     -RegionId region_id
/// }
/// RegionManifestManager o-- ManifestObjectStore
/// RegionManifestManager o-- RegionManifest
/// RegionManifestManager o-- Checkpointer
/// RegionManifestManager -- RegionMetaActionList
/// RegionManifestManager -- RegionCheckpoint
/// ManifestObjectStore o-- ObjectStore
/// RegionMetaActionList o-- RegionMetaAction
/// RegionMetaAction o-- RegionChange
/// RegionMetaAction o-- RegionEdit
/// RegionMetaAction o-- RegionRemove
/// RegionChange o-- RegionMetadata
/// RegionEdit o-- FileMeta
///
/// class RegionManifest {
///     -RegionMetadataRef metadata
///     -HashMap~FileId, FileMeta~ files
///     -removed_files
///     -ManifestVersion manifest_version
/// }
/// class RegionMetadata
/// class FileMeta
/// RegionManifest o-- RegionMetadata
/// RegionManifest o-- FileMeta
///
/// class RegionCheckpoint {
///     -ManifestVersion last_version
///     -Option~RegionManifest~ checkpoint
/// }
/// RegionCheckpoint o-- RegionManifest
/// ```
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage for this region's manifest delta files and checkpoints.
    store: ManifestObjectStore,
    /// Version of the last saved or installed manifest. This is the same counter as
    /// `ManifestStats::manifest_version`, so stats observe every change.
    last_version: Arc<AtomicU64>,
    /// Adjusts the removed files of new manifests and decides when to write checkpoints
    /// (see `maybe_do_checkpoint`).
    checkpointer: Checkpointer,
    /// The current (non-staging) manifest.
    manifest: Arc<RegionManifest>,
    /// Staging manifest is used to store the manifest of the staging region before it becomes available.
    /// It is initially inherited from the previous manifest (i.e., `self.manifest`).
    /// When the staging manifest becomes available, it will be used to construct the new manifest.
    staging_manifest: Option<Arc<RegionManifest>>,
    /// Manifest statistics (e.g. removed-file counters) maintained by this manager.
    stats: ManifestStats,
    /// Set by [RegionManifestManager::stop]. Once set, `update` and `install_manifest_to`
    /// fail with a region-stopped error.
    stopped: bool,
}
161
162impl RegionManifestManager {
163    /// Constructs a region's manifest and persist it.
164    pub async fn new(
165        metadata: RegionMetadataRef,
166        flushed_entry_id: u64,
167        options: RegionManifestOptions,
168        sst_format: FormatType,
169        stats: &ManifestStats,
170    ) -> Result<Self> {
171        // construct storage
172        let mut store = ManifestObjectStore::new(
173            &options.manifest_dir,
174            options.object_store.clone(),
175            options.compress_type,
176            stats.total_manifest_size.clone(),
177        );
178        let manifest_version = stats.manifest_version.clone();
179
180        info!(
181            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
182            options.manifest_dir, metadata, flushed_entry_id
183        );
184
185        let version = MIN_VERSION;
186        let mut manifest_builder = RegionManifestBuilder::default();
187        // set the initial metadata.
188        manifest_builder.apply_change(
189            version,
190            RegionChange {
191                metadata: metadata.clone(),
192                sst_format,
193            },
194        );
195        let manifest = manifest_builder.try_build()?;
196        let region_id = metadata.region_id;
197
198        debug!(
199            "Build region manifest in {}, manifest: {:?}",
200            options.manifest_dir, manifest
201        );
202
203        let mut actions = vec![RegionMetaAction::Change(RegionChange {
204            metadata,
205            sst_format,
206        })];
207        if flushed_entry_id > 0 {
208            actions.push(RegionMetaAction::Edit(RegionEdit {
209                files_to_add: vec![],
210                files_to_remove: vec![],
211                timestamp_ms: None,
212                compaction_time_window: None,
213                flushed_entry_id: Some(flushed_entry_id),
214                flushed_sequence: None,
215                committed_sequence: None,
216            }));
217        }
218
219        // Persist region change.
220        let action_list = RegionMetaActionList::new(actions);
221
222        // New region is not in staging mode.
223        // TODO(ruihang): add staging mode support if needed.
224        store.save(version, &action_list.encode()?, false).await?;
225
226        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
227        manifest_version.store(version, Ordering::Relaxed);
228        manifest
229            .removed_files
230            .update_file_removed_cnt_to_stats(stats);
231        Ok(Self {
232            store,
233            last_version: manifest_version,
234            checkpointer,
235            manifest: Arc::new(manifest),
236            staging_manifest: None,
237            stats: stats.clone(),
238            stopped: false,
239        })
240    }
241
242    /// Opens an existing manifest.
243    ///
244    /// Returns `Ok(None)` if no such manifest.
245    pub async fn open(
246        options: RegionManifestOptions,
247        stats: &ManifestStats,
248    ) -> Result<Option<Self>> {
249        let _t = MANIFEST_OP_ELAPSED
250            .with_label_values(&["open"])
251            .start_timer();
252
253        // construct storage
254        let mut store = ManifestObjectStore::new(
255            &options.manifest_dir,
256            options.object_store.clone(),
257            options.compress_type,
258            stats.total_manifest_size.clone(),
259        );
260        let manifest_version = stats.manifest_version.clone();
261
262        // recover from storage
263        // construct manifest builder
264        // calculate the manifest size from the latest checkpoint
265        let mut version = MIN_VERSION;
266        let checkpoint = Self::last_checkpoint(&mut store).await?;
267        let last_checkpoint_version = checkpoint
268            .as_ref()
269            .map(|(checkpoint, _)| checkpoint.last_version)
270            .unwrap_or(MIN_VERSION);
271        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
272            info!(
273                "Recover region manifest {} from checkpoint version {}",
274                options.manifest_dir, checkpoint.last_version
275            );
276            version = version.max(checkpoint.last_version + 1);
277            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
278        } else {
279            info!(
280                "Checkpoint not found in {}, build manifest from scratch",
281                options.manifest_dir
282            );
283            RegionManifestBuilder::default()
284        };
285
286        // apply actions from storage
287        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
288
289        for (manifest_version, raw_action_list) in manifests {
290            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
291            // set manifest size after last checkpoint
292            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
293            for action in action_list.actions {
294                match action {
295                    RegionMetaAction::Change(action) => {
296                        manifest_builder.apply_change(manifest_version, action);
297                    }
298                    RegionMetaAction::Edit(action) => {
299                        manifest_builder.apply_edit(manifest_version, action);
300                    }
301                    RegionMetaAction::Remove(_) => {
302                        debug!(
303                            "Unhandled action in {}, action: {:?}",
304                            options.manifest_dir, action
305                        );
306                    }
307                    RegionMetaAction::Truncate(action) => {
308                        manifest_builder.apply_truncate(manifest_version, action);
309                    }
310                }
311            }
312        }
313
314        // set the initial metadata if necessary
315        if !manifest_builder.contains_metadata() {
316            debug!("No region manifest in {}", options.manifest_dir);
317            return Ok(None);
318        }
319
320        let manifest = manifest_builder.try_build()?;
321        debug!(
322            "Recovered region manifest from {}, manifest: {:?}",
323            options.manifest_dir, manifest
324        );
325        let version = manifest.manifest_version;
326
327        let checkpointer = Checkpointer::new(
328            manifest.metadata.region_id,
329            options,
330            store.clone(),
331            last_checkpoint_version,
332        );
333        manifest_version.store(version, Ordering::Relaxed);
334        manifest
335            .removed_files
336            .update_file_removed_cnt_to_stats(stats);
337        Ok(Some(Self {
338            store,
339            last_version: manifest_version,
340            checkpointer,
341            manifest: Arc::new(manifest),
342            // TODO(weny): open the staging manifest if exists.
343            staging_manifest: None,
344            stats: stats.clone(),
345            stopped: false,
346        }))
347    }
348
349    /// Stops the manager.
350    pub async fn stop(&mut self) {
351        self.stopped = true;
352    }
353
354    /// Installs the manifest changes from the current version to the target version (inclusive).
355    ///
356    /// Returns installed version.
357    /// **Note**: This function is not guaranteed to install the target version strictly.
358    /// The installed version may be greater than the target version.
359    pub async fn install_manifest_to(
360        &mut self,
361        target_version: ManifestVersion,
362    ) -> Result<ManifestVersion> {
363        let _t = MANIFEST_OP_ELAPSED
364            .with_label_values(&["install_manifest_to"])
365            .start_timer();
366
367        let last_version = self.last_version();
368        // Case 1: If the target version is less than the current version, return the current version.
369        if last_version >= target_version {
370            debug!(
371                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
372                target_version, last_version, self.manifest.metadata.region_id
373            );
374            return Ok(last_version);
375        }
376
377        ensure!(
378            !self.stopped,
379            RegionStoppedSnafu {
380                region_id: self.manifest.metadata.region_id,
381            }
382        );
383
384        let region_id = self.manifest.metadata.region_id;
385        // Fetches manifests from the last version strictly.
386        let mut manifests = self
387            .store
388            // Invariant: last_version < target_version.
389            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
390            .await?;
391
392        // Case 2: No manifests in range: [current_version+1, target_version+1)
393        //
394        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
395        //                                                                    [Leader region]
396        // [Current Version]......[Target Version]
397        // [Follower region]
398        if manifests.is_empty() {
399            info!(
400                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
401                last_version, self.manifest.metadata.region_id
402            );
403            let last_version = self.install_last_checkpoint().await?;
404            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
405            if last_version >= target_version {
406                return Ok(last_version);
407            }
408
409            // Fetches manifests from the installed version strictly.
410            manifests = self
411                .store
412                // Invariant: last_version < target_version.
413                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
414                .await?;
415        }
416
417        if manifests.is_empty() {
418            return NoManifestsSnafu {
419                region_id: self.manifest.metadata.region_id,
420                start_version: last_version + 1,
421                end_version: target_version + 1,
422                last_version,
423            }
424            .fail();
425        }
426
427        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
428        let mut manifest_builder =
429            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
430
431        for (manifest_version, raw_action_list) in manifests {
432            self.store
433                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
434            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
435            for action in action_list.actions {
436                match action {
437                    RegionMetaAction::Change(action) => {
438                        manifest_builder.apply_change(manifest_version, action);
439                    }
440                    RegionMetaAction::Edit(action) => {
441                        manifest_builder.apply_edit(manifest_version, action);
442                    }
443                    RegionMetaAction::Remove(_) => {
444                        debug!(
445                            "Unhandled action for region {}, action: {:?}",
446                            self.manifest.metadata.region_id, action
447                        );
448                    }
449                    RegionMetaAction::Truncate(action) => {
450                        manifest_builder.apply_truncate(manifest_version, action);
451                    }
452                }
453            }
454        }
455
456        let new_manifest = manifest_builder.try_build()?;
457        ensure!(
458            new_manifest.manifest_version >= target_version,
459            InstallManifestToSnafu {
460                region_id: self.manifest.metadata.region_id,
461                target_version,
462                available_version: new_manifest.manifest_version,
463                last_version,
464            }
465        );
466
467        let version = self.last_version();
468        new_manifest
469            .removed_files
470            .update_file_removed_cnt_to_stats(&self.stats);
471        self.manifest = Arc::new(new_manifest);
472        let last_version = self.set_version(self.manifest.manifest_version);
473        info!(
474            "Install manifest changes from {} to {}, region: {}",
475            version, last_version, self.manifest.metadata.region_id
476        );
477
478        Ok(last_version)
479    }
480
481    /// Installs the last checkpoint.
482    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
483        let last_version = self.last_version();
484        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
485        else {
486            return NoCheckpointSnafu {
487                region_id: self.manifest.metadata.region_id,
488                last_version,
489            }
490            .fail();
491        };
492        self.store.reset_manifest_size();
493        self.store
494            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
495        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
496        let manifest = builder.try_build()?;
497        let last_version = self.set_version(manifest.manifest_version);
498        manifest
499            .removed_files
500            .update_file_removed_cnt_to_stats(&self.stats);
501        self.manifest = Arc::new(manifest);
502        info!(
503            "Installed region manifest from checkpoint: {}, region: {}",
504            checkpoint.last_version, self.manifest.metadata.region_id
505        );
506
507        Ok(last_version)
508    }
509
510    /// Updates the manifest. Returns the current manifest version number.
511    pub async fn update(
512        &mut self,
513        action_list: RegionMetaActionList,
514        is_staging: bool,
515    ) -> Result<ManifestVersion> {
516        let _t = MANIFEST_OP_ELAPSED
517            .with_label_values(&["update"])
518            .start_timer();
519
520        ensure!(
521            !self.stopped,
522            RegionStoppedSnafu {
523                region_id: self.manifest.metadata.region_id,
524            }
525        );
526
527        let version = self.increase_version();
528        self.store
529            .save(version, &action_list.encode()?, is_staging)
530            .await?;
531
532        // For a staging region, the manifest is initially inherited from the previous manifest(i.e., `self.manifest`).
533        // When the staging manifest becomes available, it will be used to construct the new manifest.
534        let mut manifest_builder =
535            if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
536                RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
537            } else {
538                RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
539            };
540
541        for action in action_list.actions {
542            match action {
543                RegionMetaAction::Change(action) => {
544                    manifest_builder.apply_change(version, action);
545                }
546                RegionMetaAction::Edit(action) => {
547                    manifest_builder.apply_edit(version, action);
548                }
549                RegionMetaAction::Remove(_) => {
550                    debug!(
551                        "Unhandled action for region {}, action: {:?}",
552                        self.manifest.metadata.region_id, action
553                    );
554                }
555                RegionMetaAction::Truncate(action) => {
556                    manifest_builder.apply_truncate(version, action);
557                }
558            }
559        }
560
561        if is_staging {
562            let new_manifest = manifest_builder.try_build()?;
563            self.staging_manifest = Some(Arc::new(new_manifest));
564
565            info!(
566                "Skipping checkpoint for region {} in staging mode, manifest version: {}",
567                self.manifest.metadata.region_id, self.manifest.manifest_version
568            );
569        } else {
570            let new_manifest = manifest_builder.try_build()?;
571            new_manifest
572                .removed_files
573                .update_file_removed_cnt_to_stats(&self.stats);
574            let updated_manifest = self
575                .checkpointer
576                .update_manifest_removed_files(new_manifest)?;
577            self.manifest = Arc::new(updated_manifest);
578            self.checkpointer
579                .maybe_do_checkpoint(self.manifest.as_ref());
580        }
581
582        Ok(version)
583    }
584
585    /// Clear deleted files from manifest's `removed_files` field without update version. Notice if datanode exit before checkpoint then new manifest by open region may still contain these deleted files, which is acceptable for gc process.
586    pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
587        let mut manifest = (*self.manifest()).clone();
588        manifest.removed_files.clear_deleted_files(deleted_files);
589        self.set_manifest(Arc::new(manifest));
590    }
591
592    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
593        self.manifest = manifest;
594    }
595
596    /// Retrieves the current [RegionManifest].
597    pub fn manifest(&self) -> Arc<RegionManifest> {
598        self.manifest.clone()
599    }
600
601    /// Retrieves the current [RegionManifest].
602    pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
603        self.staging_manifest.clone()
604    }
605
606    /// Returns total manifest size.
607    pub fn manifest_usage(&self) -> u64 {
608        self.store.total_manifest_size()
609    }
610
611    /// Returns true if a newer version manifest file is found.
612    ///
613    /// It is typically used in read-only regions to catch up with manifest.
614    /// It doesn't lock the manifest directory in the object store so the result
615    /// may be inaccurate if there are concurrent writes.
616    pub async fn has_update(&self) -> Result<bool> {
617        let last_version = self.last_version();
618
619        let streamer =
620            self.store
621                .manifest_lister(false)
622                .await?
623                .context(error::EmptyManifestDirSnafu {
624                    manifest_dir: self.store.manifest_dir(),
625                })?;
626
627        let need_update = streamer
628            .try_any(|entry| async move {
629                let file_name = entry.name();
630                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
631                    let version = file_version(file_name);
632                    if version > last_version {
633                        return true;
634                    }
635                }
636                false
637            })
638            .await
639            .context(error::OpenDalSnafu)?;
640
641        Ok(need_update)
642    }
643
644    /// Increases last version and returns the increased version.
645    fn increase_version(&mut self) -> ManifestVersion {
646        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
647        previous + 1
648    }
649
650    /// Sets the last version.
651    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
652        self.last_version.store(version, Ordering::Relaxed);
653        version
654    }
655
656    fn last_version(&self) -> ManifestVersion {
657        self.last_version.load(Ordering::Relaxed)
658    }
659
660    /// Fetches the last [RegionCheckpoint] from storage.
661    ///
662    /// If the checkpoint is not found, returns `None`.
663    /// Otherwise, returns the checkpoint and the size of the checkpoint.
664    pub(crate) async fn last_checkpoint(
665        store: &mut ManifestObjectStore,
666    ) -> Result<Option<(RegionCheckpoint, u64)>> {
667        let last_checkpoint = store.load_last_checkpoint().await?;
668
669        if let Some((_, bytes)) = last_checkpoint {
670            let checkpoint = RegionCheckpoint::decode(&bytes)?;
671            Ok(Some((checkpoint, bytes.len() as u64)))
672        } else {
673            Ok(None)
674        }
675    }
676
677    pub fn store(&self) -> ManifestObjectStore {
678        self.store.clone()
679    }
680
681    #[cfg(test)]
682    pub(crate) fn checkpointer(&self) -> &Checkpointer {
683        &self.checkpointer
684    }
685
686    /// Merge all staged manifest actions into a single action list ready for submission.
687    /// This collects all staging manifests, applies them sequentially, and returns the merged actions.
688    pub(crate) async fn merge_staged_actions(
689        &mut self,
690        region_state: RegionRoleState,
691    ) -> Result<Option<RegionMetaActionList>> {
692        // Only merge if we're in staging mode
693        if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
694            return Ok(None);
695        }
696
697        // Fetch all staging manifests
698        let staging_manifests = self.store.fetch_staging_manifests().await?;
699
700        if staging_manifests.is_empty() {
701            info!(
702                "No staging manifests to merge for region {}",
703                self.manifest.metadata.region_id
704            );
705            return Ok(None);
706        }
707
708        info!(
709            "Merging {} staging manifests for region {}",
710            staging_manifests.len(),
711            self.manifest.metadata.region_id
712        );
713
714        // Start with current manifest state as the base
715        let mut merged_actions = Vec::new();
716        let mut latest_version = self.last_version();
717
718        // Apply all staging actions in order
719        for (manifest_version, raw_action_list) in staging_manifests {
720            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
721
722            for action in action_list.actions {
723                merged_actions.push(action);
724            }
725
726            latest_version = latest_version.max(manifest_version);
727        }
728
729        if merged_actions.is_empty() {
730            return Ok(None);
731        }
732
733        info!(
734            "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
735            merged_actions.len(),
736            self.manifest.metadata.region_id,
737            latest_version
738        );
739
740        Ok(Some(RegionMetaActionList::new(merged_actions)))
741    }
742
743    /// Unsets the staging manifest.
744    pub(crate) fn unset_staging_manifest(&mut self) {
745        self.staging_manifest = None;
746    }
747
748    /// Clear all staging manifests.
749    pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
750        self.staging_manifest = None;
751        self.store.clear_staging_manifests().await?;
752        info!(
753            "Cleared all staging manifests for region {}",
754            self.manifest.metadata.region_id
755        );
756        Ok(())
757    }
758}
759
#[cfg(test)]
impl RegionManifestManager {
    /// Test helper: asserts the current manifest carries `expect` as its metadata. Also
    /// asserts that both the manifest's own version and the caller-supplied `last_version`
    /// equal the manager's version counter.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.last_version();
        let manifest = self.manifest();

        assert_eq!(manifest.metadata, *expect);
        assert_eq!(self.manifest.manifest_version, current);
        assert_eq!(last_version, current);
    }
}
769
#[cfg(test)]
mod test {
    use std::time::Duration;

    use api::v1::SemanticType;
    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::manifest::action::{RegionChange, RegionEdit};
    use crate::manifest::tests::utils::basic_region_metadata;
    use crate::test_util::TestEnv;

    /// Creating a manager with initial metadata yields a manifest at version 0.
    #[tokio::test]
    async fn create_manifest_manager() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    /// Opening without metadata returns `None` when nothing exists, and
    /// reopening after creation restores the persisted manifest.
    #[tokio::test]
    async fn open_manifest_manager() {
        let env = TestEnv::new().await;
        // Try to open an empty manifest.
        assert!(
            env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
                .await
                .unwrap()
                .is_none()
        );

        // Creates a manifest.
        let metadata = Arc::new(basic_region_metadata());
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();
        // Stops it.
        manager.stop().await;

        // Open it.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    /// The region's `partition_expr` JSON survives a create/reopen cycle unchanged.
    #[tokio::test]
    async fn manifest_with_partition_expr_roundtrip() {
        let env = TestEnv::new().await;
        let expr_json =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let mut metadata = basic_region_metadata();
        metadata.partition_expr = Some(expr_json.to_string());
        let metadata = Arc::new(metadata);
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        // persisted manifest should contain the same partition_expr JSON
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));

        manager.stop().await;

        // Reopen and check again
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
    }

    /// Applying a `RegionChange` that adds a column bumps the version to 1,
    /// and the new metadata is visible both before and after reopening.
    #[tokio::test]
    async fn region_change_add_column() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
        new_metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
                sst_format: FormatType::PrimaryKey,
            }));

        let current_version = manager.update(action_list, false).await.unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // Reopen the manager.
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 1);
    }

    /// Sums the sizes of regular files directly under `path` whose names contain
    /// `.checkpoint` or `.json`.
    ///
    /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
    ///
    /// Any I/O error panics instead of silently ending the scan, so a
    /// partially-read directory can never produce a misleading total.
    async fn manifest_dir_usage(path: &str) -> u64 {
        let mut size = 0u64;
        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = read_dir.next_entry().await.unwrap() {
            if !entry.file_type().await.unwrap().is_file() {
                continue;
            }
            let file_name = entry.file_name().into_string().unwrap();
            if file_name.contains(".checkpoint") || file_name.contains(".json") {
                let file_size = entry.metadata().await.unwrap().len();
                debug!("File: {file_name:?}, size: {file_size}");
                size += file_size;
            }
        }
        size
    }

    /// The manager's reported manifest usage matches the on-disk size of the
    /// manifest directory, both before and after a checkpoint, and after reopening.
    #[tokio::test]
    async fn test_manifest_size() {
        let metadata = Arc::new(basic_region_metadata());
        let data_home = create_temp_dir("");
        let data_home_path = data_home.path().to_str().unwrap().to_string();
        let env = TestEnv::with_data_home(data_home).await;

        let manifest_dir = format!("{}/manifest", data_home_path);

        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
        new_metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
                sst_format: FormatType::PrimaryKey,
            }));

        let current_version = manager.update(action_list, false).await.unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // get manifest size
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // update 10 times nop_action to trigger checkpoint
        for _ in 0..10 {
            manager
                .update(
                    RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
                        files_to_add: vec![],
                        files_to_remove: vec![],
                        timestamp_ms: None,
                        compaction_time_window: None,
                        flushed_entry_id: None,
                        flushed_sequence: None,
                        committed_sequence: None,
                    })]),
                    false,
                )
                .await
                .unwrap();
        }

        // Wait for any in-flight checkpoint to finish, but fail the test with a
        // clear message instead of hanging forever if it never completes.
        tokio::time::timeout(Duration::from_secs(30), async {
            while manager.checkpointer.is_doing_checkpoint() {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("manifest checkpoint did not finish within 30s");

        // check manifest size again
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Reopen the manager;
        // we just calculate the size from the latest checkpoint file
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 11);

        // get manifest size again
        // NOTE(review): this byte count is tied to the current serialized manifest
        // format; it will need updating if that format changes.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, 1378);
    }
}