Skip to main content

mito2/manifest/
checkpointer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use common_telemetry::{error, info, warn};
20use store_api::storage::RegionId;
21use store_api::{MIN_VERSION, ManifestVersion};
22
23use crate::error::Result;
24use crate::manifest::action::{RegionCheckpoint, RegionManifest};
25use crate::manifest::manager::RegionManifestOptions;
26use crate::manifest::storage::ManifestObjectStore;
27use crate::metrics::MANIFEST_OP_ELAPSED;
28
/// [`Checkpointer`] is responsible for checkpointing a region's manifest asynchronously.
///
/// The checkpoint itself is written by a task spawned on the global runtime. A new task is
/// only started when no previous one is still running (tracked by `Inner::is_doing_checkpoint`).
#[derive(Debug)]
pub(crate) struct Checkpointer {
    /// Manifest options: `checkpoint_distance` decides how many manifest versions may
    /// accumulate before a checkpoint is taken (0 disables checkpointing), and
    /// `remove_file_options` controls eviction of old `removed_files` entries.
    manifest_options: RegionManifestOptions,
    /// State shared with the background checkpoint task.
    inner: Arc<Inner>,
}
35
/// State shared between a [`Checkpointer`] and the background checkpoint task it spawns.
#[derive(Debug)]
struct Inner {
    /// Region whose manifest is being checkpointed; used in log messages.
    region_id: RegionId,
    /// Store the encoded checkpoint is saved to, and from which manifest action files up to
    /// the checkpoint version are deleted afterwards.
    manifest_store: ManifestObjectStore,
    /// Manifest version covered by the most recent successful checkpoint.
    /// A value of 0 is treated as "no checkpoint yet" by `maybe_do_checkpoint`.
    last_checkpoint_version: AtomicU64,
    /// `true` while a checkpoint task is in flight; reset when the task finishes, on both
    /// the success and the error paths.
    is_doing_checkpoint: AtomicBool,
}
43
44impl Inner {
45    async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
46        let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
47            x.store(false, Ordering::Relaxed);
48        });
49
50        let _t = MANIFEST_OP_ELAPSED
51            .with_label_values(&["checkpoint"])
52            .start_timer();
53
54        let region_id = self.region_id();
55        let version = checkpoint.last_version();
56        let checkpoint = match checkpoint.encode() {
57            Ok(checkpoint) => checkpoint,
58            Err(e) => {
59                error!(e; "Failed to encode checkpoint {:?}", checkpoint);
60                return;
61            }
62        };
63        if let Err(e) = self
64            .manifest_store
65            .save_checkpoint(version, &checkpoint)
66            .await
67        {
68            error!(e; "Failed to save checkpoint for region {}", region_id);
69            return;
70        }
71
72        // Advance the in-memory checkpoint version as soon as the checkpoint file
73        // is durable. If the subsequent delta cleanup fails, the on-disk state is
74        // still consistent (the `_last_checkpoint` metadata points at the new
75        // checkpoint) and `maybe_do_checkpoint` must not re-checkpoint the same
76        // range.
77        self.last_checkpoint_version
78            .store(version, Ordering::Relaxed);
79
80        if let Err(e) = self.manifest_store.delete_until(version, true).await {
81            warn!(e; "Failed to delete manifest actions until version {} for region {}, leftover files will be ignored on recovery", version, region_id);
82        }
83
84        info!(
85            "Checkpoint for region {} success, version: {}",
86            region_id, version
87        );
88    }
89
90    fn region_id(&self) -> RegionId {
91        self.region_id
92    }
93
94    fn is_doing_checkpoint(&self) -> bool {
95        self.is_doing_checkpoint.load(Ordering::Relaxed)
96    }
97
98    fn set_doing_checkpoint(&self) {
99        self.is_doing_checkpoint.store(true, Ordering::Relaxed);
100    }
101}
102
103impl Checkpointer {
104    pub(crate) fn new(
105        region_id: RegionId,
106        manifest_options: RegionManifestOptions,
107        manifest_store: ManifestObjectStore,
108        last_checkpoint_version: ManifestVersion,
109    ) -> Self {
110        Self {
111            manifest_options,
112            inner: Arc::new(Inner {
113                region_id,
114                manifest_store,
115                last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
116                is_doing_checkpoint: AtomicBool::new(false),
117            }),
118        }
119    }
120
121    pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
122        self.inner.last_checkpoint_version.load(Ordering::Relaxed)
123    }
124
125    /// Update the `removed_files` field in the manifest by the options in `manifest_options`.
126    /// This should be called before maybe do checkpoint to update the manifest.
127    pub(crate) fn update_manifest_removed_files(
128        &self,
129        mut manifest: RegionManifest,
130    ) -> Result<RegionManifest> {
131        let opt = &self.manifest_options.remove_file_options;
132
133        manifest.removed_files.evict_old_removed_files(opt)?;
134
135        // TODO(discord9): consider also check object store to clear removed files that are already deleted? How costly it is?
136
137        Ok(manifest)
138    }
139
140    /// Check if it's needed to do checkpoint for the region by the checkpoint distance.
141    /// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
142    /// task running in the background.
143    pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
144        if self.manifest_options.checkpoint_distance == 0 {
145            return;
146        }
147
148        let last_checkpoint_version = self.last_checkpoint_version();
149        if manifest.manifest_version - last_checkpoint_version
150            < self.manifest_options.checkpoint_distance
151        {
152            return;
153        }
154
155        // We can simply check whether there's a running checkpoint task like this, all because of
156        // the caller of this function is ran single threaded, inside the lock of RegionManifestManager.
157        if self.inner.is_doing_checkpoint() {
158            return;
159        }
160
161        let start_version = if last_checkpoint_version == 0 {
162            // Checkpoint version can't be zero by implementation.
163            // So last checkpoint version is zero means no last checkpoint.
164            MIN_VERSION
165        } else {
166            last_checkpoint_version + 1
167        };
168        let end_version = manifest.manifest_version;
169        info!(
170            "Start doing checkpoint for region {}, compacted version: [{}, {}]",
171            self.inner.region_id(),
172            start_version,
173            end_version,
174        );
175
176        let checkpoint = RegionCheckpoint {
177            last_version: end_version,
178            compacted_actions: (end_version - start_version + 1) as usize,
179            checkpoint: Some(manifest.clone()),
180        };
181        self.do_checkpoint(checkpoint);
182    }
183
184    fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
185        self.inner.set_doing_checkpoint();
186
187        let inner = self.inner.clone();
188        common_runtime::spawn_global(async move {
189            inner.do_checkpoint(checkpoint).await;
190        });
191    }
192
193    #[cfg(test)]
194    pub(crate) fn is_doing_checkpoint(&self) -> bool {
195        self.inner.is_doing_checkpoint()
196    }
197}