mito2/manifest/
checkpointer.rs1use std::fmt::Debug;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use common_telemetry::{error, info};
20use store_api::storage::RegionId;
21use store_api::{MIN_VERSION, ManifestVersion};
22
23use crate::error::Result;
24use crate::manifest::action::{RegionCheckpoint, RegionManifest};
25use crate::manifest::manager::RegionManifestOptions;
26use crate::manifest::storage::ManifestObjectStore;
27use crate::metrics::MANIFEST_OP_ELAPSED;
28use crate::region::{RegionLeaderState, RegionRoleState};
29
/// Decides when a region's manifest should be checkpointed and launches the checkpoint work
/// in the background.
///
/// State that the spawned checkpoint tasks also need lives in `inner`, shared through an
/// [`Arc`].
#[derive(Debug)]
pub(crate) struct Checkpointer {
    /// Manifest options; `checkpoint_distance` and `remove_file_options` are read from here.
    manifest_options: RegionManifestOptions,
    /// State shared with spawned checkpoint tasks.
    inner: Arc<Inner>,
}
36
/// State shared between a `Checkpointer` and the checkpoint tasks it spawns.
#[derive(Debug)]
struct Inner {
    /// Region whose manifest is checkpointed; used in log messages.
    region_id: RegionId,
    /// Store the encoded checkpoint is saved to and old manifest actions are deleted from.
    manifest_store: ManifestObjectStore,
    /// Version covered by the most recent checkpoint. Seeded at construction and advanced only
    /// after a checkpoint has been saved and the covered actions deleted.
    last_checkpoint_version: AtomicU64,
    /// `true` from the moment a checkpoint task is scheduled until that task returns.
    is_doing_checkpoint: AtomicBool,
}
44
45impl Inner {
46 async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
47 let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
48 x.store(false, Ordering::Relaxed);
49 });
50
51 let _t = MANIFEST_OP_ELAPSED
52 .with_label_values(&["checkpoint"])
53 .start_timer();
54
55 let region_id = self.region_id();
56 let version = checkpoint.last_version();
57 let checkpoint = match checkpoint.encode() {
58 Ok(checkpoint) => checkpoint,
59 Err(e) => {
60 error!(e; "Failed to encode checkpoint {:?}", checkpoint);
61 return;
62 }
63 };
64 if let Err(e) = self
65 .manifest_store
66 .save_checkpoint(version, &checkpoint)
67 .await
68 {
69 error!(e; "Failed to save checkpoint for region {}", region_id);
70 return;
71 }
72
73 if let Err(e) = self.manifest_store.delete_until(version, true).await {
74 error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
75 return;
76 }
77
78 self.last_checkpoint_version
79 .store(version, Ordering::Relaxed);
80
81 info!(
82 "Checkpoint for region {} success, version: {}",
83 region_id, version
84 );
85 }
86
87 fn region_id(&self) -> RegionId {
88 self.region_id
89 }
90
91 fn is_doing_checkpoint(&self) -> bool {
92 self.is_doing_checkpoint.load(Ordering::Relaxed)
93 }
94
95 fn set_doing_checkpoint(&self) {
96 self.is_doing_checkpoint.store(true, Ordering::Relaxed);
97 }
98}
99
100impl Checkpointer {
101 pub(crate) fn new(
102 region_id: RegionId,
103 manifest_options: RegionManifestOptions,
104 manifest_store: ManifestObjectStore,
105 last_checkpoint_version: ManifestVersion,
106 ) -> Self {
107 Self {
108 manifest_options,
109 inner: Arc::new(Inner {
110 region_id,
111 manifest_store,
112 last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
113 is_doing_checkpoint: AtomicBool::new(false),
114 }),
115 }
116 }
117
118 pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
119 self.inner.last_checkpoint_version.load(Ordering::Relaxed)
120 }
121
122 pub(crate) fn update_manifest_removed_files(
125 &self,
126 mut manifest: RegionManifest,
127 ) -> Result<RegionManifest> {
128 let opt = &self.manifest_options.remove_file_options;
129
130 manifest.removed_files.evict_old_removed_files(opt)?;
131
132 Ok(manifest)
135 }
136
137 pub(crate) fn maybe_do_checkpoint(
141 &self,
142 manifest: &RegionManifest,
143 region_state: RegionRoleState,
144 ) {
145 if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
147 info!(
148 "Skipping checkpoint for region {} in staging mode, manifest version: {}",
149 manifest.metadata.region_id, manifest.manifest_version
150 );
151 return;
152 }
153
154 if self.manifest_options.checkpoint_distance == 0 {
155 return;
156 }
157
158 let last_checkpoint_version = self.last_checkpoint_version();
159 if manifest.manifest_version - last_checkpoint_version
160 < self.manifest_options.checkpoint_distance
161 {
162 return;
163 }
164
165 if self.inner.is_doing_checkpoint() {
168 return;
169 }
170
171 let start_version = if last_checkpoint_version == 0 {
172 MIN_VERSION
175 } else {
176 last_checkpoint_version + 1
177 };
178 let end_version = manifest.manifest_version;
179 info!(
180 "Start doing checkpoint for region {}, compacted version: [{}, {}]",
181 self.inner.region_id(),
182 start_version,
183 end_version,
184 );
185
186 let checkpoint = RegionCheckpoint {
187 last_version: end_version,
188 compacted_actions: (end_version - start_version + 1) as usize,
189 checkpoint: Some(manifest.clone()),
190 };
191 self.do_checkpoint(checkpoint);
192 }
193
194 fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
195 self.inner.set_doing_checkpoint();
196
197 let inner = self.inner.clone();
198 common_runtime::spawn_global(async move {
199 inner.do_checkpoint(checkpoint).await;
200 });
201 }
202
203 #[cfg(test)]
204 pub(crate) fn is_doing_checkpoint(&self) -> bool {
205 self.inner.is_doing_checkpoint()
206 }
207}