mito2/manifest/storage/
checkpoint.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::debug;
20use object_store::{ErrorKind, ObjectStore};
21use serde::{Deserialize, Serialize};
22use snafu::ResultExt;
23use store_api::ManifestVersion;
24
25use crate::cache::manifest_cache::ManifestCache;
26use crate::error::{
27    CompressObjectSnafu, DecompressObjectSnafu, OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
28};
29use crate::manifest::storage::size_tracker::Tracker;
30use crate::manifest::storage::utils::{get_from_cache, put_to_cache, write_and_put_cache};
31use crate::manifest::storage::{
32    FALL_BACK_COMPRESS_TYPE, LAST_CHECKPOINT_FILE, checkpoint_checksum, checkpoint_file, gen_path,
33    verify_checksum,
34};
35
36#[derive(Serialize, Deserialize, Debug)]
37pub(crate) struct CheckpointMetadata {
38    pub size: usize,
39    /// The latest version this checkpoint contains.
40    pub version: ManifestVersion,
41    pub checksum: Option<u32>,
42    pub extend_metadata: HashMap<String, String>,
43}
44
45impl CheckpointMetadata {
46    fn encode(&self) -> Result<Vec<u8>> {
47        Ok(serde_json::to_string(self)
48            .context(SerdeJsonSnafu)?
49            .into_bytes())
50    }
51
52    fn decode(bs: &[u8]) -> Result<Self> {
53        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
54
55        serde_json::from_str(data).context(SerdeJsonSnafu)
56    }
57}
58
59/// Handle checkpoint storage operations.
60#[derive(Debug, Clone)]
61pub(crate) struct CheckpointStorage<T: Tracker> {
62    object_store: ObjectStore,
63    compress_type: CompressionType,
64    path: String,
65    manifest_cache: Option<ManifestCache>,
66    size_tracker: Arc<T>,
67}
68
69impl<T: Tracker> CheckpointStorage<T> {
70    pub fn new(
71        path: String,
72        object_store: ObjectStore,
73        compress_type: CompressionType,
74        manifest_cache: Option<ManifestCache>,
75        size_tracker: Arc<T>,
76    ) -> Self {
77        Self {
78            object_store,
79            compress_type,
80            path,
81            manifest_cache,
82            size_tracker,
83        }
84    }
85
86    /// Returns the last checkpoint path, because the last checkpoint is not compressed,
87    /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore`
88    pub(crate) fn last_checkpoint_path(&self) -> String {
89        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
90    }
91
92    /// Returns the checkpoint file path under the **current** compression algorithm
93    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
94        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
95    }
96
97    pub(crate) async fn load_checkpoint(
98        &mut self,
99        metadata: CheckpointMetadata,
100    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
101        let version = metadata.version;
102        let path = self.checkpoint_file_path(version);
103
104        // Try to get from cache first
105        if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), &path).await {
106            verify_checksum(&data, metadata.checksum)?;
107            return Ok(Some((version, data)));
108        }
109
110        // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
111        // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
112        let checkpoint_data = match self.object_store.read(&path).await {
113            Ok(checkpoint) => {
114                let checkpoint_size = checkpoint.len();
115                let decompress_data =
116                    self.compress_type
117                        .decode(checkpoint)
118                        .await
119                        .with_context(|_| DecompressObjectSnafu {
120                            compress_type: self.compress_type,
121                            path: path.clone(),
122                        })?;
123                verify_checksum(&decompress_data, metadata.checksum)?;
124                // set the checkpoint size
125                self.size_tracker.record(version, checkpoint_size as u64);
126                // Add to cache
127                put_to_cache(self.manifest_cache.as_ref(), path, &decompress_data).await;
128                Ok(Some(decompress_data))
129            }
130            Err(e) => {
131                if e.kind() == ErrorKind::NotFound {
132                    if self.compress_type != FALL_BACK_COMPRESS_TYPE {
133                        let fall_back_path = gen_path(
134                            &self.path,
135                            &checkpoint_file(version),
136                            FALL_BACK_COMPRESS_TYPE,
137                        );
138                        debug!(
139                            "Failed to load checkpoint from path: {}, fall back to path: {}",
140                            path, fall_back_path
141                        );
142
143                        // Try to get fallback from cache first
144                        if let Some(data) =
145                            get_from_cache(self.manifest_cache.as_ref(), &fall_back_path).await
146                        {
147                            verify_checksum(&data, metadata.checksum)?;
148                            return Ok(Some((version, data)));
149                        }
150
151                        match self.object_store.read(&fall_back_path).await {
152                            Ok(checkpoint) => {
153                                let checkpoint_size = checkpoint.len();
154                                let decompress_data = FALL_BACK_COMPRESS_TYPE
155                                    .decode(checkpoint)
156                                    .await
157                                    .with_context(|_| DecompressObjectSnafu {
158                                        compress_type: FALL_BACK_COMPRESS_TYPE,
159                                        path: fall_back_path.clone(),
160                                    })?;
161                                verify_checksum(&decompress_data, metadata.checksum)?;
162                                self.size_tracker.record(version, checkpoint_size as u64);
163                                // Add fallback to cache
164                                put_to_cache(
165                                    self.manifest_cache.as_ref(),
166                                    fall_back_path,
167                                    &decompress_data,
168                                )
169                                .await;
170                                Ok(Some(decompress_data))
171                            }
172                            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
173                            Err(e) => return Err(e).context(OpenDalSnafu),
174                        }
175                    } else {
176                        Ok(None)
177                    }
178                } else {
179                    Err(e).context(OpenDalSnafu)
180                }
181            }
182        }?;
183        Ok(checkpoint_data.map(|data| (version, data)))
184    }
185
186    /// Load the latest checkpoint.
187    /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
188    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
189        let last_checkpoint_path = self.last_checkpoint_path();
190
191        // Fetch from remote object store without cache
192        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
193            Ok(data) => data.to_vec(),
194            Err(e) if e.kind() == ErrorKind::NotFound => {
195                return Ok(None);
196            }
197            Err(e) => {
198                return Err(e).context(OpenDalSnafu)?;
199            }
200        };
201
202        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
203
204        debug!(
205            "Load checkpoint in path: {}, metadata: {:?}",
206            last_checkpoint_path, checkpoint_metadata
207        );
208
209        self.load_checkpoint(checkpoint_metadata).await
210    }
211
212    /// Save the checkpoint manifest file.
213    pub(crate) async fn save_checkpoint(
214        &self,
215        version: ManifestVersion,
216        bytes: &[u8],
217    ) -> Result<()> {
218        let path = self.checkpoint_file_path(version);
219        let data = self
220            .compress_type
221            .encode(bytes)
222            .await
223            .context(CompressObjectSnafu {
224                compress_type: self.compress_type,
225                path: &path,
226            })?;
227        let checkpoint_size = data.len();
228        let checksum = checkpoint_checksum(bytes);
229
230        write_and_put_cache(
231            &self.object_store,
232            self.manifest_cache.as_ref(),
233            &path,
234            data,
235        )
236        .await?;
237        self.size_tracker.record(version, checkpoint_size as u64);
238
239        // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
240        let last_checkpoint_path = self.last_checkpoint_path();
241
242        let checkpoint_metadata = CheckpointMetadata {
243            size: bytes.len(),
244            version,
245            checksum: Some(checksum),
246            extend_metadata: HashMap::new(),
247        };
248
249        debug!(
250            "Save checkpoint in path: {},  metadata: {:?}",
251            last_checkpoint_path, checkpoint_metadata
252        );
253
254        let bytes = checkpoint_metadata.encode()?;
255        self.object_store
256            .write(&last_checkpoint_path, bytes)
257            .await
258            .context(OpenDalSnafu)?;
259
260        Ok(())
261    }
262}
263
264#[cfg(test)]
265impl<T: Tracker> CheckpointStorage<T> {
266    pub async fn write_last_checkpoint(
267        &self,
268        version: ManifestVersion,
269        bytes: &[u8],
270    ) -> Result<()> {
271        let path = self.checkpoint_file_path(version);
272        let data = self
273            .compress_type
274            .encode(bytes)
275            .await
276            .context(CompressObjectSnafu {
277                compress_type: self.compress_type,
278                path: &path,
279            })?;
280
281        let checkpoint_size = data.len();
282
283        self.object_store
284            .write(&path, data)
285            .await
286            .context(OpenDalSnafu)?;
287
288        self.size_tracker.record(version, checkpoint_size as u64);
289
290        let last_checkpoint_path = self.last_checkpoint_path();
291        let checkpoint_metadata = CheckpointMetadata {
292            size: bytes.len(),
293            version,
294            checksum: Some(1218259706),
295            extend_metadata: HashMap::new(),
296        };
297
298        debug!(
299            "Rewrite checkpoint in path: {},  metadata: {:?}",
300            last_checkpoint_path, checkpoint_metadata
301        );
302
303        let bytes = checkpoint_metadata.encode()?;
304
305        // Overwrite the last checkpoint with the modified content
306        self.object_store
307            .write(&last_checkpoint_path, bytes.clone())
308            .await
309            .context(OpenDalSnafu)?;
310        Ok(())
311    }
312
313    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
314        self.compress_type = compress_type;
315    }
316}