mito2/manifest/
checkpointer.rs1use std::fmt::Debug;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use common_telemetry::{error, info, warn};
20use store_api::storage::RegionId;
21use store_api::{MIN_VERSION, ManifestVersion};
22
23use crate::error::Result;
24use crate::manifest::action::{RegionCheckpoint, RegionManifest};
25use crate::manifest::manager::RegionManifestOptions;
26use crate::manifest::storage::ManifestObjectStore;
27use crate::metrics::MANIFEST_OP_ELAPSED;
28
/// Schedules checkpoints of a region's manifest.
///
/// The checkpoint work itself runs on a task spawned through
/// `common_runtime::spawn_global`, so `maybe_do_checkpoint` returns without
/// waiting for it to finish.
#[derive(Debug)]
pub(crate) struct Checkpointer {
    /// Manifest options; `checkpoint_distance` and `remove_file_options` are read from here.
    manifest_options: RegionManifestOptions,
    /// State shared between this handle and the spawned checkpoint task.
    inner: Arc<Inner>,
}
35
/// Checkpoint state that is shared (behind an `Arc`) with the background checkpoint task.
#[derive(Debug)]
struct Inner {
    /// Region this checkpointer works for; used in log messages.
    region_id: RegionId,
    /// Store used to save checkpoints and to delete manifest actions afterwards.
    manifest_store: ManifestObjectStore,
    /// Version of the last checkpoint that was saved successfully.
    /// `maybe_do_checkpoint` treats `0` as "no checkpoint has been taken yet".
    last_checkpoint_version: AtomicU64,
    /// Set to `true` right before a checkpoint task is spawned and reset to `false`
    /// when that task finishes, whether it succeeded or not.
    is_doing_checkpoint: AtomicBool,
}
43
44impl Inner {
45 async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
46 let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
47 x.store(false, Ordering::Relaxed);
48 });
49
50 let _t = MANIFEST_OP_ELAPSED
51 .with_label_values(&["checkpoint"])
52 .start_timer();
53
54 let region_id = self.region_id();
55 let version = checkpoint.last_version();
56 let checkpoint = match checkpoint.encode() {
57 Ok(checkpoint) => checkpoint,
58 Err(e) => {
59 error!(e; "Failed to encode checkpoint {:?}", checkpoint);
60 return;
61 }
62 };
63 if let Err(e) = self
64 .manifest_store
65 .save_checkpoint(version, &checkpoint)
66 .await
67 {
68 error!(e; "Failed to save checkpoint for region {}", region_id);
69 return;
70 }
71
72 self.last_checkpoint_version
78 .store(version, Ordering::Relaxed);
79
80 if let Err(e) = self.manifest_store.delete_until(version, true).await {
81 warn!(e; "Failed to delete manifest actions until version {} for region {}, leftover files will be ignored on recovery", version, region_id);
82 }
83
84 info!(
85 "Checkpoint for region {} success, version: {}",
86 region_id, version
87 );
88 }
89
90 fn region_id(&self) -> RegionId {
91 self.region_id
92 }
93
94 fn is_doing_checkpoint(&self) -> bool {
95 self.is_doing_checkpoint.load(Ordering::Relaxed)
96 }
97
98 fn set_doing_checkpoint(&self) {
99 self.is_doing_checkpoint.store(true, Ordering::Relaxed);
100 }
101}
102
103impl Checkpointer {
104 pub(crate) fn new(
105 region_id: RegionId,
106 manifest_options: RegionManifestOptions,
107 manifest_store: ManifestObjectStore,
108 last_checkpoint_version: ManifestVersion,
109 ) -> Self {
110 Self {
111 manifest_options,
112 inner: Arc::new(Inner {
113 region_id,
114 manifest_store,
115 last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
116 is_doing_checkpoint: AtomicBool::new(false),
117 }),
118 }
119 }
120
121 pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
122 self.inner.last_checkpoint_version.load(Ordering::Relaxed)
123 }
124
125 pub(crate) fn update_manifest_removed_files(
128 &self,
129 mut manifest: RegionManifest,
130 ) -> Result<RegionManifest> {
131 let opt = &self.manifest_options.remove_file_options;
132
133 manifest.removed_files.evict_old_removed_files(opt)?;
134
135 Ok(manifest)
138 }
139
140 pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
144 if self.manifest_options.checkpoint_distance == 0 {
145 return;
146 }
147
148 let last_checkpoint_version = self.last_checkpoint_version();
149 if manifest.manifest_version - last_checkpoint_version
150 < self.manifest_options.checkpoint_distance
151 {
152 return;
153 }
154
155 if self.inner.is_doing_checkpoint() {
158 return;
159 }
160
161 let start_version = if last_checkpoint_version == 0 {
162 MIN_VERSION
165 } else {
166 last_checkpoint_version + 1
167 };
168 let end_version = manifest.manifest_version;
169 info!(
170 "Start doing checkpoint for region {}, compacted version: [{}, {}]",
171 self.inner.region_id(),
172 start_version,
173 end_version,
174 );
175
176 let checkpoint = RegionCheckpoint {
177 last_version: end_version,
178 compacted_actions: (end_version - start_version + 1) as usize,
179 checkpoint: Some(manifest.clone()),
180 };
181 self.do_checkpoint(checkpoint);
182 }
183
184 fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
185 self.inner.set_doing_checkpoint();
186
187 let inner = self.inner.clone();
188 common_runtime::spawn_global(async move {
189 inner.do_checkpoint(checkpoint).await;
190 });
191 }
192
193 #[cfg(test)]
194 pub(crate) fn is_doing_checkpoint(&self) -> bool {
195 self.inner.is_doing_checkpoint()
196 }
197}