mito2/manifest/
checkpointer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use common_telemetry::{error, info};
20use store_api::storage::RegionId;
21use store_api::{MIN_VERSION, ManifestVersion};
22
23use crate::error::Result;
24use crate::manifest::action::{RegionCheckpoint, RegionManifest};
25use crate::manifest::manager::RegionManifestOptions;
26use crate::manifest::storage::ManifestObjectStore;
27use crate::metrics::MANIFEST_OP_ELAPSED;
28use crate::region::{RegionLeaderState, RegionRoleState};
29
/// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way.
///
/// The checkpoint state lives in a reference-counted [`Inner`] so that it can be handed to
/// the background task that performs the checkpoint.
#[derive(Debug)]
pub(crate) struct Checkpointer {
    /// Manifest options; read for the checkpoint distance and the removed-file eviction options.
    manifest_options: RegionManifestOptions,
    /// State shared with the spawned checkpoint task.
    inner: Arc<Inner>,
}
36
/// State shared between a [`Checkpointer`] and the checkpoint task it spawns.
#[derive(Debug)]
struct Inner {
    /// Region this checkpointer works for; used in log messages.
    region_id: RegionId,
    /// Store the checkpoint is saved to and old manifest actions are deleted from.
    manifest_store: ManifestObjectStore,
    /// Version of the last checkpoint that completed successfully.
    /// `0` means there is no checkpoint yet.
    last_checkpoint_version: AtomicU64,
    /// `true` while a checkpoint task is running.
    is_doing_checkpoint: AtomicBool,
}
44
45impl Inner {
46    async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
47        let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
48            x.store(false, Ordering::Relaxed);
49        });
50
51        let _t = MANIFEST_OP_ELAPSED
52            .with_label_values(&["checkpoint"])
53            .start_timer();
54
55        let region_id = self.region_id();
56        let version = checkpoint.last_version();
57        let checkpoint = match checkpoint.encode() {
58            Ok(checkpoint) => checkpoint,
59            Err(e) => {
60                error!(e; "Failed to encode checkpoint {:?}", checkpoint);
61                return;
62            }
63        };
64        if let Err(e) = self
65            .manifest_store
66            .save_checkpoint(version, &checkpoint)
67            .await
68        {
69            error!(e; "Failed to save checkpoint for region {}", region_id);
70            return;
71        }
72
73        if let Err(e) = self.manifest_store.delete_until(version, true).await {
74            error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
75            return;
76        }
77
78        self.last_checkpoint_version
79            .store(version, Ordering::Relaxed);
80
81        info!(
82            "Checkpoint for region {} success, version: {}",
83            region_id, version
84        );
85    }
86
87    fn region_id(&self) -> RegionId {
88        self.region_id
89    }
90
91    fn is_doing_checkpoint(&self) -> bool {
92        self.is_doing_checkpoint.load(Ordering::Relaxed)
93    }
94
95    fn set_doing_checkpoint(&self) {
96        self.is_doing_checkpoint.store(true, Ordering::Relaxed);
97    }
98}
99
100impl Checkpointer {
101    pub(crate) fn new(
102        region_id: RegionId,
103        manifest_options: RegionManifestOptions,
104        manifest_store: ManifestObjectStore,
105        last_checkpoint_version: ManifestVersion,
106    ) -> Self {
107        Self {
108            manifest_options,
109            inner: Arc::new(Inner {
110                region_id,
111                manifest_store,
112                last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
113                is_doing_checkpoint: AtomicBool::new(false),
114            }),
115        }
116    }
117
118    pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
119        self.inner.last_checkpoint_version.load(Ordering::Relaxed)
120    }
121
122    /// Update the `removed_files` field in the manifest by the options in `manifest_options`.
123    /// This should be called before maybe do checkpoint to update the manifest.
124    pub(crate) fn update_manifest_removed_files(
125        &self,
126        mut manifest: RegionManifest,
127    ) -> Result<RegionManifest> {
128        let opt = &self.manifest_options.remove_file_options;
129
130        manifest.removed_files.evict_old_removed_files(opt)?;
131
132        // TODO(discord9): consider also check object store to clear removed files that are already deleted? How costly it is?
133
134        Ok(manifest)
135    }
136
137    /// Check if it's needed to do checkpoint for the region by the checkpoint distance.
138    /// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
139    /// task running in the background.
140    pub(crate) fn maybe_do_checkpoint(
141        &self,
142        manifest: &RegionManifest,
143        region_state: RegionRoleState,
144    ) {
145        // Skip checkpoint if region is in staging state
146        if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
147            info!(
148                "Skipping checkpoint for region {} in staging mode, manifest version: {}",
149                manifest.metadata.region_id, manifest.manifest_version
150            );
151            return;
152        }
153
154        if self.manifest_options.checkpoint_distance == 0 {
155            return;
156        }
157
158        let last_checkpoint_version = self.last_checkpoint_version();
159        if manifest.manifest_version - last_checkpoint_version
160            < self.manifest_options.checkpoint_distance
161        {
162            return;
163        }
164
165        // We can simply check whether there's a running checkpoint task like this, all because of
166        // the caller of this function is ran single threaded, inside the lock of RegionManifestManager.
167        if self.inner.is_doing_checkpoint() {
168            return;
169        }
170
171        let start_version = if last_checkpoint_version == 0 {
172            // Checkpoint version can't be zero by implementation.
173            // So last checkpoint version is zero means no last checkpoint.
174            MIN_VERSION
175        } else {
176            last_checkpoint_version + 1
177        };
178        let end_version = manifest.manifest_version;
179        info!(
180            "Start doing checkpoint for region {}, compacted version: [{}, {}]",
181            self.inner.region_id(),
182            start_version,
183            end_version,
184        );
185
186        let checkpoint = RegionCheckpoint {
187            last_version: end_version,
188            compacted_actions: (end_version - start_version + 1) as usize,
189            checkpoint: Some(manifest.clone()),
190        };
191        self.do_checkpoint(checkpoint);
192    }
193
194    fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
195        self.inner.set_doing_checkpoint();
196
197        let inner = self.inner.clone();
198        common_runtime::spawn_global(async move {
199            inner.do_checkpoint(checkpoint).await;
200        });
201    }
202
203    #[cfg(test)]
204    pub(crate) fn is_doing_checkpoint(&self) -> bool {
205        self.inner.is_doing_checkpoint()
206    }
207}