mito2/manifest/
checkpointer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::Arc;
18
19use common_telemetry::{error, info};
20use store_api::manifest::{ManifestVersion, MIN_VERSION};
21use store_api::storage::RegionId;
22
23use crate::manifest::action::{RegionCheckpoint, RegionManifest};
24use crate::manifest::manager::RegionManifestOptions;
25use crate::manifest::storage::ManifestObjectStore;
26use crate::metrics::MANIFEST_OP_ELAPSED;
27
/// [`Checkpointer`] is responsible for checkpointing a region's manifest asynchronously.
///
/// The checkpoint itself runs in a spawned background task; this handle only decides
/// when to start one (see `maybe_do_checkpoint`) and exposes the last checkpointed version.
#[derive(Debug)]
pub(crate) struct Checkpointer {
    /// Manifest options. `checkpoint_distance` is read from here to decide whether a
    /// checkpoint is due; a distance of zero disables checkpointing.
    manifest_options: RegionManifestOptions,
    /// State shared between this handle and the spawned background checkpoint task.
    inner: Arc<Inner>,
}
34
/// State shared between a [`Checkpointer`] and the background task that performs the
/// checkpoint.
#[derive(Debug)]
struct Inner {
    /// Region this checkpointer works for; used in log messages.
    region_id: RegionId,
    /// Store the checkpoint is saved to and from which older manifest actions are deleted.
    manifest_store: ManifestObjectStore,
    /// Version of the last checkpoint that was fully completed. Zero means that no
    /// checkpoint has been made yet.
    last_checkpoint_version: AtomicU64,
    /// `true` while a background checkpoint task is scheduled or running.
    is_doing_checkpoint: AtomicBool,
}
42
43impl Inner {
44    async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
45        let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
46            x.store(false, Ordering::Relaxed);
47        });
48
49        let _t = MANIFEST_OP_ELAPSED
50            .with_label_values(&["checkpoint"])
51            .start_timer();
52
53        let region_id = self.region_id();
54        let version = checkpoint.last_version();
55        let checkpoint = match checkpoint.encode() {
56            Ok(checkpoint) => checkpoint,
57            Err(e) => {
58                error!(e; "Failed to encode checkpoint {:?}", checkpoint);
59                return;
60            }
61        };
62        if let Err(e) = self
63            .manifest_store
64            .save_checkpoint(version, &checkpoint)
65            .await
66        {
67            error!(e; "Failed to save checkpoint for region {}", region_id);
68            return;
69        }
70
71        if let Err(e) = self.manifest_store.delete_until(version, true).await {
72            error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
73            return;
74        }
75
76        self.last_checkpoint_version
77            .store(version, Ordering::Relaxed);
78
79        info!(
80            "Checkpoint for region {} success, version: {}",
81            region_id, version
82        );
83    }
84
85    fn region_id(&self) -> RegionId {
86        self.region_id
87    }
88
89    fn is_doing_checkpoint(&self) -> bool {
90        self.is_doing_checkpoint.load(Ordering::Relaxed)
91    }
92
93    fn set_doing_checkpoint(&self) {
94        self.is_doing_checkpoint.store(true, Ordering::Relaxed);
95    }
96}
97
98impl Checkpointer {
99    pub(crate) fn new(
100        region_id: RegionId,
101        manifest_options: RegionManifestOptions,
102        manifest_store: ManifestObjectStore,
103        last_checkpoint_version: ManifestVersion,
104    ) -> Self {
105        Self {
106            manifest_options,
107            inner: Arc::new(Inner {
108                region_id,
109                manifest_store,
110                last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
111                is_doing_checkpoint: AtomicBool::new(false),
112            }),
113        }
114    }
115
116    pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
117        self.inner.last_checkpoint_version.load(Ordering::Relaxed)
118    }
119
120    /// Check if it's needed to do checkpoint for the region by the checkpoint distance.
121    /// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
122    /// task running in the background.
123    pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
124        if self.manifest_options.checkpoint_distance == 0 {
125            return;
126        }
127
128        let last_checkpoint_version = self.last_checkpoint_version();
129        if manifest.manifest_version - last_checkpoint_version
130            < self.manifest_options.checkpoint_distance
131        {
132            return;
133        }
134
135        // We can simply check whether there's a running checkpoint task like this, all because of
136        // the caller of this function is ran single threaded, inside the lock of RegionManifestManager.
137        if self.inner.is_doing_checkpoint() {
138            return;
139        }
140
141        let start_version = if last_checkpoint_version == 0 {
142            // Checkpoint version can't be zero by implementation.
143            // So last checkpoint version is zero means no last checkpoint.
144            MIN_VERSION
145        } else {
146            last_checkpoint_version + 1
147        };
148        let end_version = manifest.manifest_version;
149        info!(
150            "Start doing checkpoint for region {}, compacted version: [{}, {}]",
151            self.inner.region_id(),
152            start_version,
153            end_version,
154        );
155
156        let checkpoint = RegionCheckpoint {
157            last_version: end_version,
158            compacted_actions: (end_version - start_version + 1) as usize,
159            checkpoint: Some(manifest.clone()),
160        };
161        self.do_checkpoint(checkpoint);
162    }
163
164    fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
165        self.inner.set_doing_checkpoint();
166
167        let inner = self.inner.clone();
168        common_runtime::spawn_global(async move {
169            inner.do_checkpoint(checkpoint).await;
170        });
171    }
172
173    #[cfg(test)]
174    pub(crate) fn is_doing_checkpoint(&self) -> bool {
175        self.inner.is_doing_checkpoint()
176    }
177}