mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::FileId;
25use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::config::MitoConfig;
29use crate::error::{
30    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
31};
32use crate::manifest::action::{
33    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
34    RegionMetaAction, RegionMetaActionList,
35};
36use crate::manifest::checkpointer::Checkpointer;
37use crate::manifest::storage::{
38    ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
39    manifest_dir,
40};
41use crate::metrics::MANIFEST_OP_ELAPSED;
42use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
43use crate::sst::FormatType;
44
45/// Options for [RegionManifestManager].
46#[derive(Debug, Clone)]
47pub struct RegionManifestOptions {
48    /// Directory to store manifest.
49    pub manifest_dir: String,
50    pub object_store: ObjectStore,
51    pub compress_type: CompressionType,
52    /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
53    /// Set to 0 to disable checkpoint.
54    pub checkpoint_distance: u64,
55    pub remove_file_options: RemoveFileOptions,
56    /// Optional cache for manifest files.
57    pub manifest_cache: Option<ManifestCache>,
58}
59
60impl RegionManifestOptions {
61    /// Creates a new [RegionManifestOptions] with the given region directory, object store, and configuration.
62    pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
63        RegionManifestOptions {
64            manifest_dir: manifest_dir(region_dir),
65            object_store: object_store.clone(),
66            // We don't allow users to set the compression algorithm as we use it as a file suffix.
67            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
68            compress_type: manifest_compress_type(config.compress_manifest),
69            checkpoint_distance: config.manifest_checkpoint_distance,
70            remove_file_options: RemoveFileOptions {
71                enable_gc: config.gc.enable,
72            },
73            manifest_cache: None,
74        }
75    }
76}
77
/// Controls how the `removed_files` field of [RegionManifest] is maintained.
#[derive(Clone, Debug)]
#[cfg_attr(any(feature = "test", test), derive(Default))]
pub struct RemoveFileOptions {
    /// Whether GC is enabled. When it is disabled, the removed files are expected to be
    /// empty whenever the manifest is persisted.
    pub enable_gc: bool,
}
85
// Rewrite note: the following traits were replaced by concrete types:
// - trait Checkpoint -> struct RegionCheckpoint
// - trait MetaAction -> struct RegionMetaActionList
// - trait MetaActionIterator -> struct MetaActionIteratorImpl
90
91#[cfg_attr(doc, aquamarine::aquamarine)]
92/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
93/// metadata.
94///
95/// ```mermaid
96/// classDiagram
97/// class RegionManifestManager {
98///     -ManifestObjectStore store
99///     -RegionManifestOptions options
100///     -RegionManifest manifest
101///     +new() RegionManifestManager
102///     +open() Option~RegionManifestManager~
103///     +stop()
104///     +update(RegionMetaActionList action_list) ManifestVersion
105///     +manifest() RegionManifest
106/// }
107/// class ManifestObjectStore {
108///     -ObjectStore object_store
109/// }
110/// class RegionChange {
111///     -RegionMetadataRef metadata
112/// }
113/// class RegionEdit {
114///     -VersionNumber region_version
115///     -Vec~FileMeta~ files_to_add
116///     -Vec~FileMeta~ files_to_remove
117///     -SequenceNumber flushed_sequence
118/// }
119/// class RegionRemove {
120///     -RegionId region_id
121/// }
122/// RegionManifestManager o-- ManifestObjectStore
123/// RegionManifestManager o-- RegionManifest
124/// RegionManifestManager o-- RegionManifestOptions
125/// RegionManifestManager -- RegionMetaActionList
126/// RegionManifestManager -- RegionCheckpoint
127/// ManifestObjectStore o-- ObjectStore
128/// RegionMetaActionList o-- RegionMetaAction
129/// RegionMetaAction o-- ProtocolAction
130/// RegionMetaAction o-- RegionChange
131/// RegionMetaAction o-- RegionEdit
132/// RegionMetaAction o-- RegionRemove
133/// RegionChange o-- RegionMetadata
134/// RegionEdit o-- FileMeta
135///
136/// class RegionManifest {
137///     -RegionMetadataRef metadata
138///     -HashMap&lt;FileId, FileMeta&gt; files
139///     -ManifestVersion manifest_version
140/// }
141/// class RegionMetadata
142/// class FileMeta
143/// RegionManifest o-- RegionMetadata
144/// RegionManifest o-- FileMeta
145///
146/// class RegionCheckpoint {
147///     -ManifestVersion last_version
148///     -Option~RegionManifest~ checkpoint
149/// }
150/// RegionCheckpoint o-- RegionManifest
151/// ```
152#[derive(Debug)]
153pub struct RegionManifestManager {
154    store: ManifestObjectStore,
155    last_version: Arc<AtomicU64>,
156    checkpointer: Checkpointer,
157    manifest: Arc<RegionManifest>,
158    // Staging manifest is used to store the manifest of the staging region before it becomes available.
159    // It is initially inherited from the previous manifest(i.e., `self.manifest`).
160    // When the staging manifest becomes available, it will be used to construct the new manifest.
161    staging_manifest: Option<Arc<RegionManifest>>,
162    stats: ManifestStats,
163    stopped: bool,
164}
165
166impl RegionManifestManager {
167    /// Constructs a region's manifest and persist it.
168    pub async fn new(
169        metadata: RegionMetadataRef,
170        flushed_entry_id: u64,
171        options: RegionManifestOptions,
172        sst_format: FormatType,
173        stats: &ManifestStats,
174    ) -> Result<Self> {
175        // construct storage
176        let mut store = ManifestObjectStore::new(
177            &options.manifest_dir,
178            options.object_store.clone(),
179            options.compress_type,
180            stats.total_manifest_size.clone(),
181            options.manifest_cache.clone(),
182        );
183        let manifest_version = stats.manifest_version.clone();
184
185        info!(
186            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
187            options.manifest_dir, metadata, flushed_entry_id
188        );
189
190        let version = MIN_VERSION;
191        let mut manifest_builder = RegionManifestBuilder::default();
192        // set the initial metadata.
193        manifest_builder.apply_change(
194            version,
195            RegionChange {
196                metadata: metadata.clone(),
197                sst_format,
198            },
199        );
200        let manifest = manifest_builder.try_build()?;
201        let region_id = metadata.region_id;
202
203        debug!(
204            "Build region manifest in {}, manifest: {:?}",
205            options.manifest_dir, manifest
206        );
207
208        let mut actions = vec![RegionMetaAction::Change(RegionChange {
209            metadata,
210            sst_format,
211        })];
212        if flushed_entry_id > 0 {
213            actions.push(RegionMetaAction::Edit(RegionEdit {
214                files_to_add: vec![],
215                files_to_remove: vec![],
216                timestamp_ms: None,
217                compaction_time_window: None,
218                flushed_entry_id: Some(flushed_entry_id),
219                flushed_sequence: None,
220                committed_sequence: None,
221            }));
222        }
223
224        // Persist region change.
225        let action_list = RegionMetaActionList::new(actions);
226
227        // New region is not in staging mode.
228        // TODO(ruihang): add staging mode support if needed.
229        store.save(version, &action_list.encode()?, false).await?;
230
231        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
232        manifest_version.store(version, Ordering::Relaxed);
233        manifest
234            .removed_files
235            .update_file_removed_cnt_to_stats(stats);
236        Ok(Self {
237            store,
238            last_version: manifest_version,
239            checkpointer,
240            manifest: Arc::new(manifest),
241            staging_manifest: None,
242            stats: stats.clone(),
243            stopped: false,
244        })
245    }
246
247    /// Opens an existing manifest.
248    ///
249    /// Returns `Ok(None)` if no such manifest.
250    pub async fn open(
251        options: RegionManifestOptions,
252        stats: &ManifestStats,
253    ) -> Result<Option<Self>> {
254        let _t = MANIFEST_OP_ELAPSED
255            .with_label_values(&["open"])
256            .start_timer();
257
258        // construct storage
259        let mut store = ManifestObjectStore::new(
260            &options.manifest_dir,
261            options.object_store.clone(),
262            options.compress_type,
263            stats.total_manifest_size.clone(),
264            options.manifest_cache.clone(),
265        );
266        let manifest_version = stats.manifest_version.clone();
267
268        // recover from storage
269        // construct manifest builder
270        // calculate the manifest size from the latest checkpoint
271        let mut version = MIN_VERSION;
272        let checkpoint = Self::last_checkpoint(&mut store).await?;
273        let last_checkpoint_version = checkpoint
274            .as_ref()
275            .map(|(checkpoint, _)| checkpoint.last_version)
276            .unwrap_or(MIN_VERSION);
277        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
278            info!(
279                "Recover region manifest {} from checkpoint version {}",
280                options.manifest_dir, checkpoint.last_version
281            );
282            version = version.max(checkpoint.last_version + 1);
283            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
284        } else {
285            info!(
286                "Checkpoint not found in {}, build manifest from scratch",
287                options.manifest_dir
288            );
289            RegionManifestBuilder::default()
290        };
291
292        // apply actions from storage
293        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
294
295        for (manifest_version, raw_action_list) in manifests {
296            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
297            // set manifest size after last checkpoint
298            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
299            for action in action_list.actions {
300                match action {
301                    RegionMetaAction::Change(action) => {
302                        manifest_builder.apply_change(manifest_version, action);
303                    }
304                    RegionMetaAction::Edit(action) => {
305                        manifest_builder.apply_edit(manifest_version, action);
306                    }
307                    RegionMetaAction::Remove(_) => {
308                        debug!(
309                            "Unhandled action in {}, action: {:?}",
310                            options.manifest_dir, action
311                        );
312                    }
313                    RegionMetaAction::Truncate(action) => {
314                        manifest_builder.apply_truncate(manifest_version, action);
315                    }
316                }
317            }
318        }
319
320        // set the initial metadata if necessary
321        if !manifest_builder.contains_metadata() {
322            debug!("No region manifest in {}", options.manifest_dir);
323            return Ok(None);
324        }
325
326        let manifest = manifest_builder.try_build()?;
327        debug!(
328            "Recovered region manifest from {}, manifest: {:?}",
329            options.manifest_dir, manifest
330        );
331        let version = manifest.manifest_version;
332
333        let checkpointer = Checkpointer::new(
334            manifest.metadata.region_id,
335            options,
336            store.clone(),
337            last_checkpoint_version,
338        );
339        manifest_version.store(version, Ordering::Relaxed);
340        manifest
341            .removed_files
342            .update_file_removed_cnt_to_stats(stats);
343        Ok(Some(Self {
344            store,
345            last_version: manifest_version,
346            checkpointer,
347            manifest: Arc::new(manifest),
348            // TODO(weny): open the staging manifest if exists.
349            staging_manifest: None,
350            stats: stats.clone(),
351            stopped: false,
352        }))
353    }
354
355    /// Stops the manager.
356    pub async fn stop(&mut self) {
357        self.stopped = true;
358    }
359
360    /// Installs the manifest changes from the current version to the target version (inclusive).
361    ///
362    /// Returns installed version.
363    /// **Note**: This function is not guaranteed to install the target version strictly.
364    /// The installed version may be greater than the target version.
365    pub async fn install_manifest_to(
366        &mut self,
367        target_version: ManifestVersion,
368    ) -> Result<ManifestVersion> {
369        let _t = MANIFEST_OP_ELAPSED
370            .with_label_values(&["install_manifest_to"])
371            .start_timer();
372
373        let last_version = self.last_version();
374        // Case 1: If the target version is less than the current version, return the current version.
375        if last_version >= target_version {
376            debug!(
377                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
378                target_version, last_version, self.manifest.metadata.region_id
379            );
380            return Ok(last_version);
381        }
382
383        ensure!(
384            !self.stopped,
385            RegionStoppedSnafu {
386                region_id: self.manifest.metadata.region_id,
387            }
388        );
389
390        let region_id = self.manifest.metadata.region_id;
391        // Fetches manifests from the last version strictly.
392        let mut manifests = self
393            .store
394            // Invariant: last_version < target_version.
395            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
396            .await?;
397
398        // Case 2: No manifests in range: [current_version+1, target_version+1)
399        //
400        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
401        //                                                                    [Leader region]
402        // [Current Version]......[Target Version]
403        // [Follower region]
404        if manifests.is_empty() {
405            info!(
406                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
407                last_version, self.manifest.metadata.region_id
408            );
409            let last_version = self.install_last_checkpoint().await?;
410            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
411            if last_version >= target_version {
412                return Ok(last_version);
413            }
414
415            // Fetches manifests from the installed version strictly.
416            manifests = self
417                .store
418                // Invariant: last_version < target_version.
419                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
420                .await?;
421        }
422
423        if manifests.is_empty() {
424            return NoManifestsSnafu {
425                region_id: self.manifest.metadata.region_id,
426                start_version: last_version + 1,
427                end_version: target_version + 1,
428                last_version,
429            }
430            .fail();
431        }
432
433        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
434        let mut manifest_builder =
435            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
436
437        for (manifest_version, raw_action_list) in manifests {
438            self.store
439                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
440            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
441            for action in action_list.actions {
442                match action {
443                    RegionMetaAction::Change(action) => {
444                        manifest_builder.apply_change(manifest_version, action);
445                    }
446                    RegionMetaAction::Edit(action) => {
447                        manifest_builder.apply_edit(manifest_version, action);
448                    }
449                    RegionMetaAction::Remove(_) => {
450                        debug!(
451                            "Unhandled action for region {}, action: {:?}",
452                            self.manifest.metadata.region_id, action
453                        );
454                    }
455                    RegionMetaAction::Truncate(action) => {
456                        manifest_builder.apply_truncate(manifest_version, action);
457                    }
458                }
459            }
460        }
461
462        let new_manifest = manifest_builder.try_build()?;
463        ensure!(
464            new_manifest.manifest_version >= target_version,
465            InstallManifestToSnafu {
466                region_id: self.manifest.metadata.region_id,
467                target_version,
468                available_version: new_manifest.manifest_version,
469                last_version,
470            }
471        );
472
473        let version = self.last_version();
474        new_manifest
475            .removed_files
476            .update_file_removed_cnt_to_stats(&self.stats);
477        self.manifest = Arc::new(new_manifest);
478        let last_version = self.set_version(self.manifest.manifest_version);
479        info!(
480            "Install manifest changes from {} to {}, region: {}",
481            version, last_version, self.manifest.metadata.region_id
482        );
483
484        Ok(last_version)
485    }
486
487    /// Installs the last checkpoint.
488    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
489        let last_version = self.last_version();
490        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
491        else {
492            return NoCheckpointSnafu {
493                region_id: self.manifest.metadata.region_id,
494                last_version,
495            }
496            .fail();
497        };
498        self.store.reset_manifest_size();
499        self.store
500            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
501        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
502        let manifest = builder.try_build()?;
503        let last_version = self.set_version(manifest.manifest_version);
504        manifest
505            .removed_files
506            .update_file_removed_cnt_to_stats(&self.stats);
507        self.manifest = Arc::new(manifest);
508        info!(
509            "Installed region manifest from checkpoint: {}, region: {}",
510            checkpoint.last_version, self.manifest.metadata.region_id
511        );
512
513        Ok(last_version)
514    }
515
516    /// Updates the manifest. Returns the current manifest version number.
517    pub async fn update(
518        &mut self,
519        action_list: RegionMetaActionList,
520        is_staging: bool,
521    ) -> Result<ManifestVersion> {
522        let _t = MANIFEST_OP_ELAPSED
523            .with_label_values(&["update"])
524            .start_timer();
525
526        ensure!(
527            !self.stopped,
528            RegionStoppedSnafu {
529                region_id: self.manifest.metadata.region_id,
530            }
531        );
532
533        let version = self.increase_version();
534        self.store
535            .save(version, &action_list.encode()?, is_staging)
536            .await?;
537
538        // For a staging region, the manifest is initially inherited from the previous manifest(i.e., `self.manifest`).
539        // When the staging manifest becomes available, it will be used to construct the new manifest.
540        let mut manifest_builder =
541            if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
542                RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
543            } else {
544                RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
545            };
546
547        for action in action_list.actions {
548            match action {
549                RegionMetaAction::Change(action) => {
550                    manifest_builder.apply_change(version, action);
551                }
552                RegionMetaAction::Edit(action) => {
553                    manifest_builder.apply_edit(version, action);
554                }
555                RegionMetaAction::Remove(_) => {
556                    debug!(
557                        "Unhandled action for region {}, action: {:?}",
558                        self.manifest.metadata.region_id, action
559                    );
560                }
561                RegionMetaAction::Truncate(action) => {
562                    manifest_builder.apply_truncate(version, action);
563                }
564            }
565        }
566
567        if is_staging {
568            let new_manifest = manifest_builder.try_build()?;
569            self.staging_manifest = Some(Arc::new(new_manifest));
570
571            info!(
572                "Skipping checkpoint for region {} in staging mode, manifest version: {}",
573                self.manifest.metadata.region_id, self.manifest.manifest_version
574            );
575        } else {
576            let new_manifest = manifest_builder.try_build()?;
577            new_manifest
578                .removed_files
579                .update_file_removed_cnt_to_stats(&self.stats);
580            let updated_manifest = self
581                .checkpointer
582                .update_manifest_removed_files(new_manifest)?;
583            self.manifest = Arc::new(updated_manifest);
584            self.checkpointer
585                .maybe_do_checkpoint(self.manifest.as_ref());
586        }
587
588        Ok(version)
589    }
590
591    /// Clear deleted files from manifest's `removed_files` field without update version. Notice if datanode exit before checkpoint then new manifest by open region may still contain these deleted files, which is acceptable for gc process.
592    pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
593        let mut manifest = (*self.manifest()).clone();
594        manifest.removed_files.clear_deleted_files(deleted_files);
595        self.set_manifest(Arc::new(manifest));
596    }
597
598    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
599        self.manifest = manifest;
600    }
601
602    /// Retrieves the current [RegionManifest].
603    pub fn manifest(&self) -> Arc<RegionManifest> {
604        self.manifest.clone()
605    }
606
607    /// Retrieves the current [RegionManifest].
608    pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
609        self.staging_manifest.clone()
610    }
611
612    /// Returns total manifest size.
613    pub fn manifest_usage(&self) -> u64 {
614        self.store.total_manifest_size()
615    }
616
617    /// Returns true if a newer version manifest file is found.
618    ///
619    /// It is typically used in read-only regions to catch up with manifest.
620    /// It doesn't lock the manifest directory in the object store so the result
621    /// may be inaccurate if there are concurrent writes.
622    pub async fn has_update(&self) -> Result<bool> {
623        let last_version = self.last_version();
624
625        let streamer =
626            self.store
627                .manifest_lister(false)
628                .await?
629                .context(error::EmptyManifestDirSnafu {
630                    manifest_dir: self.store.manifest_dir(),
631                })?;
632
633        let need_update = streamer
634            .try_any(|entry| async move {
635                let file_name = entry.name();
636                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
637                    let version = file_version(file_name);
638                    if version > last_version {
639                        return true;
640                    }
641                }
642                false
643            })
644            .await
645            .context(error::OpenDalSnafu)?;
646
647        Ok(need_update)
648    }
649
650    /// Increases last version and returns the increased version.
651    fn increase_version(&mut self) -> ManifestVersion {
652        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
653        previous + 1
654    }
655
656    /// Sets the last version.
657    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
658        self.last_version.store(version, Ordering::Relaxed);
659        version
660    }
661
662    fn last_version(&self) -> ManifestVersion {
663        self.last_version.load(Ordering::Relaxed)
664    }
665
666    /// Fetches the last [RegionCheckpoint] from storage.
667    ///
668    /// If the checkpoint is not found, returns `None`.
669    /// Otherwise, returns the checkpoint and the size of the checkpoint.
670    pub(crate) async fn last_checkpoint(
671        store: &mut ManifestObjectStore,
672    ) -> Result<Option<(RegionCheckpoint, u64)>> {
673        let last_checkpoint = store.load_last_checkpoint().await?;
674
675        if let Some((_, bytes)) = last_checkpoint {
676            let checkpoint = RegionCheckpoint::decode(&bytes)?;
677            Ok(Some((checkpoint, bytes.len() as u64)))
678        } else {
679            Ok(None)
680        }
681    }
682
683    pub fn store(&self) -> ManifestObjectStore {
684        self.store.clone()
685    }
686
687    #[cfg(test)]
688    pub(crate) fn checkpointer(&self) -> &Checkpointer {
689        &self.checkpointer
690    }
691
692    /// Merge all staged manifest actions into a single action list ready for submission.
693    /// This collects all staging manifests, applies them sequentially, and returns the merged actions.
694    pub(crate) async fn merge_staged_actions(
695        &mut self,
696        region_state: RegionRoleState,
697    ) -> Result<Option<RegionMetaActionList>> {
698        // Only merge if we're in staging mode
699        if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
700            return Ok(None);
701        }
702
703        // Fetch all staging manifests
704        let staging_manifests = self.store.fetch_staging_manifests().await?;
705
706        if staging_manifests.is_empty() {
707            info!(
708                "No staging manifests to merge for region {}",
709                self.manifest.metadata.region_id
710            );
711            return Ok(None);
712        }
713
714        info!(
715            "Merging {} staging manifests for region {}",
716            staging_manifests.len(),
717            self.manifest.metadata.region_id
718        );
719
720        // Start with current manifest state as the base
721        let mut merged_actions = Vec::new();
722        let mut latest_version = self.last_version();
723
724        // Apply all staging actions in order
725        for (manifest_version, raw_action_list) in staging_manifests {
726            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
727
728            for action in action_list.actions {
729                merged_actions.push(action);
730            }
731
732            latest_version = latest_version.max(manifest_version);
733        }
734
735        if merged_actions.is_empty() {
736            return Ok(None);
737        }
738
739        info!(
740            "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
741            merged_actions.len(),
742            self.manifest.metadata.region_id,
743            latest_version
744        );
745
746        Ok(Some(RegionMetaActionList::new(merged_actions)))
747    }
748
749    /// Unsets the staging manifest.
750    pub(crate) fn unset_staging_manifest(&mut self) {
751        self.staging_manifest = None;
752    }
753
754    /// Clear all staging manifests.
755    pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
756        self.staging_manifest = None;
757        self.store.clear_staging_manifests().await?;
758        info!(
759            "Cleared all staging manifests for region {}",
760            self.manifest.metadata.region_id
761        );
762        Ok(())
763    }
764}
765
#[cfg(test)]
impl RegionManifestManager {
    /// Test helper that checks the current manifest's state.
    ///
    /// Asserts that the manifest's metadata equals `expect`, that its own version matches the
    /// version tracked by the manager, and that the tracked version equals `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.manifest();
        let tracked_version = self.last_version();
        assert_eq!(current.metadata, *expect);
        assert_eq!(current.manifest_version, tracked_version);
        assert_eq!(last_version, tracked_version);
    }
}
775
#[cfg(test)]
mod test {
    use std::time::Duration;

    use api::v1::SemanticType;
    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::manifest::action::{RegionChange, RegionEdit};
    use crate::manifest::tests::utils::basic_region_metadata;
    use crate::test_util::TestEnv;

    /// Returns a copy of `metadata` with an extra non-nullable float64 field column
    /// `val2` (column id 252), used by tests to produce a schema change.
    fn metadata_with_val2(metadata: &RegionMetadataRef) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::from_existing((**metadata).clone());
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        Arc::new(builder.build().unwrap())
    }

    /// Builds an action list holding a single [RegionChange] to `metadata`
    /// (primary-key SST format).
    fn change_metadata_action(metadata: &RegionMetadataRef) -> RegionMetaActionList {
        RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
            metadata: metadata.clone(),
            sst_format: FormatType::PrimaryKey,
        }))
    }

    /// Builds an action list holding a single [RegionEdit] that adds and removes no
    /// files; each `update` with it still produces a new manifest version.
    fn noop_edit_action() -> RegionMetaActionList {
        RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        })])
    }

    #[tokio::test]
    async fn create_manifest_manager() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    #[tokio::test]
    async fn open_manifest_manager() {
        let env = TestEnv::new().await;
        // Try to open an empty manifest.
        assert!(
            env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
                .await
                .unwrap()
                .is_none()
        );

        // Creates a manifest.
        let metadata = Arc::new(basic_region_metadata());
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();
        // Stops it.
        manager.stop().await;

        // Open it.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    #[tokio::test]
    async fn manifest_with_partition_expr_roundtrip() {
        let env = TestEnv::new().await;
        let expr_json =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let mut metadata = basic_region_metadata();
        metadata.partition_expr = Some(expr_json.to_string());
        let metadata = Arc::new(metadata);
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        // The persisted manifest should contain the same partition_expr JSON.
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));

        manager.stop().await;

        // Reopen and check again.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
    }

    #[tokio::test]
    async fn region_change_add_column() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let new_metadata = metadata_with_val2(&metadata);
        let current_version = manager
            .update(change_metadata_action(&new_metadata), false)
            .await
            .unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // Reopen the manager.
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 1);
    }

    /// Test-only helper (refer to `wal_dir_usage` in src/store-api/src/logstore.rs):
    /// sums the sizes of regular files directly under `path` whose names contain
    /// `.checkpoint` or `.json`.
    ///
    /// Panics on any I/O error so that a failed directory listing fails the test loudly
    /// instead of silently under-counting.
    async fn manifest_dir_usage(path: &str) -> u64 {
        let mut size: u64 = 0;
        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = read_dir.next_entry().await.unwrap() {
            if !entry.file_type().await.unwrap().is_file() {
                continue;
            }
            let file_name = entry.file_name().into_string().unwrap();
            if file_name.contains(".checkpoint") || file_name.contains(".json") {
                let file_size = entry.metadata().await.unwrap().len();
                debug!("File: {file_name:?}, size: {file_size}");
                size += file_size;
            }
        }
        size
    }

    #[tokio::test]
    async fn test_manifest_size() {
        let metadata = Arc::new(basic_region_metadata());
        let data_home = create_temp_dir("");
        let data_home_path = data_home.path().to_str().unwrap().to_string();
        let env = TestEnv::with_data_home(data_home).await;

        let manifest_dir = format!("{}/manifest", data_home_path);

        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let new_metadata = metadata_with_val2(&metadata);
        let current_version = manager
            .update(change_metadata_action(&new_metadata), false)
            .await
            .unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // The tracked manifest usage must match what is on disk.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Apply 10 no-op edits to trigger a checkpoint.
        for _ in 0..10 {
            manager.update(noop_edit_action(), false).await.unwrap();
        }

        // The checkpoint may still be in progress; wait for it, but bound the wait so a
        // stuck checkpoint fails the test instead of hanging it forever.
        tokio::time::timeout(Duration::from_secs(30), async {
            while manager.checkpointer.is_doing_checkpoint() {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("manifest checkpoint did not finish within 30s");

        // Check the manifest size again.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Reopen the manager; this time the size is computed only from the latest
        // checkpoint file.
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 11);

        // Get the manifest size again.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, 1378);
    }
}