mito2/manifest/
checkpointer.rs1use std::fmt::Debug;
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::Arc;
18
19use common_telemetry::{error, info};
20use store_api::manifest::{ManifestVersion, MIN_VERSION};
21use store_api::storage::RegionId;
22
23use crate::manifest::action::{RegionCheckpoint, RegionManifest};
24use crate::manifest::manager::RegionManifestOptions;
25use crate::manifest::storage::ManifestObjectStore;
26use crate::metrics::MANIFEST_OP_ELAPSED;
27
/// Decides when a region's manifest should be checkpointed and starts checkpoint
/// tasks in the background.
#[derive(Debug)]
pub(crate) struct Checkpointer {
    /// Manifest options; `checkpoint_distance` controls how often checkpoints happen
    /// (`0` disables checkpointing).
    manifest_options: RegionManifestOptions,
    /// State shared with the spawned checkpoint tasks (cloned into each task).
    inner: Arc<Inner>,
}
34
/// State shared between a `Checkpointer` and its background checkpoint tasks.
#[derive(Debug)]
struct Inner {
    /// Region whose manifest is checkpointed.
    region_id: RegionId,
    /// Store that checkpoints are saved to and old manifest actions are deleted from.
    manifest_store: ManifestObjectStore,
    /// Version of the last successful checkpoint. It is only advanced after a checkpoint
    /// has been saved and the manifest actions it covers have been deleted.
    last_checkpoint_version: AtomicU64,
    /// `true` while a checkpoint task is running; reset when the task finishes.
    is_doing_checkpoint: AtomicBool,
}
42
43impl Inner {
44 async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
45 let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
46 x.store(false, Ordering::Relaxed);
47 });
48
49 let _t = MANIFEST_OP_ELAPSED
50 .with_label_values(&["checkpoint"])
51 .start_timer();
52
53 let region_id = self.region_id();
54 let version = checkpoint.last_version();
55 let checkpoint = match checkpoint.encode() {
56 Ok(checkpoint) => checkpoint,
57 Err(e) => {
58 error!(e; "Failed to encode checkpoint {:?}", checkpoint);
59 return;
60 }
61 };
62 if let Err(e) = self
63 .manifest_store
64 .save_checkpoint(version, &checkpoint)
65 .await
66 {
67 error!(e; "Failed to save checkpoint for region {}", region_id);
68 return;
69 }
70
71 if let Err(e) = self.manifest_store.delete_until(version, true).await {
72 error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
73 return;
74 }
75
76 self.last_checkpoint_version
77 .store(version, Ordering::Relaxed);
78
79 info!(
80 "Checkpoint for region {} success, version: {}",
81 region_id, version
82 );
83 }
84
85 fn region_id(&self) -> RegionId {
86 self.region_id
87 }
88
89 fn is_doing_checkpoint(&self) -> bool {
90 self.is_doing_checkpoint.load(Ordering::Relaxed)
91 }
92
93 fn set_doing_checkpoint(&self) {
94 self.is_doing_checkpoint.store(true, Ordering::Relaxed);
95 }
96}
97
98impl Checkpointer {
99 pub(crate) fn new(
100 region_id: RegionId,
101 manifest_options: RegionManifestOptions,
102 manifest_store: ManifestObjectStore,
103 last_checkpoint_version: ManifestVersion,
104 ) -> Self {
105 Self {
106 manifest_options,
107 inner: Arc::new(Inner {
108 region_id,
109 manifest_store,
110 last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
111 is_doing_checkpoint: AtomicBool::new(false),
112 }),
113 }
114 }
115
116 pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
117 self.inner.last_checkpoint_version.load(Ordering::Relaxed)
118 }
119
120 pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
124 if self.manifest_options.checkpoint_distance == 0 {
125 return;
126 }
127
128 let last_checkpoint_version = self.last_checkpoint_version();
129 if manifest.manifest_version - last_checkpoint_version
130 < self.manifest_options.checkpoint_distance
131 {
132 return;
133 }
134
135 if self.inner.is_doing_checkpoint() {
138 return;
139 }
140
141 let start_version = if last_checkpoint_version == 0 {
142 MIN_VERSION
145 } else {
146 last_checkpoint_version + 1
147 };
148 let end_version = manifest.manifest_version;
149 info!(
150 "Start doing checkpoint for region {}, compacted version: [{}, {}]",
151 self.inner.region_id(),
152 start_version,
153 end_version,
154 );
155
156 let checkpoint = RegionCheckpoint {
157 last_version: end_version,
158 compacted_actions: (end_version - start_version + 1) as usize,
159 checkpoint: Some(manifest.clone()),
160 };
161 self.do_checkpoint(checkpoint);
162 }
163
164 fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
165 self.inner.set_doing_checkpoint();
166
167 let inner = self.inner.clone();
168 common_runtime::spawn_global(async move {
169 inner.do_checkpoint(checkpoint).await;
170 });
171 }
172
173 #[cfg(test)]
174 pub(crate) fn is_doing_checkpoint(&self) -> bool {
175 self.inner.is_doing_checkpoint()
176 }
177}