mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::cache::manifest_cache::ManifestCache;
27use crate::config::MitoConfig;
28use crate::error::{
29    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
30};
31use crate::manifest::action::{
32    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
33    RegionMetaAction, RegionMetaActionList, RemovedFile,
34};
35use crate::manifest::checkpointer::Checkpointer;
36use crate::manifest::storage::{
37    ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
38    manifest_dir,
39};
40use crate::metrics::MANIFEST_OP_ELAPSED;
41use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
42use crate::sst::FormatType;
43
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory to store manifest.
    pub manifest_dir: String,
    /// Object store handed to the manifest storage layer.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest storage layer.
    pub compress_type: CompressionType,
    /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
    /// Set to 0 to disable checkpoint.
    pub checkpoint_distance: u64,
    /// Options for maintaining the manifest's `removed_files` (see [RemoveFileOptions]).
    pub remove_file_options: RemoveFileOptions,
    /// Optional cache for manifest files.
    pub manifest_cache: Option<ManifestCache>,
}
58
59impl RegionManifestOptions {
60    /// Creates a new [RegionManifestOptions] with the given region directory, object store, and configuration.
61    pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
62        RegionManifestOptions {
63            manifest_dir: manifest_dir(region_dir),
64            object_store: object_store.clone(),
65            // We don't allow users to set the compression algorithm as we use it as a file suffix.
66            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
67            compress_type: manifest_compress_type(config.compress_manifest),
68            checkpoint_distance: config.manifest_checkpoint_distance,
69            remove_file_options: RemoveFileOptions {
70                enable_gc: config.gc.enable,
71            },
72            manifest_cache: None,
73        }
74    }
75}
76
/// Options for updating `removed_files` field in [RegionManifest].
///
/// `Default` is only derived for tests (or when the `test` feature is enabled).
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
    /// Whether GC is enabled. If not, the removed files should always be empty when persisting manifest.
    pub enable_gc: bool,
}
84
85// rewrite note:
86// trait Checkpoint -> struct RegionCheckpoint
87// trait MetaAction -> struct RegionMetaActionList
88// trait MetaActionIterator -> struct MetaActionIteratorImpl
89
90#[cfg_attr(doc, aquamarine::aquamarine)]
91/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
92/// metadata.
93///
94/// ```mermaid
95/// classDiagram
96/// class RegionManifestManager {
97///     -ManifestObjectStore store
98///     -RegionManifestOptions options
99///     -RegionManifest manifest
100///     +new() RegionManifestManager
101///     +open() Option~RegionManifestManager~
102///     +stop()
103///     +update(RegionMetaActionList action_list) ManifestVersion
104///     +manifest() RegionManifest
105/// }
106/// class ManifestObjectStore {
107///     -ObjectStore object_store
108/// }
109/// class RegionChange {
110///     -RegionMetadataRef metadata
111/// }
112/// class RegionEdit {
113///     -VersionNumber region_version
114///     -Vec~FileMeta~ files_to_add
115///     -Vec~FileMeta~ files_to_remove
116///     -SequenceNumber flushed_sequence
117/// }
118/// class RegionRemove {
119///     -RegionId region_id
120/// }
121/// RegionManifestManager o-- ManifestObjectStore
122/// RegionManifestManager o-- RegionManifest
123/// RegionManifestManager o-- RegionManifestOptions
124/// RegionManifestManager -- RegionMetaActionList
125/// RegionManifestManager -- RegionCheckpoint
126/// ManifestObjectStore o-- ObjectStore
127/// RegionMetaActionList o-- RegionMetaAction
128/// RegionMetaAction o-- ProtocolAction
129/// RegionMetaAction o-- RegionChange
130/// RegionMetaAction o-- RegionEdit
131/// RegionMetaAction o-- RegionRemove
132/// RegionChange o-- RegionMetadata
133/// RegionEdit o-- FileMeta
134///
135/// class RegionManifest {
136///     -RegionMetadataRef metadata
137///     -HashMap&lt;FileId, FileMeta&gt; files
138///     -ManifestVersion manifest_version
139/// }
140/// class RegionMetadata
141/// class FileMeta
142/// RegionManifest o-- RegionMetadata
143/// RegionManifest o-- FileMeta
144///
145/// class RegionCheckpoint {
146///     -ManifestVersion last_version
147///     -Option~RegionManifest~ checkpoint
148/// }
149/// RegionCheckpoint o-- RegionManifest
150/// ```
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage used to save and fetch manifest delta and checkpoint files.
    store: ManifestObjectStore,
    /// Last manifest version known to this manager. The counter is the one taken from
    /// `ManifestStats::manifest_version`, so the stats observe every change.
    last_version: Arc<AtomicU64>,
    /// Checkpointer invoked after every non-staging update.
    checkpointer: Checkpointer,
    /// The current manifest.
    manifest: Arc<RegionManifest>,
    /// Staging manifest is used to store the manifest of the staging region before it becomes available.
    /// It is initially inherited from the previous manifest(i.e., `self.manifest`).
    /// When the staging manifest becomes available, it will be used to construct the new manifest.
    staging_manifest: Option<Arc<RegionManifest>>,
    /// Stats handle that receives the removed-file counters of each installed manifest.
    stats: ManifestStats,
    /// Set by `stop()`; `update()` and `install_manifest_to()` refuse to work once it is true.
    stopped: bool,
}
164
165impl RegionManifestManager {
166    /// Constructs a region's manifest and persist it.
167    pub async fn new(
168        metadata: RegionMetadataRef,
169        flushed_entry_id: u64,
170        options: RegionManifestOptions,
171        sst_format: FormatType,
172        stats: &ManifestStats,
173    ) -> Result<Self> {
174        // construct storage
175        let mut store = ManifestObjectStore::new(
176            &options.manifest_dir,
177            options.object_store.clone(),
178            options.compress_type,
179            stats.total_manifest_size.clone(),
180            options.manifest_cache.clone(),
181        );
182        let manifest_version = stats.manifest_version.clone();
183
184        info!(
185            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
186            options.manifest_dir, metadata, flushed_entry_id
187        );
188
189        let version = MIN_VERSION;
190        let mut manifest_builder = RegionManifestBuilder::default();
191        // set the initial metadata.
192        manifest_builder.apply_change(
193            version,
194            RegionChange {
195                metadata: metadata.clone(),
196                sst_format,
197            },
198        );
199        let manifest = manifest_builder.try_build()?;
200        let region_id = metadata.region_id;
201
202        debug!(
203            "Build region manifest in {}, manifest: {:?}",
204            options.manifest_dir, manifest
205        );
206
207        let mut actions = vec![RegionMetaAction::Change(RegionChange {
208            metadata,
209            sst_format,
210        })];
211        if flushed_entry_id > 0 {
212            actions.push(RegionMetaAction::Edit(RegionEdit {
213                files_to_add: vec![],
214                files_to_remove: vec![],
215                timestamp_ms: None,
216                compaction_time_window: None,
217                flushed_entry_id: Some(flushed_entry_id),
218                flushed_sequence: None,
219                committed_sequence: None,
220            }));
221        }
222
223        // Persist region change.
224        let action_list = RegionMetaActionList::new(actions);
225
226        // New region is not in staging mode.
227        // TODO(ruihang): add staging mode support if needed.
228        store.save(version, &action_list.encode()?, false).await?;
229
230        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
231        manifest_version.store(version, Ordering::Relaxed);
232        manifest
233            .removed_files
234            .update_file_removed_cnt_to_stats(stats);
235        Ok(Self {
236            store,
237            last_version: manifest_version,
238            checkpointer,
239            manifest: Arc::new(manifest),
240            staging_manifest: None,
241            stats: stats.clone(),
242            stopped: false,
243        })
244    }
245
246    /// Opens an existing manifest.
247    ///
248    /// Returns `Ok(None)` if no such manifest.
249    pub async fn open(
250        options: RegionManifestOptions,
251        stats: &ManifestStats,
252    ) -> Result<Option<Self>> {
253        let _t = MANIFEST_OP_ELAPSED
254            .with_label_values(&["open"])
255            .start_timer();
256
257        // construct storage
258        let mut store = ManifestObjectStore::new(
259            &options.manifest_dir,
260            options.object_store.clone(),
261            options.compress_type,
262            stats.total_manifest_size.clone(),
263            options.manifest_cache.clone(),
264        );
265        let manifest_version = stats.manifest_version.clone();
266
267        // recover from storage
268        // construct manifest builder
269        // calculate the manifest size from the latest checkpoint
270        let mut version = MIN_VERSION;
271        let checkpoint = Self::last_checkpoint(&mut store).await?;
272        let last_checkpoint_version = checkpoint
273            .as_ref()
274            .map(|(checkpoint, _)| checkpoint.last_version)
275            .unwrap_or(MIN_VERSION);
276        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
277            info!(
278                "Recover region manifest {} from checkpoint version {}",
279                options.manifest_dir, checkpoint.last_version
280            );
281            version = version.max(checkpoint.last_version + 1);
282            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
283        } else {
284            info!(
285                "Checkpoint not found in {}, build manifest from scratch",
286                options.manifest_dir
287            );
288            RegionManifestBuilder::default()
289        };
290
291        // apply actions from storage
292        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
293
294        for (manifest_version, raw_action_list) in manifests {
295            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
296            // set manifest size after last checkpoint
297            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
298            for action in action_list.actions {
299                match action {
300                    RegionMetaAction::Change(action) => {
301                        manifest_builder.apply_change(manifest_version, action);
302                    }
303                    RegionMetaAction::Edit(action) => {
304                        manifest_builder.apply_edit(manifest_version, action);
305                    }
306                    RegionMetaAction::Remove(_) => {
307                        debug!(
308                            "Unhandled action in {}, action: {:?}",
309                            options.manifest_dir, action
310                        );
311                    }
312                    RegionMetaAction::Truncate(action) => {
313                        manifest_builder.apply_truncate(manifest_version, action);
314                    }
315                }
316            }
317        }
318
319        // set the initial metadata if necessary
320        if !manifest_builder.contains_metadata() {
321            debug!("No region manifest in {}", options.manifest_dir);
322            return Ok(None);
323        }
324
325        let manifest = manifest_builder.try_build()?;
326        debug!(
327            "Recovered region manifest from {}, manifest: {:?}",
328            options.manifest_dir, manifest
329        );
330        let version = manifest.manifest_version;
331
332        let checkpointer = Checkpointer::new(
333            manifest.metadata.region_id,
334            options,
335            store.clone(),
336            last_checkpoint_version,
337        );
338        manifest_version.store(version, Ordering::Relaxed);
339        manifest
340            .removed_files
341            .update_file_removed_cnt_to_stats(stats);
342        Ok(Some(Self {
343            store,
344            last_version: manifest_version,
345            checkpointer,
346            manifest: Arc::new(manifest),
347            // TODO(weny): open the staging manifest if exists.
348            staging_manifest: None,
349            stats: stats.clone(),
350            stopped: false,
351        }))
352    }
353
    /// Stops the manager.
    ///
    /// This only sets the `stopped` flag. Afterwards `update()` fails with a
    /// region-stopped error, and so does `install_manifest_to()` whenever it would
    /// actually have to install something.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
358
359    /// Installs the manifest changes from the current version to the target version (inclusive).
360    ///
361    /// Returns installed version.
362    /// **Note**: This function is not guaranteed to install the target version strictly.
363    /// The installed version may be greater than the target version.
364    pub async fn install_manifest_to(
365        &mut self,
366        target_version: ManifestVersion,
367    ) -> Result<ManifestVersion> {
368        let _t = MANIFEST_OP_ELAPSED
369            .with_label_values(&["install_manifest_to"])
370            .start_timer();
371
372        let last_version = self.last_version();
373        // Case 1: If the target version is less than the current version, return the current version.
374        if last_version >= target_version {
375            debug!(
376                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
377                target_version, last_version, self.manifest.metadata.region_id
378            );
379            return Ok(last_version);
380        }
381
382        ensure!(
383            !self.stopped,
384            RegionStoppedSnafu {
385                region_id: self.manifest.metadata.region_id,
386            }
387        );
388
389        let region_id = self.manifest.metadata.region_id;
390        // Fetches manifests from the last version strictly.
391        let mut manifests = self
392            .store
393            // Invariant: last_version < target_version.
394            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
395            .await?;
396
397        // Case 2: No manifests in range: [current_version+1, target_version+1)
398        //
399        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
400        //                                                                    [Leader region]
401        // [Current Version]......[Target Version]
402        // [Follower region]
403        if manifests.is_empty() {
404            info!(
405                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
406                last_version, self.manifest.metadata.region_id
407            );
408            let last_version = self.install_last_checkpoint().await?;
409            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
410            if last_version >= target_version {
411                return Ok(last_version);
412            }
413
414            // Fetches manifests from the installed version strictly.
415            manifests = self
416                .store
417                // Invariant: last_version < target_version.
418                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
419                .await?;
420        }
421
422        if manifests.is_empty() {
423            return NoManifestsSnafu {
424                region_id: self.manifest.metadata.region_id,
425                start_version: last_version + 1,
426                end_version: target_version + 1,
427                last_version,
428            }
429            .fail();
430        }
431
432        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
433        let mut manifest_builder =
434            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
435
436        for (manifest_version, raw_action_list) in manifests {
437            self.store
438                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
439            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
440            for action in action_list.actions {
441                match action {
442                    RegionMetaAction::Change(action) => {
443                        manifest_builder.apply_change(manifest_version, action);
444                    }
445                    RegionMetaAction::Edit(action) => {
446                        manifest_builder.apply_edit(manifest_version, action);
447                    }
448                    RegionMetaAction::Remove(_) => {
449                        debug!(
450                            "Unhandled action for region {}, action: {:?}",
451                            self.manifest.metadata.region_id, action
452                        );
453                    }
454                    RegionMetaAction::Truncate(action) => {
455                        manifest_builder.apply_truncate(manifest_version, action);
456                    }
457                }
458            }
459        }
460
461        let new_manifest = manifest_builder.try_build()?;
462        ensure!(
463            new_manifest.manifest_version >= target_version,
464            InstallManifestToSnafu {
465                region_id: self.manifest.metadata.region_id,
466                target_version,
467                available_version: new_manifest.manifest_version,
468                last_version,
469            }
470        );
471
472        let version = self.last_version();
473        new_manifest
474            .removed_files
475            .update_file_removed_cnt_to_stats(&self.stats);
476        self.manifest = Arc::new(new_manifest);
477        let last_version = self.set_version(self.manifest.manifest_version);
478        info!(
479            "Install manifest changes from {} to {}, region: {}",
480            version, last_version, self.manifest.metadata.region_id
481        );
482
483        Ok(last_version)
484    }
485
486    /// Installs the last checkpoint.
487    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
488        let last_version = self.last_version();
489        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
490        else {
491            return NoCheckpointSnafu {
492                region_id: self.manifest.metadata.region_id,
493                last_version,
494            }
495            .fail();
496        };
497        self.store.reset_manifest_size();
498        self.store
499            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
500        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
501        let manifest = builder.try_build()?;
502        let last_version = self.set_version(manifest.manifest_version);
503        manifest
504            .removed_files
505            .update_file_removed_cnt_to_stats(&self.stats);
506        self.manifest = Arc::new(manifest);
507        info!(
508            "Installed region manifest from checkpoint: {}, region: {}",
509            checkpoint.last_version, self.manifest.metadata.region_id
510        );
511
512        Ok(last_version)
513    }
514
515    /// Updates the manifest. Returns the current manifest version number.
516    pub async fn update(
517        &mut self,
518        action_list: RegionMetaActionList,
519        is_staging: bool,
520    ) -> Result<ManifestVersion> {
521        let _t = MANIFEST_OP_ELAPSED
522            .with_label_values(&["update"])
523            .start_timer();
524
525        ensure!(
526            !self.stopped,
527            RegionStoppedSnafu {
528                region_id: self.manifest.metadata.region_id,
529            }
530        );
531
532        let version = self.increase_version();
533        self.store
534            .save(version, &action_list.encode()?, is_staging)
535            .await?;
536
537        // For a staging region, the manifest is initially inherited from the previous manifest(i.e., `self.manifest`).
538        // When the staging manifest becomes available, it will be used to construct the new manifest.
539        let mut manifest_builder =
540            if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
541                RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
542            } else {
543                RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
544            };
545
546        for action in action_list.actions {
547            match action {
548                RegionMetaAction::Change(action) => {
549                    manifest_builder.apply_change(version, action);
550                }
551                RegionMetaAction::Edit(action) => {
552                    manifest_builder.apply_edit(version, action);
553                }
554                RegionMetaAction::Remove(_) => {
555                    debug!(
556                        "Unhandled action for region {}, action: {:?}",
557                        self.manifest.metadata.region_id, action
558                    );
559                }
560                RegionMetaAction::Truncate(action) => {
561                    manifest_builder.apply_truncate(version, action);
562                }
563            }
564        }
565
566        if is_staging {
567            let new_manifest = manifest_builder.try_build()?;
568            self.staging_manifest = Some(Arc::new(new_manifest));
569
570            info!(
571                "Skipping checkpoint for region {} in staging mode, manifest version: {}",
572                self.manifest.metadata.region_id, self.manifest.manifest_version
573            );
574        } else {
575            let new_manifest = manifest_builder.try_build()?;
576            new_manifest
577                .removed_files
578                .update_file_removed_cnt_to_stats(&self.stats);
579            let updated_manifest = self
580                .checkpointer
581                .update_manifest_removed_files(new_manifest)?;
582            self.manifest = Arc::new(updated_manifest);
583            self.checkpointer
584                .maybe_do_checkpoint(self.manifest.as_ref());
585        }
586
587        Ok(version)
588    }
589
590    /// Clear deleted files from manifest's `removed_files` field without update version. Notice if datanode exit before checkpoint then new manifest by open region may still contain these deleted files, which is acceptable for gc process.
591    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
592        let mut manifest = (*self.manifest()).clone();
593        manifest.removed_files.clear_deleted_files(deleted_files);
594        self.set_manifest(Arc::new(manifest));
595    }
596
    /// Replaces the current in-memory [RegionManifest].
    ///
    /// Only the `manifest` field is assigned; the tracked last version is not touched.
    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
        self.manifest = manifest;
    }
600
601    /// Retrieves the current [RegionManifest].
602    pub fn manifest(&self) -> Arc<RegionManifest> {
603        self.manifest.clone()
604    }
605
    /// Retrieves the staging [RegionManifest], if any.
    ///
    /// Returns `None` when no staging manifest is set.
    pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
        self.staging_manifest.clone()
    }
610
    /// Returns the total manifest size as reported by the underlying [ManifestObjectStore].
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
615
616    /// Returns true if a newer version manifest file is found.
617    ///
618    /// It is typically used in read-only regions to catch up with manifest.
619    /// It doesn't lock the manifest directory in the object store so the result
620    /// may be inaccurate if there are concurrent writes.
621    pub async fn has_update(&self) -> Result<bool> {
622        let last_version = self.last_version();
623
624        let streamer =
625            self.store
626                .manifest_lister(false)
627                .await?
628                .context(error::EmptyManifestDirSnafu {
629                    manifest_dir: self.store.manifest_dir(),
630                })?;
631
632        let need_update = streamer
633            .try_any(|entry| async move {
634                let file_name = entry.name();
635                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
636                    let version = file_version(file_name);
637                    if version > last_version {
638                        return true;
639                    }
640                }
641                false
642            })
643            .await
644            .context(error::OpenDalSnafu)?;
645
646        Ok(need_update)
647    }
648
649    /// Increases last version and returns the increased version.
650    fn increase_version(&mut self) -> ManifestVersion {
651        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
652        previous + 1
653    }
654
    /// Sets the last version.
    ///
    /// Returns the `version` that was just stored.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
660
    /// Returns the last manifest version tracked by this manager.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
664
665    /// Fetches the last [RegionCheckpoint] from storage.
666    ///
667    /// If the checkpoint is not found, returns `None`.
668    /// Otherwise, returns the checkpoint and the size of the checkpoint.
669    pub(crate) async fn last_checkpoint(
670        store: &mut ManifestObjectStore,
671    ) -> Result<Option<(RegionCheckpoint, u64)>> {
672        let last_checkpoint = store.load_last_checkpoint().await?;
673
674        if let Some((_, bytes)) = last_checkpoint {
675            let checkpoint = RegionCheckpoint::decode(&bytes)?;
676            Ok(Some((checkpoint, bytes.len() as u64)))
677        } else {
678            Ok(None)
679        }
680    }
681
    /// Returns a clone of the underlying [ManifestObjectStore].
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
685
    /// Returns the [Checkpointer] of this manager (test builds only).
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
690
691    /// Merge all staged manifest actions into a single action list ready for submission.
692    /// This collects all staging manifests, applies them sequentially, and returns the merged actions.
693    pub(crate) async fn merge_staged_actions(
694        &mut self,
695        region_state: RegionRoleState,
696    ) -> Result<Option<RegionMetaActionList>> {
697        // Only merge if we're in staging mode
698        if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
699            return Ok(None);
700        }
701
702        // Fetch all staging manifests
703        let staging_manifests = self.store.fetch_staging_manifests().await?;
704
705        if staging_manifests.is_empty() {
706            info!(
707                "No staging manifests to merge for region {}",
708                self.manifest.metadata.region_id
709            );
710            return Ok(None);
711        }
712
713        info!(
714            "Merging {} staging manifests for region {}",
715            staging_manifests.len(),
716            self.manifest.metadata.region_id
717        );
718
719        // Start with current manifest state as the base
720        let mut merged_actions = Vec::new();
721        let mut latest_version = self.last_version();
722
723        // Apply all staging actions in order
724        for (manifest_version, raw_action_list) in staging_manifests {
725            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
726
727            for action in action_list.actions {
728                merged_actions.push(action);
729            }
730
731            latest_version = latest_version.max(manifest_version);
732        }
733
734        if merged_actions.is_empty() {
735            return Ok(None);
736        }
737
738        info!(
739            "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
740            merged_actions.len(),
741            self.manifest.metadata.region_id,
742            latest_version
743        );
744
745        Ok(Some(RegionMetaActionList::new(merged_actions)))
746    }
747
    /// Unsets the staging manifest.
    ///
    /// Only the in-memory staging manifest is dropped; the store is not touched.
    pub(crate) fn unset_staging_manifest(&mut self) {
        self.staging_manifest = None;
    }
752
    /// Clears all staging manifests.
    ///
    /// The in-memory staging manifest is dropped first, then the store is asked to
    /// clear its staging manifests. If the store call fails, the error is returned
    /// and the in-memory staging manifest stays unset.
    pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
        self.staging_manifest = None;
        self.store.clear_staging_manifests().await?;
        info!(
            "Cleared all staging manifests for region {}",
            self.manifest.metadata.region_id
        );
        Ok(())
    }
763}
764
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the current manifest holds the `expect` metadata and that both
    /// the manifest's version and `last_version` equal the manager's tracked version.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let tracked_version = self.last_version();
        assert_eq!(self.manifest().metadata, *expect);
        assert_eq!(self.manifest.manifest_version, tracked_version);
        assert_eq!(last_version, tracked_version);
    }
}
774
775#[cfg(test)]
776mod test {
777    use std::time::Duration;
778
779    use api::v1::SemanticType;
780    use common_datasource::compression::CompressionType;
781    use common_test_util::temp_dir::create_temp_dir;
782    use datatypes::prelude::ConcreteDataType;
783    use datatypes::schema::ColumnSchema;
784    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
785
786    use super::*;
787    use crate::manifest::action::{RegionChange, RegionEdit};
788    use crate::manifest::tests::utils::basic_region_metadata;
789    use crate::test_util::TestEnv;
790
    /// A freshly created manifest manager holds the given metadata at version 0.
    #[tokio::test]
    async fn create_manifest_manager() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }
803
    /// Opening without metadata yields `None` before a manifest has been created, and
    /// yields the created manifest once one has been created and stopped.
    #[tokio::test]
    async fn open_manifest_manager() {
        let env = TestEnv::new().await;
        // Try to open an empty manifest.
        assert!(
            env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
                .await
                .unwrap()
                .is_none()
        );

        // Creates a manifest.
        let metadata = Arc::new(basic_region_metadata());
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();
        // Stops it.
        manager.stop().await;

        // Open it.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }
834
835    #[tokio::test]
836    async fn manifest_with_partition_expr_roundtrip() {
837        let env = TestEnv::new().await;
838        let expr_json =
839            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
840        let mut metadata = basic_region_metadata();
841        metadata.partition_expr = Some(expr_json.to_string());
842        let metadata = Arc::new(metadata);
843        let mut manager = env
844            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
845            .await
846            .unwrap()
847            .unwrap();
848
849        // persisted manifest should contain the same partition_expr JSON
850        let manifest = manager.manifest();
851        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
852
853        manager.stop().await;
854
855        // Reopen and check again
856        let manager = env
857            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
858            .await
859            .unwrap()
860            .unwrap();
861        let manifest = manager.manifest();
862        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
863    }
864
865    #[tokio::test]
866    async fn region_change_add_column() {
867        let metadata = Arc::new(basic_region_metadata());
868        let env = TestEnv::new().await;
869        let mut manager = env
870            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
871            .await
872            .unwrap()
873            .unwrap();
874
875        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
876        new_metadata_builder.push_column_metadata(ColumnMetadata {
877            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
878            semantic_type: SemanticType::Field,
879            column_id: 252,
880        });
881        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
882
883        let action_list =
884            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
885                metadata: new_metadata.clone(),
886                sst_format: FormatType::PrimaryKey,
887            }));
888
889        let current_version = manager.update(action_list, false).await.unwrap();
890        assert_eq!(current_version, 1);
891        manager.validate_manifest(&new_metadata, 1);
892
893        // Reopen the manager.
894        manager.stop().await;
895        let manager = env
896            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
897            .await
898            .unwrap()
899            .unwrap();
900        manager.validate_manifest(&new_metadata, 1);
901    }
902
903    /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
904    async fn manifest_dir_usage(path: &str) -> u64 {
905        let mut size = 0;
906        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
907        while let Ok(dir_entry) = read_dir.next_entry().await {
908            let Some(entry) = dir_entry else {
909                break;
910            };
911            if entry.file_type().await.unwrap().is_file() {
912                let file_name = entry.file_name().into_string().unwrap();
913                if file_name.contains(".checkpoint") || file_name.contains(".json") {
914                    let file_size = entry.metadata().await.unwrap().len() as usize;
915                    debug!("File: {file_name:?}, size: {file_size}");
916                    size += file_size;
917                }
918            }
919        }
920        size as u64
921    }
922
923    #[tokio::test]
924    async fn test_manifest_size() {
925        let metadata = Arc::new(basic_region_metadata());
926        let data_home = create_temp_dir("");
927        let data_home_path = data_home.path().to_str().unwrap().to_string();
928        let env = TestEnv::with_data_home(data_home).await;
929
930        let manifest_dir = format!("{}/manifest", data_home_path);
931
932        let mut manager = env
933            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
934            .await
935            .unwrap()
936            .unwrap();
937
938        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
939        new_metadata_builder.push_column_metadata(ColumnMetadata {
940            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
941            semantic_type: SemanticType::Field,
942            column_id: 252,
943        });
944        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
945
946        let action_list =
947            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
948                metadata: new_metadata.clone(),
949                sst_format: FormatType::PrimaryKey,
950            }));
951
952        let current_version = manager.update(action_list, false).await.unwrap();
953        assert_eq!(current_version, 1);
954        manager.validate_manifest(&new_metadata, 1);
955
956        // get manifest size
957        let manifest_size = manager.manifest_usage();
958        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
959
960        // update 10 times nop_action to trigger checkpoint
961        for _ in 0..10 {
962            manager
963                .update(
964                    RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
965                        files_to_add: vec![],
966                        files_to_remove: vec![],
967                        timestamp_ms: None,
968                        compaction_time_window: None,
969                        flushed_entry_id: None,
970                        flushed_sequence: None,
971                        committed_sequence: None,
972                    })]),
973                    false,
974                )
975                .await
976                .unwrap();
977        }
978
979        while manager.checkpointer.is_doing_checkpoint() {
980            tokio::time::sleep(Duration::from_millis(10)).await;
981        }
982
983        // check manifest size again
984        let manifest_size = manager.manifest_usage();
985        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
986
987        // Reopen the manager,
988        // we just calculate the size from the latest checkpoint file
989        manager.stop().await;
990        let manager = env
991            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
992            .await
993            .unwrap()
994            .unwrap();
995        manager.validate_manifest(&new_metadata, 11);
996
997        // get manifest size again
998        let manifest_size = manager.manifest_usage();
999        assert_eq!(manifest_size, 1378);
1000    }
1001}