mito2/manifest/
checkpointer.rs1use std::fmt::Debug;
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::Arc;
18
19use common_telemetry::{error, info};
20use store_api::storage::RegionId;
21use store_api::{ManifestVersion, MIN_VERSION};
22
23use crate::error::Result;
24use crate::manifest::action::{RegionCheckpoint, RegionManifest};
25use crate::manifest::manager::RegionManifestOptions;
26use crate::manifest::storage::ManifestObjectStore;
27use crate::metrics::MANIFEST_OP_ELAPSED;
28use crate::region::{RegionLeaderState, RegionRoleState};
29
30#[derive(Debug)]
32pub(crate) struct Checkpointer {
33 manifest_options: RegionManifestOptions,
34 inner: Arc<Inner>,
35}
36
37#[derive(Debug)]
38struct Inner {
39 region_id: RegionId,
40 manifest_store: ManifestObjectStore,
41 last_checkpoint_version: AtomicU64,
42 is_doing_checkpoint: AtomicBool,
43}
44
45impl Inner {
46 async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
47 let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
48 x.store(false, Ordering::Relaxed);
49 });
50
51 let _t = MANIFEST_OP_ELAPSED
52 .with_label_values(&["checkpoint"])
53 .start_timer();
54
55 let region_id = self.region_id();
56 let version = checkpoint.last_version();
57 let checkpoint = match checkpoint.encode() {
58 Ok(checkpoint) => checkpoint,
59 Err(e) => {
60 error!(e; "Failed to encode checkpoint {:?}", checkpoint);
61 return;
62 }
63 };
64 if let Err(e) = self
65 .manifest_store
66 .save_checkpoint(version, &checkpoint)
67 .await
68 {
69 error!(e; "Failed to save checkpoint for region {}", region_id);
70 return;
71 }
72
73 if let Err(e) = self.manifest_store.delete_until(version, true).await {
74 error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
75 return;
76 }
77
78 self.last_checkpoint_version
79 .store(version, Ordering::Relaxed);
80
81 info!(
82 "Checkpoint for region {} success, version: {}",
83 region_id, version
84 );
85 }
86
87 fn region_id(&self) -> RegionId {
88 self.region_id
89 }
90
91 fn is_doing_checkpoint(&self) -> bool {
92 self.is_doing_checkpoint.load(Ordering::Relaxed)
93 }
94
95 fn set_doing_checkpoint(&self) {
96 self.is_doing_checkpoint.store(true, Ordering::Relaxed);
97 }
98}
99
100impl Checkpointer {
101 pub(crate) fn new(
102 region_id: RegionId,
103 manifest_options: RegionManifestOptions,
104 manifest_store: ManifestObjectStore,
105 last_checkpoint_version: ManifestVersion,
106 ) -> Self {
107 Self {
108 manifest_options,
109 inner: Arc::new(Inner {
110 region_id,
111 manifest_store,
112 last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
113 is_doing_checkpoint: AtomicBool::new(false),
114 }),
115 }
116 }
117
118 pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
119 self.inner.last_checkpoint_version.load(Ordering::Relaxed)
120 }
121
122 pub(crate) fn update_manifest_removed_files(
125 &self,
126 mut manifest: RegionManifest,
127 ) -> Result<RegionManifest> {
128 let opt = &self.manifest_options.remove_file_options;
129
130 manifest.removed_files.evict_old_removed_files(opt)?;
131
132 Ok(manifest)
133 }
134
135 pub(crate) fn maybe_do_checkpoint(
139 &self,
140 manifest: &RegionManifest,
141 region_state: RegionRoleState,
142 ) {
143 if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
145 info!(
146 "Skipping checkpoint for region {} in staging mode, manifest version: {}",
147 manifest.metadata.region_id, manifest.manifest_version
148 );
149 return;
150 }
151
152 if self.manifest_options.checkpoint_distance == 0 {
153 return;
154 }
155
156 let last_checkpoint_version = self.last_checkpoint_version();
157 if manifest.manifest_version - last_checkpoint_version
158 < self.manifest_options.checkpoint_distance
159 {
160 return;
161 }
162
163 if self.inner.is_doing_checkpoint() {
166 return;
167 }
168
169 let start_version = if last_checkpoint_version == 0 {
170 MIN_VERSION
173 } else {
174 last_checkpoint_version + 1
175 };
176 let end_version = manifest.manifest_version;
177 info!(
178 "Start doing checkpoint for region {}, compacted version: [{}, {}]",
179 self.inner.region_id(),
180 start_version,
181 end_version,
182 );
183
184 let checkpoint = RegionCheckpoint {
185 last_version: end_version,
186 compacted_actions: (end_version - start_version + 1) as usize,
187 checkpoint: Some(manifest.clone()),
188 };
189 self.do_checkpoint(checkpoint);
190 }
191
192 fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
193 self.inner.set_doing_checkpoint();
194
195 let inner = self.inner.clone();
196 common_runtime::spawn_global(async move {
197 inner.do_checkpoint(checkpoint).await;
198 });
199 }
200
201 #[cfg(test)]
202 pub(crate) fn is_doing_checkpoint(&self) -> bool {
203 self.inner.is_doing_checkpoint()
204 }
205}