1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::debug;
20use object_store::{ErrorKind, ObjectStore};
21use serde::{Deserialize, Serialize};
22use snafu::ResultExt;
23use store_api::ManifestVersion;
24
25use crate::cache::manifest_cache::ManifestCache;
26use crate::error::{
27 CompressObjectSnafu, DecompressObjectSnafu, OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
28};
29use crate::manifest::storage::size_tracker::Tracker;
30use crate::manifest::storage::utils::{get_from_cache, put_to_cache, write_and_put_cache};
31use crate::manifest::storage::{
32 FALL_BACK_COMPRESS_TYPE, LAST_CHECKPOINT_FILE, checkpoint_checksum, checkpoint_file, gen_path,
33 verify_checksum,
34};
35
36#[derive(Serialize, Deserialize, Debug)]
37pub(crate) struct CheckpointMetadata {
38 pub size: usize,
39 pub version: ManifestVersion,
41 pub checksum: Option<u32>,
42 pub extend_metadata: HashMap<String, String>,
43}
44
45impl CheckpointMetadata {
46 fn encode(&self) -> Result<Vec<u8>> {
47 Ok(serde_json::to_string(self)
48 .context(SerdeJsonSnafu)?
49 .into_bytes())
50 }
51
52 fn decode(bs: &[u8]) -> Result<Self> {
53 let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
54
55 serde_json::from_str(data).context(SerdeJsonSnafu)
56 }
57}
58
59#[derive(Debug, Clone)]
61pub(crate) struct CheckpointStorage<T: Tracker> {
62 object_store: ObjectStore,
63 compress_type: CompressionType,
64 path: String,
65 manifest_cache: Option<ManifestCache>,
66 size_tracker: Arc<T>,
67}
68
69impl<T: Tracker> CheckpointStorage<T> {
70 pub fn new(
71 path: String,
72 object_store: ObjectStore,
73 compress_type: CompressionType,
74 manifest_cache: Option<ManifestCache>,
75 size_tracker: Arc<T>,
76 ) -> Self {
77 Self {
78 object_store,
79 compress_type,
80 path,
81 manifest_cache,
82 size_tracker,
83 }
84 }
85
86 pub(crate) fn last_checkpoint_path(&self) -> String {
89 format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
90 }
91
92 fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
94 gen_path(&self.path, &checkpoint_file(version), self.compress_type)
95 }
96
97 pub(crate) async fn load_checkpoint(
98 &mut self,
99 metadata: CheckpointMetadata,
100 ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
101 let version = metadata.version;
102 let path = self.checkpoint_file_path(version);
103
104 if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), &path).await {
106 verify_checksum(&data, metadata.checksum)?;
107 return Ok(Some((version, data)));
108 }
109
110 let checkpoint_data = match self.object_store.read(&path).await {
113 Ok(checkpoint) => {
114 let checkpoint_size = checkpoint.len();
115 let decompress_data =
116 self.compress_type
117 .decode(checkpoint)
118 .await
119 .with_context(|_| DecompressObjectSnafu {
120 compress_type: self.compress_type,
121 path: path.clone(),
122 })?;
123 verify_checksum(&decompress_data, metadata.checksum)?;
124 self.size_tracker.record(version, checkpoint_size as u64);
126 put_to_cache(self.manifest_cache.as_ref(), path, &decompress_data).await;
128 Ok(Some(decompress_data))
129 }
130 Err(e) => {
131 if e.kind() == ErrorKind::NotFound {
132 if self.compress_type != FALL_BACK_COMPRESS_TYPE {
133 let fall_back_path = gen_path(
134 &self.path,
135 &checkpoint_file(version),
136 FALL_BACK_COMPRESS_TYPE,
137 );
138 debug!(
139 "Failed to load checkpoint from path: {}, fall back to path: {}",
140 path, fall_back_path
141 );
142
143 if let Some(data) =
145 get_from_cache(self.manifest_cache.as_ref(), &fall_back_path).await
146 {
147 verify_checksum(&data, metadata.checksum)?;
148 return Ok(Some((version, data)));
149 }
150
151 match self.object_store.read(&fall_back_path).await {
152 Ok(checkpoint) => {
153 let checkpoint_size = checkpoint.len();
154 let decompress_data = FALL_BACK_COMPRESS_TYPE
155 .decode(checkpoint)
156 .await
157 .with_context(|_| DecompressObjectSnafu {
158 compress_type: FALL_BACK_COMPRESS_TYPE,
159 path: fall_back_path.clone(),
160 })?;
161 verify_checksum(&decompress_data, metadata.checksum)?;
162 self.size_tracker.record(version, checkpoint_size as u64);
163 put_to_cache(
165 self.manifest_cache.as_ref(),
166 fall_back_path,
167 &decompress_data,
168 )
169 .await;
170 Ok(Some(decompress_data))
171 }
172 Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
173 Err(e) => return Err(e).context(OpenDalSnafu),
174 }
175 } else {
176 Ok(None)
177 }
178 } else {
179 Err(e).context(OpenDalSnafu)
180 }
181 }
182 }?;
183 Ok(checkpoint_data.map(|data| (version, data)))
184 }
185
186 pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
189 let last_checkpoint_path = self.last_checkpoint_path();
190
191 let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
193 Ok(data) => data.to_vec(),
194 Err(e) if e.kind() == ErrorKind::NotFound => {
195 return Ok(None);
196 }
197 Err(e) => {
198 return Err(e).context(OpenDalSnafu)?;
199 }
200 };
201
202 let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
203
204 debug!(
205 "Load checkpoint in path: {}, metadata: {:?}",
206 last_checkpoint_path, checkpoint_metadata
207 );
208
209 self.load_checkpoint(checkpoint_metadata).await
210 }
211
212 pub(crate) async fn save_checkpoint(
214 &self,
215 version: ManifestVersion,
216 bytes: &[u8],
217 ) -> Result<()> {
218 let path = self.checkpoint_file_path(version);
219 let data = self
220 .compress_type
221 .encode(bytes)
222 .await
223 .context(CompressObjectSnafu {
224 compress_type: self.compress_type,
225 path: &path,
226 })?;
227 let checkpoint_size = data.len();
228 let checksum = checkpoint_checksum(bytes);
229
230 write_and_put_cache(
231 &self.object_store,
232 self.manifest_cache.as_ref(),
233 &path,
234 data,
235 )
236 .await?;
237 self.size_tracker.record(version, checkpoint_size as u64);
238
239 let last_checkpoint_path = self.last_checkpoint_path();
241
242 let checkpoint_metadata = CheckpointMetadata {
243 size: bytes.len(),
244 version,
245 checksum: Some(checksum),
246 extend_metadata: HashMap::new(),
247 };
248
249 debug!(
250 "Save checkpoint in path: {}, metadata: {:?}",
251 last_checkpoint_path, checkpoint_metadata
252 );
253
254 let bytes = checkpoint_metadata.encode()?;
255 self.object_store
256 .write(&last_checkpoint_path, bytes)
257 .await
258 .context(OpenDalSnafu)?;
259
260 Ok(())
261 }
262}
263
#[cfg(test)]
impl<T: Tracker> CheckpointStorage<T> {
    /// Test helper: writes the checkpoint file for `version` and points the
    /// last-checkpoint file at it.
    ///
    /// Unlike `save_checkpoint`, this writes directly to the object store without
    /// touching the manifest cache. It also records a fixed checksum instead of one
    /// computed from `bytes`.
    pub async fn write_last_checkpoint(
        &self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;

        let checkpoint_size = data.len();

        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;

        self.size_tracker.record(version, checkpoint_size as u64);

        let last_checkpoint_path = self.last_checkpoint_path();
        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            // NOTE(review): fixed checksum, presumably matching the payload used by the
            // tests that call this helper — confirm before changing.
            checksum: Some(1218259706),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Rewrite checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let metadata_bytes = checkpoint_metadata.encode()?;

        // `metadata_bytes` is not used afterwards, so it is moved rather than cloned.
        self.object_store
            .write(&last_checkpoint_path, metadata_bytes)
            .await
            .context(OpenDalSnafu)?;
        Ok(())
    }

    /// Test helper: changes the compression type used for subsequent reads and writes.
    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
        self.compress_type = compress_type;
    }
}