mito2/manifest/
checkpointer.rs1use std::fmt::Debug;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use common_telemetry::{error, info};
20use store_api::storage::RegionId;
21use store_api::{MIN_VERSION, ManifestVersion};
22
23use crate::error::Result;
24use crate::manifest::action::{RegionCheckpoint, RegionManifest};
25use crate::manifest::manager::RegionManifestOptions;
26use crate::manifest::storage::ManifestObjectStore;
27use crate::metrics::MANIFEST_OP_ELAPSED;
28
/// Decides when a region manifest checkpoint is due and runs it in the background.
///
/// The checkpointing state (last checkpointed version, "running" flag, manifest store) lives in
/// an `Arc<Inner>` so it can be shared with the spawned checkpoint task.
#[derive(Debug)]
pub(crate) struct Checkpointer {
    /// Region manifest options; `checkpoint_distance` and `remove_file_options` are read from it.
    manifest_options: RegionManifestOptions,
    /// State shared with background checkpoint tasks.
    inner: Arc<Inner>,
}
35
/// Checkpointing state shared between a [`Checkpointer`] and its background checkpoint task.
#[derive(Debug)]
struct Inner {
    /// Region this checkpointer belongs to (used in log messages).
    region_id: RegionId,
    /// Store used to save checkpoints and to delete manifest actions covered by them.
    manifest_store: ManifestObjectStore,
    /// Manifest version covered by the last fully successful checkpoint.
    last_checkpoint_version: AtomicU64,
    /// `true` while a checkpoint task is running.
    is_doing_checkpoint: AtomicBool,
}
43
44impl Inner {
45 async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
46 let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
47 x.store(false, Ordering::Relaxed);
48 });
49
50 let _t = MANIFEST_OP_ELAPSED
51 .with_label_values(&["checkpoint"])
52 .start_timer();
53
54 let region_id = self.region_id();
55 let version = checkpoint.last_version();
56 let checkpoint = match checkpoint.encode() {
57 Ok(checkpoint) => checkpoint,
58 Err(e) => {
59 error!(e; "Failed to encode checkpoint {:?}", checkpoint);
60 return;
61 }
62 };
63 if let Err(e) = self
64 .manifest_store
65 .save_checkpoint(version, &checkpoint)
66 .await
67 {
68 error!(e; "Failed to save checkpoint for region {}", region_id);
69 return;
70 }
71
72 if let Err(e) = self.manifest_store.delete_until(version, true).await {
73 error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
74 return;
75 }
76
77 self.last_checkpoint_version
78 .store(version, Ordering::Relaxed);
79
80 info!(
81 "Checkpoint for region {} success, version: {}",
82 region_id, version
83 );
84 }
85
86 fn region_id(&self) -> RegionId {
87 self.region_id
88 }
89
90 fn is_doing_checkpoint(&self) -> bool {
91 self.is_doing_checkpoint.load(Ordering::Relaxed)
92 }
93
94 fn set_doing_checkpoint(&self) {
95 self.is_doing_checkpoint.store(true, Ordering::Relaxed);
96 }
97}
98
99impl Checkpointer {
100 pub(crate) fn new(
101 region_id: RegionId,
102 manifest_options: RegionManifestOptions,
103 manifest_store: ManifestObjectStore,
104 last_checkpoint_version: ManifestVersion,
105 ) -> Self {
106 Self {
107 manifest_options,
108 inner: Arc::new(Inner {
109 region_id,
110 manifest_store,
111 last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
112 is_doing_checkpoint: AtomicBool::new(false),
113 }),
114 }
115 }
116
117 pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
118 self.inner.last_checkpoint_version.load(Ordering::Relaxed)
119 }
120
121 pub(crate) fn update_manifest_removed_files(
124 &self,
125 mut manifest: RegionManifest,
126 ) -> Result<RegionManifest> {
127 let opt = &self.manifest_options.remove_file_options;
128
129 manifest.removed_files.evict_old_removed_files(opt)?;
130
131 Ok(manifest)
134 }
135
136 pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
140 if self.manifest_options.checkpoint_distance == 0 {
141 return;
142 }
143
144 let last_checkpoint_version = self.last_checkpoint_version();
145 if manifest.manifest_version - last_checkpoint_version
146 < self.manifest_options.checkpoint_distance
147 {
148 return;
149 }
150
151 if self.inner.is_doing_checkpoint() {
154 return;
155 }
156
157 let start_version = if last_checkpoint_version == 0 {
158 MIN_VERSION
161 } else {
162 last_checkpoint_version + 1
163 };
164 let end_version = manifest.manifest_version;
165 info!(
166 "Start doing checkpoint for region {}, compacted version: [{}, {}]",
167 self.inner.region_id(),
168 start_version,
169 end_version,
170 );
171
172 let checkpoint = RegionCheckpoint {
173 last_version: end_version,
174 compacted_actions: (end_version - start_version + 1) as usize,
175 checkpoint: Some(manifest.clone()),
176 };
177 self.do_checkpoint(checkpoint);
178 }
179
180 fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
181 self.inner.set_doing_checkpoint();
182
183 let inner = self.inner.clone();
184 common_runtime::spawn_global(async move {
185 inner.do_checkpoint(checkpoint).await;
186 });
187 }
188
189 #[cfg(test)]
190 pub(crate) fn is_doing_checkpoint(&self) -> bool {
191 self.inner.is_doing_checkpoint()
192 }
193}