Skip to main content

mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Instant;
18
19use common_datasource::compression::CompressionType;
20use common_telemetry::{debug, info};
21use futures::TryStreamExt;
22use object_store::ObjectStore;
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::metadata::RegionMetadataRef;
25use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::config::MitoConfig;
29use crate::error::{
30    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
31};
32use crate::manifest::action::{
33    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
34    RegionMetaAction, RegionMetaActionList, RemovedFile,
35};
36use crate::manifest::checkpointer::Checkpointer;
37use crate::manifest::storage::{
38    ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, list_start_after,
39    manifest_compress_type, manifest_dir,
40};
41use crate::metrics::MANIFEST_OP_ELAPSED;
42use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
43use crate::sst::FormatType;
44
45/// Options for [RegionManifestManager].
46#[derive(Debug, Clone)]
47pub struct RegionManifestOptions {
48    /// Directory to store manifest.
49    pub manifest_dir: String,
50    pub object_store: ObjectStore,
51    pub compress_type: CompressionType,
52    /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
53    /// Set to 0 to disable checkpoint.
54    pub checkpoint_distance: u64,
55    pub remove_file_options: RemoveFileOptions,
56    /// Optional cache for manifest files.
57    pub manifest_cache: Option<ManifestCache>,
58}
59
60impl RegionManifestOptions {
61    /// Creates a new [RegionManifestOptions] with the given region directory, object store, and configuration.
62    pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
63        RegionManifestOptions {
64            manifest_dir: manifest_dir(region_dir),
65            object_store: object_store.clone(),
66            // We don't allow users to set the compression algorithm as we use it as a file suffix.
67            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
68            compress_type: manifest_compress_type(config.compress_manifest),
69            checkpoint_distance: config.manifest_checkpoint_distance,
70            remove_file_options: RemoveFileOptions {
71                enable_gc: config.gc.enable,
72            },
73            manifest_cache: None,
74        }
75    }
76}
77
/// Controls how the `removed_files` field of [RegionManifest] is updated.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
    /// Whether GC is enabled.
    ///
    /// When GC is disabled, the removed files should always be empty when persisting the
    /// manifest.
    pub enable_gc: bool,
}
85
// Rewrite note: mapping from the old trait-based manifest API to the concrete types used now:
// trait Checkpoint -> struct RegionCheckpoint
// trait MetaAction -> struct RegionMetaActionList
// trait MetaActionIterator -> struct MetaActionIteratorImpl
90
91#[cfg_attr(doc, aquamarine::aquamarine)]
92/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
93/// metadata.
94///
95/// ```mermaid
96/// classDiagram
97/// class RegionManifestManager {
98///     -ManifestObjectStore store
99///     -RegionManifestOptions options
100///     -RegionManifest manifest
101///     +new() RegionManifestManager
102///     +open() Option~RegionManifestManager~
103///     +stop()
104///     +update(RegionMetaActionList action_list) ManifestVersion
105///     +manifest() RegionManifest
106/// }
107/// class ManifestObjectStore {
108///     -ObjectStore object_store
109/// }
110/// class RegionChange {
111///     -RegionMetadataRef metadata
112/// }
113/// class RegionEdit {
114///     -VersionNumber region_version
115///     -Vec~FileMeta~ files_to_add
116///     -Vec~FileMeta~ files_to_remove
117///     -SequenceNumber flushed_sequence
118/// }
119/// class RegionRemove {
120///     -RegionId region_id
121/// }
122/// RegionManifestManager o-- ManifestObjectStore
123/// RegionManifestManager o-- RegionManifest
124/// RegionManifestManager o-- RegionManifestOptions
125/// RegionManifestManager -- RegionMetaActionList
126/// RegionManifestManager -- RegionCheckpoint
127/// ManifestObjectStore o-- ObjectStore
128/// RegionMetaActionList o-- RegionMetaAction
129/// RegionMetaAction o-- ProtocolAction
130/// RegionMetaAction o-- RegionChange
131/// RegionMetaAction o-- RegionEdit
132/// RegionMetaAction o-- RegionRemove
133/// RegionChange o-- RegionMetadata
134/// RegionEdit o-- FileMeta
135///
136/// class RegionManifest {
137///     -RegionMetadataRef metadata
138///     -HashMap&lt;FileId, FileMeta&gt; files
139///     -ManifestVersion manifest_version
140/// }
141/// class RegionMetadata
142/// class FileMeta
143/// RegionManifest o-- RegionMetadata
144/// RegionManifest o-- FileMeta
145///
146/// class RegionCheckpoint {
147///     -ManifestVersion last_version
148///     -Option~RegionManifest~ checkpoint
149/// }
150/// RegionCheckpoint o-- RegionManifest
151/// ```
152#[derive(Debug)]
153pub struct RegionManifestManager {
154    store: ManifestObjectStore,
155    last_version: Arc<AtomicU64>,
156    checkpointer: Checkpointer,
157    manifest: Arc<RegionManifest>,
158    // Staging manifest is used to store the manifest of the staging region before it becomes available.
159    // It is initially inherited from the previous manifest(i.e., `self.manifest`).
160    // When the staging manifest becomes available, it will be used to construct the new manifest.
161    staging_manifest: Option<Arc<RegionManifest>>,
162    stats: ManifestStats,
163    stopped: bool,
164}
165
166impl RegionManifestManager {
167    /// Constructs a region's manifest and persist it.
168    pub async fn new(
169        metadata: RegionMetadataRef,
170        flushed_entry_id: u64,
171        options: RegionManifestOptions,
172        sst_format: FormatType,
173        stats: &ManifestStats,
174    ) -> Result<Self> {
175        // construct storage
176        let mut store = ManifestObjectStore::new(
177            &options.manifest_dir,
178            options.object_store.clone(),
179            options.compress_type,
180            stats.total_manifest_size.clone(),
181            options.manifest_cache.clone(),
182        );
183        let manifest_version = stats.manifest_version.clone();
184
185        info!(
186            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
187            options.manifest_dir, metadata, flushed_entry_id
188        );
189
190        let version = MIN_VERSION;
191        let mut manifest_builder = RegionManifestBuilder::default();
192        // set the initial metadata.
193        manifest_builder.apply_change(
194            version,
195            RegionChange {
196                metadata: metadata.clone(),
197                sst_format,
198                append_mode: None,
199            },
200        );
201        let manifest = manifest_builder.try_build()?;
202        let region_id = metadata.region_id;
203
204        debug!(
205            "Build region manifest in {}, manifest: {:?}",
206            options.manifest_dir, manifest
207        );
208
209        let mut actions = vec![RegionMetaAction::Change(RegionChange {
210            metadata,
211            sst_format,
212            append_mode: None,
213        })];
214        if flushed_entry_id > 0 {
215            actions.push(RegionMetaAction::Edit(RegionEdit {
216                files_to_add: vec![],
217                files_to_remove: vec![],
218                timestamp_ms: None,
219                compaction_time_window: None,
220                flushed_entry_id: Some(flushed_entry_id),
221                flushed_sequence: None,
222                committed_sequence: None,
223            }));
224        }
225
226        // Persist region change.
227        let action_list = RegionMetaActionList::new(actions);
228
229        // New region is not in staging mode.
230        // TODO(ruihang): add staging mode support if needed.
231        store.save(version, &action_list.encode()?, false).await?;
232
233        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
234        manifest_version.store(version, Ordering::Relaxed);
235        manifest
236            .removed_files
237            .update_file_removed_cnt_to_stats(stats);
238        Ok(Self {
239            store,
240            last_version: manifest_version,
241            checkpointer,
242            manifest: Arc::new(manifest),
243            staging_manifest: None,
244            stats: stats.clone(),
245            stopped: false,
246        })
247    }
248
249    /// Opens an existing manifest.
250    ///
251    /// Returns `Ok(None)` if no such manifest.
252    pub async fn open(
253        options: RegionManifestOptions,
254        stats: &ManifestStats,
255    ) -> Result<Option<Self>> {
256        let _t = MANIFEST_OP_ELAPSED
257            .with_label_values(&["open"])
258            .start_timer();
259        let open_start = Instant::now();
260
261        // construct storage
262        let mut store = ManifestObjectStore::new(
263            &options.manifest_dir,
264            options.object_store.clone(),
265            options.compress_type,
266            stats.total_manifest_size.clone(),
267            options.manifest_cache.clone(),
268        );
269        let manifest_version = stats.manifest_version.clone();
270
271        // recover from storage
272        // construct manifest builder
273        // calculate the manifest size from the latest checkpoint
274        let mut version = MIN_VERSION;
275        let checkpoint = Self::last_checkpoint(&mut store).await?;
276        let last_checkpoint_version = checkpoint
277            .as_ref()
278            .map(|(checkpoint, _)| checkpoint.last_version)
279            .unwrap_or(MIN_VERSION);
280        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
281            info!(
282                "Recover region manifest {} from checkpoint version {}",
283                options.manifest_dir, checkpoint.last_version
284            );
285            version = version.max(checkpoint.last_version + 1);
286            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
287        } else {
288            info!(
289                "Checkpoint not found in {}, build manifest from scratch",
290                options.manifest_dir
291            );
292            RegionManifestBuilder::default()
293        };
294
295        let replay_start_version = version;
296        info!(
297            "Replaying region manifest {} from version {}, last checkpoint version: {}",
298            options.manifest_dir, replay_start_version, last_checkpoint_version,
299        );
300
301        // apply actions from storage
302        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
303        let replayed_deltas = manifests.len();
304
305        for (manifest_version, raw_action_list) in manifests {
306            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
307            // set manifest size after last checkpoint
308            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
309            for action in action_list.actions {
310                match action {
311                    RegionMetaAction::Change(action) => {
312                        manifest_builder.apply_change(manifest_version, action);
313                    }
314                    RegionMetaAction::PartitionExprChange(action) => {
315                        manifest_builder.apply_partition_expr_change(manifest_version, action);
316                    }
317                    RegionMetaAction::Edit(action) => {
318                        manifest_builder.apply_edit(manifest_version, action);
319                    }
320                    RegionMetaAction::Remove(_) => {
321                        debug!(
322                            "Unhandled action in {}, action: {:?}",
323                            options.manifest_dir, action
324                        );
325                    }
326                    RegionMetaAction::Truncate(action) => {
327                        manifest_builder.apply_truncate(manifest_version, action);
328                    }
329                }
330            }
331        }
332
333        // set the initial metadata if necessary
334        if !manifest_builder.contains_metadata() {
335            debug!("No region manifest in {}", options.manifest_dir);
336            return Ok(None);
337        }
338
339        let manifest = manifest_builder.try_build()?;
340        debug!(
341            "Recovered region manifest from {}, manifest: {:?}",
342            options.manifest_dir, manifest
343        );
344        let version = manifest.manifest_version;
345
346        let manifest_dir = options.manifest_dir.clone();
347        let checkpointer = Checkpointer::new(
348            manifest.metadata.region_id,
349            options,
350            store.clone(),
351            last_checkpoint_version,
352        );
353        manifest_version.store(version, Ordering::Relaxed);
354        manifest
355            .removed_files
356            .update_file_removed_cnt_to_stats(stats);
357        info!(
358            "Opened region manifest {}, region_id: {}, start_version: {}, last_checkpoint_version: {}, replayed_deltas: {}, final_version: {}, cost: {:?}",
359            manifest_dir,
360            manifest.metadata.region_id,
361            replay_start_version,
362            last_checkpoint_version,
363            replayed_deltas,
364            version,
365            open_start.elapsed(),
366        );
367        Ok(Some(Self {
368            store,
369            last_version: manifest_version,
370            checkpointer,
371            manifest: Arc::new(manifest),
372            // TODO(weny): open the staging manifest if exists.
373            staging_manifest: None,
374            stats: stats.clone(),
375            stopped: false,
376        }))
377    }
378
379    /// Stops the manager.
380    pub async fn stop(&mut self) {
381        self.stopped = true;
382    }
383
384    /// Installs the manifest changes from the current version to the target version (inclusive).
385    ///
386    /// Returns installed version.
387    /// **Note**: This function is not guaranteed to install the target version strictly.
388    /// The installed version may be greater than the target version.
389    pub async fn install_manifest_to(
390        &mut self,
391        target_version: ManifestVersion,
392    ) -> Result<ManifestVersion> {
393        let _t = MANIFEST_OP_ELAPSED
394            .with_label_values(&["install_manifest_to"])
395            .start_timer();
396
397        let last_version = self.last_version();
398        // Case 1: If the target version is less than the current version, return the current version.
399        if last_version >= target_version {
400            debug!(
401                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
402                target_version, last_version, self.manifest.metadata.region_id
403            );
404            return Ok(last_version);
405        }
406
407        ensure!(
408            !self.stopped,
409            RegionStoppedSnafu {
410                region_id: self.manifest.metadata.region_id,
411            }
412        );
413
414        let region_id = self.manifest.metadata.region_id;
415        // Fetches manifests from the last version strictly.
416        let mut manifests = self
417            .store
418            // Invariant: last_version < target_version.
419            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
420            .await?;
421
422        // Case 2: No manifests in range: [current_version+1, target_version+1)
423        //
424        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
425        //                                                                    [Leader region]
426        // [Current Version]......[Target Version]
427        // [Follower region]
428        if manifests.is_empty() {
429            info!(
430                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
431                last_version, self.manifest.metadata.region_id
432            );
433            let last_version = self.install_last_checkpoint().await?;
434            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
435            if last_version >= target_version {
436                return Ok(last_version);
437            }
438
439            // Fetches manifests from the installed version strictly.
440            manifests = self
441                .store
442                // Invariant: last_version < target_version.
443                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
444                .await?;
445        }
446
447        if manifests.is_empty() {
448            return NoManifestsSnafu {
449                region_id: self.manifest.metadata.region_id,
450                start_version: last_version + 1,
451                end_version: target_version + 1,
452                last_version,
453            }
454            .fail();
455        }
456
457        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
458        let mut manifest_builder =
459            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
460
461        for (manifest_version, raw_action_list) in manifests {
462            self.store
463                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
464            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
465            for action in action_list.actions {
466                match action {
467                    RegionMetaAction::Change(action) => {
468                        manifest_builder.apply_change(manifest_version, action);
469                    }
470                    RegionMetaAction::PartitionExprChange(action) => {
471                        manifest_builder.apply_partition_expr_change(manifest_version, action);
472                    }
473                    RegionMetaAction::Edit(action) => {
474                        manifest_builder.apply_edit(manifest_version, action);
475                    }
476                    RegionMetaAction::Remove(_) => {
477                        debug!(
478                            "Unhandled action for region {}, action: {:?}",
479                            self.manifest.metadata.region_id, action
480                        );
481                    }
482                    RegionMetaAction::Truncate(action) => {
483                        manifest_builder.apply_truncate(manifest_version, action);
484                    }
485                }
486            }
487        }
488
489        let new_manifest = manifest_builder.try_build()?;
490        ensure!(
491            new_manifest.manifest_version >= target_version,
492            InstallManifestToSnafu {
493                region_id: self.manifest.metadata.region_id,
494                target_version,
495                available_version: new_manifest.manifest_version,
496                last_version,
497            }
498        );
499
500        let version = self.last_version();
501        new_manifest
502            .removed_files
503            .update_file_removed_cnt_to_stats(&self.stats);
504        self.manifest = Arc::new(new_manifest);
505        let last_version = self.set_version(self.manifest.manifest_version);
506        info!(
507            "Install manifest changes from {} to {}, region: {}",
508            version, last_version, self.manifest.metadata.region_id
509        );
510
511        Ok(last_version)
512    }
513
514    /// Installs the last checkpoint.
515    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
516        let last_version = self.last_version();
517        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
518        else {
519            return NoCheckpointSnafu {
520                region_id: self.manifest.metadata.region_id,
521                last_version,
522            }
523            .fail();
524        };
525        self.store.reset_manifest_size();
526        self.store
527            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
528        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
529        let manifest = builder.try_build()?;
530        let last_version = self.set_version(manifest.manifest_version);
531        manifest
532            .removed_files
533            .update_file_removed_cnt_to_stats(&self.stats);
534        self.manifest = Arc::new(manifest);
535        info!(
536            "Installed region manifest from checkpoint: {}, region: {}",
537            checkpoint.last_version, self.manifest.metadata.region_id
538        );
539
540        Ok(last_version)
541    }
542
543    /// Updates the manifest. Returns the current manifest version number.
544    pub async fn update(
545        &mut self,
546        action_list: RegionMetaActionList,
547        is_staging: bool,
548    ) -> Result<ManifestVersion> {
549        let _t = MANIFEST_OP_ELAPSED
550            .with_label_values(&["update"])
551            .start_timer();
552
553        ensure!(
554            !self.stopped,
555            RegionStoppedSnafu {
556                region_id: self.manifest.metadata.region_id,
557            }
558        );
559
560        let version = self.increase_version();
561        self.store
562            .save(version, &action_list.encode()?, is_staging)
563            .await?;
564
565        // For a staging region, the manifest is initially inherited from the previous manifest(i.e., `self.manifest`).
566        // When the staging manifest becomes available, it will be used to construct the new manifest.
567        let mut manifest_builder =
568            if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
569                RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
570            } else {
571                RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
572            };
573
574        for action in action_list.actions {
575            match action {
576                RegionMetaAction::Change(action) => {
577                    manifest_builder.apply_change(version, action);
578                }
579                RegionMetaAction::PartitionExprChange(action) => {
580                    manifest_builder.apply_partition_expr_change(version, action);
581                }
582                RegionMetaAction::Edit(action) => {
583                    manifest_builder.apply_edit(version, action);
584                }
585                RegionMetaAction::Remove(_) => {
586                    debug!(
587                        "Unhandled action for region {}, action: {:?}",
588                        self.manifest.metadata.region_id, action
589                    );
590                }
591                RegionMetaAction::Truncate(action) => {
592                    manifest_builder.apply_truncate(version, action);
593                }
594            }
595        }
596
597        if is_staging {
598            let new_manifest = manifest_builder.try_build()?;
599            self.staging_manifest = Some(Arc::new(new_manifest));
600
601            info!(
602                "Skipping checkpoint for region {} in staging mode, manifest version: {}",
603                self.manifest.metadata.region_id, self.manifest.manifest_version
604            );
605        } else {
606            let new_manifest = manifest_builder.try_build()?;
607            new_manifest
608                .removed_files
609                .update_file_removed_cnt_to_stats(&self.stats);
610            let updated_manifest = self
611                .checkpointer
612                .update_manifest_removed_files(new_manifest)?;
613            self.manifest = Arc::new(updated_manifest);
614            self.checkpointer
615                .maybe_do_checkpoint(self.manifest.as_ref());
616        }
617
618        Ok(version)
619    }
620
621    /// Clear deleted files from manifest's `removed_files` field without update version. Notice if datanode exit before checkpoint then new manifest by open region may still contain these deleted files, which is acceptable for gc process.
622    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
623        let mut manifest = (*self.manifest()).clone();
624        manifest.removed_files.clear_deleted_files(deleted_files);
625        self.set_manifest(Arc::new(manifest));
626    }
627
628    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
629        self.manifest = manifest;
630    }
631
632    /// Retrieves the current [RegionManifest].
633    pub fn manifest(&self) -> Arc<RegionManifest> {
634        self.manifest.clone()
635    }
636
637    /// Retrieves the current [RegionManifest].
638    pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
639        self.staging_manifest.clone()
640    }
641
642    /// Returns total manifest size.
643    pub fn manifest_usage(&self) -> u64 {
644        self.store.total_manifest_size()
645    }
646
647    /// Returns true if a newer version manifest file is found.
648    ///
649    /// It is typically used in read-only regions to catch up with manifest.
650    /// It doesn't lock the manifest directory in the object store so the result
651    /// may be inaccurate if there are concurrent writes.
652    pub async fn has_update(&self) -> Result<bool> {
653        let last_version = self.last_version();
654
655        // Skip older files at the object-store layer. Files for `v == last_version`
656        // may still appear (`{path}{v:020}` sorts before `{path}{v:020}.json`) but
657        // they are filtered out below by the `version > last_version` check.
658        let start_after = list_start_after(self.store.manifest_dir(), last_version);
659        let streamer = self
660            .store
661            .manifest_lister(false, Some(&start_after))
662            .await?
663            .context(error::EmptyManifestDirSnafu {
664                manifest_dir: self.store.manifest_dir(),
665            })?;
666
667        let need_update = streamer
668            .try_any(|entry| async move {
669                let file_name = entry.name();
670                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
671                    let version = file_version(file_name);
672                    if version > last_version {
673                        return true;
674                    }
675                }
676                false
677            })
678            .await
679            .context(error::OpenDalSnafu)?;
680
681        Ok(need_update)
682    }
683
684    /// Increases last version and returns the increased version.
685    fn increase_version(&mut self) -> ManifestVersion {
686        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
687        previous + 1
688    }
689
690    /// Sets the last version.
691    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
692        self.last_version.store(version, Ordering::Relaxed);
693        version
694    }
695
696    fn last_version(&self) -> ManifestVersion {
697        self.last_version.load(Ordering::Relaxed)
698    }
699
700    /// Fetches the last [RegionCheckpoint] from storage.
701    ///
702    /// If the checkpoint is not found, returns `None`.
703    /// Otherwise, returns the checkpoint and the size of the checkpoint.
704    pub(crate) async fn last_checkpoint(
705        store: &mut ManifestObjectStore,
706    ) -> Result<Option<(RegionCheckpoint, u64)>> {
707        let last_checkpoint = store.load_last_checkpoint().await?;
708
709        if let Some((_, bytes)) = last_checkpoint {
710            let checkpoint = RegionCheckpoint::decode(&bytes)?;
711            Ok(Some((checkpoint, bytes.len() as u64)))
712        } else {
713            Ok(None)
714        }
715    }
716
717    pub fn store(&self) -> ManifestObjectStore {
718        self.store.clone()
719    }
720
721    #[cfg(test)]
722    pub(crate) fn checkpointer(&self) -> &Checkpointer {
723        &self.checkpointer
724    }
725
726    /// Merge all staged manifest actions into a single action list ready for submission.
727    /// This collects all staging manifests, applies them sequentially, and returns the merged actions.
728    pub(crate) async fn merge_staged_actions(
729        &mut self,
730        region_state: RegionRoleState,
731    ) -> Result<Option<RegionMetaActionList>> {
732        // Only merge if we're in staging mode
733        if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
734            return Ok(None);
735        }
736
737        // Fetch all staging manifests
738        let staging_manifests = self.store.fetch_staging_manifests().await?;
739
740        if staging_manifests.is_empty() {
741            info!(
742                "No staging manifests to merge for region {}",
743                self.manifest.metadata.region_id
744            );
745            return Ok(None);
746        }
747
748        info!(
749            "Merging {} staging manifests for region {}",
750            staging_manifests.len(),
751            self.manifest.metadata.region_id
752        );
753
754        // Start with current manifest state as the base
755        let mut merged_actions = Vec::new();
756        let mut latest_version = self.last_version();
757
758        // Apply all staging actions in order
759        for (manifest_version, raw_action_list) in staging_manifests {
760            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
761
762            for action in action_list.actions {
763                merged_actions.push(action);
764            }
765
766            latest_version = latest_version.max(manifest_version);
767        }
768
769        if merged_actions.is_empty() {
770            return Ok(None);
771        }
772
773        info!(
774            "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
775            merged_actions.len(),
776            self.manifest.metadata.region_id,
777            latest_version
778        );
779
780        Ok(Some(RegionMetaActionList::new(merged_actions)))
781    }
782
783    /// Unsets the staging manifest.
784    pub(crate) fn unset_staging_manifest(&mut self) {
785        self.staging_manifest = None;
786    }
787
788    /// Clear all staging manifests.
789    pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
790        self.staging_manifest = None;
791        self.store.clear_staging_manifests().await?;
792        info!(
793            "Cleared all staging manifests for region {}",
794            self.manifest.metadata.region_id
795        );
796        Ok(())
797    }
798}
799
#[cfg(test)]
impl RegionManifestManager {
    /// Test helper that asserts the manager's current state.
    ///
    /// Checks that the manifest carries `expect` as its metadata, and that both
    /// the manifest's recorded version and the caller-supplied `last_version`
    /// equal [`RegionManifestManager::last_version`].
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let snapshot = self.manifest();
        assert_eq!(snapshot.metadata, *expect);

        // Both the in-memory manifest version and the caller's expectation
        // must agree with the manager's latest version.
        let recorded_version = self.manifest.manifest_version;
        assert_eq!(recorded_version, self.last_version());
        assert_eq!(last_version, self.last_version());
    }
}
809
#[cfg(test)]
mod test {
    use std::time::Duration;

    use api::v1::SemanticType;
    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::manifest::action::{RegionChange, RegionEdit};
    use crate::manifest::tests::utils::basic_region_metadata;
    use crate::test_util::TestEnv;

    /// Upper bound on how long a test waits for a background checkpoint, so a
    /// checkpoint that never finishes fails the test instead of hanging it.
    const CHECKPOINT_WAIT_TIMEOUT: Duration = Duration::from_secs(30);

    /// Creating a manager with initial metadata yields a manifest at version 0.
    #[tokio::test]
    async fn create_manifest_manager() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    /// Opening without metadata returns `None` when no manifest exists. After a
    /// stop/reopen cycle, it restores the previously written manifest.
    #[tokio::test]
    async fn open_manifest_manager() {
        let env = TestEnv::new().await;
        // Try to open an empty manifest.
        assert!(
            env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
                .await
                .unwrap()
                .is_none()
        );

        // Creates a manifest.
        let metadata = Arc::new(basic_region_metadata());
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();
        // Stops it.
        manager.stop().await;

        // Open it.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    /// The region's partition expression survives being persisted and reopened.
    #[tokio::test]
    async fn manifest_with_partition_expr_roundtrip() {
        let env = TestEnv::new().await;
        let expr_json =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let mut metadata = basic_region_metadata();
        metadata.set_partition_expr(Some(expr_json.to_string()));
        let metadata = Arc::new(metadata);
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        // The persisted manifest should contain the same partition_expr JSON.
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));

        manager.stop().await;

        // Reopen and check again.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
    }

    /// Applying a `RegionChange` that adds a column bumps the version to 1. The
    /// new metadata is still visible after reopening.
    #[tokio::test]
    async fn region_change_add_column() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
        new_metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
                sst_format: FormatType::PrimaryKey,
                append_mode: None,
            }));

        let current_version = manager.update(action_list, false).await.unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // Reopen the manager.
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 1);
    }

    /// Sums the sizes of the regular files directly under `path` whose names
    /// contain `.checkpoint` or `.json`.
    ///
    /// This is test-only; refer to `wal_dir_usage` in
    /// src/store-api/src/logstore.rs. It panics on any I/O error rather than
    /// silently returning a partial total, which would otherwise surface as a
    /// confusing size mismatch.
    async fn manifest_dir_usage(path: &str) -> u64 {
        let mut size: u64 = 0;
        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = read_dir.next_entry().await.unwrap() {
            if !entry.file_type().await.unwrap().is_file() {
                continue;
            }
            let file_name = entry.file_name().into_string().unwrap();
            if file_name.contains(".checkpoint") || file_name.contains(".json") {
                let file_size = entry.metadata().await.unwrap().len();
                debug!("File: {file_name:?}, size: {file_size}");
                size += file_size;
            }
        }
        size
    }

    /// `manifest_usage` matches the on-disk size of the manifest files both
    /// before and after a checkpoint. After reopening, it equals a known fixed
    /// value.
    #[tokio::test]
    async fn test_manifest_size() {
        let metadata = Arc::new(basic_region_metadata());
        let data_home = create_temp_dir("");
        let data_home_path = data_home.path().to_str().unwrap().to_string();
        let env = TestEnv::with_data_home(data_home).await;

        let manifest_dir = format!("{}/manifest", data_home_path);

        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
        new_metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
                sst_format: FormatType::PrimaryKey,
                append_mode: None,
            }));

        let current_version = manager.update(action_list, false).await.unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // Get the manifest size.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Apply 10 no-op edits so that a checkpoint is triggered.
        for _ in 0..10 {
            manager
                .update(
                    RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
                        files_to_add: vec![],
                        files_to_remove: vec![],
                        timestamp_ms: None,
                        compaction_time_window: None,
                        flushed_entry_id: None,
                        flushed_sequence: None,
                        committed_sequence: None,
                    })]),
                    false,
                )
                .await
                .unwrap();
        }

        // Wait for the background checkpoint to finish. Fail loudly instead of
        // hanging the test forever if it never completes.
        tokio::time::timeout(CHECKPOINT_WAIT_TIMEOUT, async {
            while manager.checkpointer.is_doing_checkpoint() {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("manifest checkpoint did not finish in time");

        // Check the manifest size again.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Reopen the manager; after reopening, the usage is calculated only from
        // the latest checkpoint file.
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 11);

        // Get the manifest size again. The expected value is the size of the
        // latest checkpoint for this exact metadata; it must be updated if the
        // manifest encoding changes.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, 1397);
    }
}